summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-02-27 12:01:36 +0000
committerErik Johnston <erik@matrix.org>2018-02-27 13:58:16 +0000
commit493e25d5545389264f696be0e07544bf82a0818a (patch)
tree15b19197bf78f9f0382bb901cfea771712fc176f /synapse/storage
parentMerge pull request #2904 from matrix-org/erikj/receipt_cache_invalidation (diff)
downloadsynapse-493e25d5545389264f696be0e07544bf82a0818a.tar.xz
Move storage functions for push calculations
This will allow push actions for an event to be calculated on workers.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/event_push_actions.py126
-rw-r--r--synapse/storage/push_rule.py14
-rw-r--r--synapse/storage/pusher.py22
-rw-r--r--synapse/storage/roommember.py24
4 files changed, 99 insertions, 87 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index fe6887414e..6454045c2d 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -380,6 +380,69 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         # Now return the first `limit`
         defer.returnValue(notifs[:limit])
 
+    def add_push_actions_to_staging(self, event_id, user_id_actions):
+        """Add the push actions for the event to the push action staging area.
+
+        Args:
+            event_id (str)
+            user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
+                user_id to list of push actions, where an action can either be
+                a string or dict.
+
+        Returns:
+            Deferred
+        """
+
+        if not user_id_actions:
+            return
+
+        # This is a helper function for generating the necessary tuple that
+        # can be used to inert into the `event_push_actions_staging` table.
+        def _gen_entry(user_id, actions):
+            is_highlight = 1 if _action_has_highlight(actions) else 0
+            return (
+                event_id,  # event_id column
+                user_id,  # user_id column
+                _serialize_action(actions, is_highlight),  # actions column
+                1,  # notif column
+                is_highlight,  # highlight column
+            )
+
+        def _add_push_actions_to_staging_txn(txn):
+            # We don't use _simple_insert_many here to avoid the overhead
+            # of generating lists of dicts.
+
+            sql = """
+                INSERT INTO event_push_actions_staging
+                    (event_id, user_id, actions, notif, highlight)
+                VALUES (?, ?, ?, ?, ?)
+            """
+
+            txn.executemany(sql, (
+                _gen_entry(user_id, actions)
+                for user_id, actions in user_id_actions.iteritems()
+            ))
+
+        return self.runInteraction(
+            "add_push_actions_to_staging", _add_push_actions_to_staging_txn
+        )
+
+    def remove_push_actions_from_staging(self, event_id):
+        """Called if we failed to persist the event to ensure that stale push
+        actions don't build up in the DB
+
+        Args:
+            event_id (str)
+        """
+
+        return self._simple_delete(
+            table="event_push_actions_staging",
+            keyvalues={
+                "event_id": event_id,
+            },
+            desc="remove_push_actions_from_staging",
+        )
+
 
 class EventPushActionsStore(EventPushActionsWorkerStore):
     EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -775,69 +838,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             (rotate_to_stream_ordering,)
         )
 
-    def add_push_actions_to_staging(self, event_id, user_id_actions):
-        """Add the push actions for the event to the push action staging area.
-
-        Args:
-            event_id (str)
-            user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
-                user_id to list of push actions, where an action can either be
-                a string or dict.
-
-        Returns:
-            Deferred
-        """
-
-        if not user_id_actions:
-            return
-
-        # This is a helper function for generating the necessary tuple that
-        # can be used to inert into the `event_push_actions_staging` table.
-        def _gen_entry(user_id, actions):
-            is_highlight = 1 if _action_has_highlight(actions) else 0
-            return (
-                event_id,  # event_id column
-                user_id,  # user_id column
-                _serialize_action(actions, is_highlight),  # actions column
-                1,  # notif column
-                is_highlight,  # highlight column
-            )
-
-        def _add_push_actions_to_staging_txn(txn):
-            # We don't use _simple_insert_many here to avoid the overhead
-            # of generating lists of dicts.
-
-            sql = """
-                INSERT INTO event_push_actions_staging
-                    (event_id, user_id, actions, notif, highlight)
-                VALUES (?, ?, ?, ?, ?)
-            """
-
-            txn.executemany(sql, (
-                _gen_entry(user_id, actions)
-                for user_id, actions in user_id_actions.iteritems()
-            ))
-
-        return self.runInteraction(
-            "add_push_actions_to_staging", _add_push_actions_to_staging_txn
-        )
-
-    def remove_push_actions_from_staging(self, event_id):
-        """Called if we failed to persist the event to ensure that stale push
-        actions don't build up in the DB
-
-        Args:
-            event_id (str)
-        """
-
-        return self._simple_delete(
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event_id,
-            },
-            desc="remove_push_actions_from_staging",
-        )
-
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 583efb7bdf..04a0b59a39 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -15,6 +15,10 @@
 # limitations under the License.
 
 from ._base import SQLBaseStore
