summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2018-02-26 14:45:40 +0000
committerGitHub <noreply@github.com>2018-02-26 14:45:40 +0000
commite5b4a208ced9df795e1944cda79846690530eac7 (patch)
tree5bd57b753d84383966b9f14346ade11faae9e168
parentMerge pull request #2894 from matrix-org/erikj/handle_unpersisted_events_push (diff)
parentUpdate comments (diff)
downloadsynapse-e5b4a208ced9df795e1944cda79846690530eac7.tar.xz
Merge pull request #2892 from matrix-org/erikj/batch_inserts_push_actions
Batch inserts into event_push_actions_staging
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py16
-rw-r--r--synapse/storage/event_push_actions.py53
-rw-r--r--tests/replication/slave/storage/test_events.py10
-rw-r--r--tests/storage/test_event_push_actions.py2
4 files changed, 53 insertions, 28 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index bf4f1c5836..7c680659b6 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -144,6 +144,7 @@ class BulkPushRuleEvaluator(object):
             Deferred
         """
         rules_by_user = yield self._get_rules_for_event(event, context)
+        actions_by_user = {}
 
         room_members = yield self.store.get_joined_users_from_context(
             event, context
@@ -189,14 +190,17 @@ class BulkPushRuleEvaluator(object):
                 if matches:
                     actions = [x for x in rule['actions'] if x != 'dont_notify']
                     if actions and 'notify' in actions:
-                        # Push rules say we should notify the user of this event,
-                        # so we mark it in the DB in the staging area. (This
-                        # will then get handled when we persist the event)
-                        yield self.store.add_push_actions_to_staging(
-                            event.event_id, uid, actions,
-                        )
+                        # Push rules say we should notify the user of this event
+                        actions_by_user[uid] = actions
                     break
 
+        # Mark in the DB staging area the push actions for users who should be
+        # notified for this event. (This will then get handled when we persist
+        # the event)
+        yield self.store.add_push_actions_to_staging(
+            event.event_id, actions_by_user,
+        )
+
 
 def _condition_checker(evaluator, conditions, uid, display_name, cache):
     for cond in conditions:
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index ca0d71e6b8..fe6887414e 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -775,32 +775,51 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             (rotate_to_stream_ordering,)
         )
 
-    def add_push_actions_to_staging(self, event_id, user_id, actions):
-        """Add the push actions for the user and event to the push
-        action staging area.
+    def add_push_actions_to_staging(self, event_id, user_id_actions):
+        """Add the push actions for the event to the push action staging area.
 
         Args:
             event_id (str)
-            user_id (str)
-            actions (list[dict|str]): An action can either be a string or
-                dict.
+            user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
+                user_id to list of push actions, where an action can either be
+                a string or dict.
 
         Returns:
             Deferred
         """
 
-        is_highlight = 1 if _action_has_highlight(actions) else 0
+        if not user_id_actions:
+            return
 
-        return self._simple_insert(
-            table="event_push_actions_staging",
-            values={
-                "event_id": event_id,
-                "user_id": user_id,
-                "actions": _serialize_action(actions, is_highlight),
-                "notif": 1,
-                "highlight": is_highlight,
-            },
-            desc="add_push_actions_to_staging",
+        # This is a helper function for generating the necessary tuple that
+        # can be used to inert into the `event_push_actions_staging` table.
+        def _gen_entry(user_id, actions):
+            is_highlight = 1 if _action_has_highlight(actions) else 0
+            return (
+                event_id,  # event_id column
+                user_id,  # user_id column
+                _serialize_action(actions, is_highlight),  # actions column
+                1,  # notif column
+                is_highlight,  # highlight column
+            )
+
+        def _add_push_actions_to_staging_txn(txn):
+            # We don't use _simple_insert_many here to avoid the overhead
+            # of generating lists of dicts.
+
+            sql = """
+                INSERT INTO event_push_actions_staging
+                    (event_id, user_id, actions, notif, highlight)
+                VALUES (?, ?, ?, ?, ?)
+            """
+
+            txn.executemany(sql, (
+                _gen_entry(user_id, actions)
+                for user_id, actions in user_id_actions.iteritems()
+            ))
+
+        return self.runInteraction(
+            "add_push_actions_to_staging", _add_push_actions_to_staging_txn
         )
 
     def remove_push_actions_from_staging(self, event_id):
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 4780f2ab72..cb058d3142 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -230,10 +230,12 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
             state_handler = self.hs.get_state_handler()
             context = yield state_handler.compute_event_context(event)
 
-        for user_id, actions in push_actions:
-            yield self.master_store.add_push_actions_to_staging(
-                event.event_id, user_id, actions,
-            )
+        yield self.master_store.add_push_actions_to_staging(
+            event.event_id, {
+                user_id: actions
+                for user_id, actions in push_actions
+            },
+        )
 
         ordering = None
         if backfill:
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index dc90e5c243..6c1aad149b 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -71,7 +71,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
             event.depth = stream
 
             yield self.store.add_push_actions_to_staging(
-                event.event_id, user_id, action,
+                event.event_id, {user_id: action},
             )
             yield self.store.runInteraction(
                 "", self.store._set_push_actions_for_event_and_users_txn,