+from synapse.storage.appservice import ApplicationServiceWorkerStore
+from synapse.storage.pusher import PusherWorkerStore
+from synapse.storage.receipts import ReceiptsWorkerStore
+from synapse.storage.roommember import RoomMemberWorkerStore
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.push.baserules import list_with_base_rules
@@ -51,7 +55,11 @@ def _load_rules(rawrules, enabled_map):
     return rules
 
 
-class PushRulesWorkerStore(SQLBaseStore):
+class PushRulesWorkerStore(ApplicationServiceWorkerStore,
+                           ReceiptsWorkerStore,
+                           PusherWorkerStore,
+                           RoomMemberWorkerStore,
+                           SQLBaseStore):
     """This is an abstract base class where subclasses must implement
     `get_max_push_rules_stream_id` which can be called in the initializer.
     """
@@ -140,8 +148,6 @@ class PushRulesWorkerStore(SQLBaseStore):
                 "have_push_rules_changed", have_push_rules_changed_txn
             )
 
-
-class PushRuleStore(PushRulesWorkerStore):
     @cachedList(cached_method_name="get_push_rules_for_user",
                 list_name="user_ids", num_args=1, inlineCallbacks=True)
     def bulk_get_push_rules(self, user_ids):
@@ -281,6 +287,8 @@ class PushRuleStore(PushRulesWorkerStore):
             results.setdefault(row['user_name'], {})[row['rule_id']] = enabled
         defer.returnValue(results)
 
+
+class PushRuleStore(PushRulesWorkerStore):
     @defer.inlineCallbacks
     def add_push_rule(
         self, user_id, rule_id, priority_class, conditions, actions,
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index f4af3e4caa..307660b99a 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -175,11 +175,6 @@ class PusherWorkerStore(SQLBaseStore):
             "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
         )
 
-
-class PusherStore(PusherWorkerStore):
-    def get_pushers_stream_token(self):
-        return self._pushers_id_gen.get_current_token()
-
     @cachedInlineCallbacks(num_args=1, max_entries=15000)
     def get_if_user_has_pusher(self, user_id):
         # This only exists for the cachedList decorator
@@ -201,6 +196,11 @@ class PusherStore(PusherWorkerStore):
 
         defer.returnValue(result)
 
+
+class PusherStore(PusherWorkerStore):
+    def get_pushers_stream_token(self):
+        return self._pushers_id_gen.get_current_token()
+
     @defer.inlineCallbacks
     def add_pusher(self, user_id, access_token, kind, app_id,
                    app_display_name, device_display_name,
@@ -233,14 +233,18 @@ class PusherStore(PusherWorkerStore):
             )
 
             if newly_inserted:
-                # get_if_user_has_pusher only cares if the user has
-                # at least *one* pusher.
-                self.get_if_user_has_pusher.invalidate(user_id,)
+                self.runInteraction(
+                    "add_pusher",
+                    self._invalidate_cache_and_stream,
+                    self.get_if_user_has_pusher, (user_id,)
+                )
 
     @defer.inlineCallbacks
     def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
         def delete_pusher_txn(txn, stream_id):
-            txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
+            self._invalidate_cache_and_stream(
+                txn, self.get_if_user_has_pusher, (user_id,)
+            )
 
             self._simple_delete_one_txn(
                 txn,
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index b9158b9896..d79877dac7 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -432,6 +432,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
     def _get_joined_hosts_cache(self, room_id):
         return _JoinedHostsCache(self, room_id)
 
+    @cached()
+    def who_forgot_in_room(self, room_id):
+        return self._simple_select_list(
+            table="room_memberships",
+            retcols=("user_id", "event_id"),
+            keyvalues={
+                "room_id": room_id,
+                "forgotten": 1,
+            },
+            desc="who_forgot"
+        )
+
 
 class RoomMemberStore(RoomMemberWorkerStore):
     def __init__(self, db_conn, hs):
@@ -595,18 +607,6 @@ class RoomMemberStore(RoomMemberWorkerStore):
         forgot = yield self.runInteraction("did_forget_membership_at", f)
         defer.returnValue(forgot == 1)
 
-    @cached()
-    def who_forgot_in_room(self, room_id):
-        return self._simple_select_list(
-            table="room_memberships",
-            retcols=("user_id", "event_id"),
-            keyvalues={
-                "room_id": room_id,
-                "forgotten": 1,
-            },
-            desc="who_forgot"
-        )
-
     @defer.inlineCallbacks
     def _background_add_membership_profile(self, progress, batch_size):
         target_min_stream_id = progress.get(