summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/events/snapshot.py4
-rw-r--r--synapse/handlers/message.py12
-rw-r--r--synapse/push/action_generator.py6
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py15
-rw-r--r--synapse/storage/event_push_actions.py99
-rw-r--r--synapse/storage/events.py7
-rw-r--r--synapse/storage/schema/delta/47/push_actions_staging.sql28
-rw-r--r--tests/replication/slave/storage/test_events.py5
-rw-r--r--tests/storage/test_event_push_actions.py10
9 files changed, 140 insertions, 46 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index f9445bef13..8e684d91b5 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -52,7 +52,6 @@ class EventContext(object):
         "prev_state_ids",
         "state_group",
         "rejected",
-        "push_actions",
         "prev_group",
         "delta_ids",
         "prev_state_events",
@@ -67,7 +66,6 @@ class EventContext(object):
         self.state_group = None
 
         self.rejected = False
-        self.push_actions = []
 
         # A previously persisted state group and a delta between that
         # and this state.
@@ -104,7 +102,6 @@ class EventContext(object):
             "event_state_key": event.state_key if event.is_state() else None,
             "state_group": self.state_group,
             "rejected": self.rejected,
-            "push_actions": self.push_actions,
             "prev_group": self.prev_group,
             "delta_ids": _encode_state_dict(self.delta_ids),
             "prev_state_events": self.prev_state_events,
@@ -127,7 +124,6 @@ class EventContext(object):
         context = EventContext()
         context.state_group = input["state_group"]
         context.rejected = input["rejected"]
-        context.push_actions = input["push_actions"]
         context.prev_group = input["prev_group"]
         context.delta_ids = _decode_state_dict(input["delta_ids"])
         context.prev_state_events = input["prev_state_events"]
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1c3ac03f20..d99d8049b3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -683,9 +683,15 @@ class EventCreationHandler(object):
             event, context
         )
 
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
-            event, context=context
-        )
+        try:
+            (event_stream_id, max_stream_id) = yield self.store.persist_event(
+                event, context=context
+            )
+        except:  # noqa: E722, as we reraise the exception this is fine.
+            # Ensure that we actually remove the entries in the push actions
+            # staging area
+            preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
+            raise
 
         # this intentionally does not yield: we don't care about the result
         # and don't need to wait for it.
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index fe09d50d55..8f619a7a1b 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -40,10 +40,6 @@ class ActionGenerator(object):
     @defer.inlineCallbacks
     def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "action_for_event_by_user"):
-            actions_by_user = yield self.bulk_evaluator.action_for_event_by_user(
+            yield self.bulk_evaluator.action_for_event_by_user(
                 event, context
             )
-
-        context.push_actions = [
-            (uid, actions) for uid, actions in actions_by_user.iteritems()
-        ]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 425a017bdf..bf4f1c5836 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -137,14 +137,13 @@ class BulkPushRuleEvaluator(object):
 
     @defer.inlineCallbacks
     def action_for_event_by_user(self, event, context):
-        """Given an event and context, evaluate the push rules and return
-        the results
+        """Given an event and context, evaluate the push rules and insert the
+        results into the event_push_actions_staging table.
 
         Returns:
-            dict of user_id -> action
+            Deferred
         """
         rules_by_user = yield self._get_rules_for_event(event, context)
-        actions_by_user = {}
 
         room_members = yield self.store.get_joined_users_from_context(
             event, context
@@ -190,9 +189,13 @@ class BulkPushRuleEvaluator(object):
                 if matches:
                     actions = [x for x in rule['actions'] if x != 'dont_notify']
                     if actions and 'notify' in actions:
-                        actions_by_user[uid] = actions
+                        # Push rules say we should notify the user of this event,
+                        # so we mark it in the DB in the staging area. (This
+                        # will then get handled when we persist the event)
+                        yield self.store.add_push_actions_to_staging(
+                            event.event_id, uid, actions,
+                        )
                     break
-        defer.returnValue(actions_by_user)
 
 
 def _condition_checker(evaluator, conditions, uid, display_name, cache):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 8efe2fd4bb..f787431b7a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -88,33 +88,50 @@ class EventPushActionsStore(SQLBaseStore):
             self._rotate_notifs, 30 * 60 * 1000
         )
 
-    def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
+    def _set_push_actions_for_event_and_users_txn(self, txn, event):
         """
         Args:
             event: the event set actions for
             tuples: list of tuples of (user_id, actions)
         """
-        values = []
-        for uid, actions in tuples:
-            is_highlight = 1 if _action_has_highlight(actions) else 0
-
-            values.append({
-                'room_id': event.room_id,
-                'event_id': event.event_id,
-                'user_id': uid,
-                'actions': _serialize_action(actions, is_highlight),
-                'stream_ordering': event.internal_metadata.stream_ordering,
-                'topological_ordering': event.depth,
-                'notif': 1,
-                'highlight': is_highlight,
-            })
-
-        for uid, __ in tuples:
+
+        sql = """
+            INSERT INTO event_push_actions (
+                room_id, event_id, user_id, actions, stream_ordering,
+                topological_ordering, notif, highlight
+            )
+            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
+            FROM event_push_actions_staging
+            WHERE event_id = ?
+        """
+
+        txn.execute(sql, (
+            event.room_id, event.internal_metadata.stream_ordering,
+            event.depth, event.event_id,
+        ))
+
+        user_ids = self._simple_select_onecol_txn(
+            txn,
+            table="event_push_actions_staging",
+            keyvalues={
+                "event_id": event.event_id,
+            },
+            retcol="user_id",
+        )
+
+        self._simple_delete_txn(
+            txn,
+            table="event_push_actions_staging",
+            keyvalues={
+                "event_id": event.event_id,
+            },
+        )
+
+        for uid in user_ids:
             txn.call_after(
                 self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-                (event.room_id, uid)
+                (event.room_id, uid,)
             )
-        self._simple_insert_many_txn(txn, "event_push_actions", values)
 
     @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
     def get_unread_event_push_actions_by_room_for_user(
@@ -738,6 +755,50 @@ class EventPushActionsStore(SQLBaseStore):
             (rotate_to_stream_ordering,)
         )
 
+    def add_push_actions_to_staging(self, event_id, user_id, actions):
+        """Add the push actions for the user and event to the push
+        action staging area.
+
+        Args:
+            event_id (str)
+            user_id (str)
+            actions (list[dict|str]): An action can either be a string or
+                dict.
+
+        Returns:
+            Deferred
+        """
+
+        is_highlight = 1 if _action_has_highlight(actions) else 0
+
+        return self._simple_insert(
+            table="event_push_actions_staging",
+            values={
+                "event_id": event_id,
+                "user_id": user_id,
+                "actions": _serialize_action(actions, is_highlight),
+                "notif": 1,
+                "highlight": is_highlight,
+            },
+            desc="add_push_actions_to_staging",
+        )
+
+    def remove_push_actions_from_staging(self, event_id):
+        """Called if we failed to persist the event to ensure that stale push
+        actions don't build up in the DB
+
+        Args:
+            event_id (str)
+        """
+
+        return self._simple_delete(
+            table="event_push_actions_staging",
+            keyvalues={
+                "event_id": event_id,
+            },
+            desc="remove_push_actions_from_staging",
+        )
+
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 28cce2979c..52b7b34749 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1168,10 +1168,9 @@ class EventsStore(SQLBaseStore):
 
         for event, context in events_and_contexts:
             # Insert all the push actions into the event_push_actions table.
-            if context.push_actions:
-                self._set_push_actions_for_event_and_users_txn(
-                    txn, event, context.push_actions
-                )
+            self._set_push_actions_for_event_and_users_txn(
+                txn, event,
+            )
 
             if event.type == EventTypes.Redaction and event.redacts is not None:
                 # Remove the entries in the event_push_actions table for the
diff --git a/synapse/storage/schema/delta/47/push_actions_staging.sql b/synapse/storage/schema/delta/47/push_actions_staging.sql
new file mode 100644
index 0000000000..edccf4a96f
--- /dev/null
+++ b/synapse/storage/schema/delta/47/push_actions_staging.sql
@@ -0,0 +1,28 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Temporary staging area for push actions that have been calculated for an
+-- event, but the event hasn't yet been persisted.
+-- When the event is persisted the rows are moved over to the
+-- event_push_actions table.
+CREATE TABLE event_push_actions_staging (
+    event_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    actions TEXT NOT NULL,
+    notif SMALLINT NOT NULL,
+    highlight SMALLINT NOT NULL
+);
+
+CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index f430cce931..4780f2ab72 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -230,7 +230,10 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
             state_handler = self.hs.get_state_handler()
             context = yield state_handler.compute_event_context(event)
 
-        context.push_actions = push_actions
+        for user_id, actions in push_actions:
+            yield self.master_store.add_push_actions_to_staging(
+                event.event_id, user_id, actions,
+            )
 
         ordering = None
         if backfill:
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index 3135488353..d483e7cf9e 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -62,6 +62,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
                 {"notify_count": noitf_count, "highlight_count": highlight_count}
             )
 
+        @defer.inlineCallbacks
         def _inject_actions(stream, action):
             event = Mock()
             event.room_id = room_id
@@ -69,11 +70,12 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
             event.internal_metadata.stream_ordering = stream
             event.depth = stream
 
-            tuples = [(user_id, action)]
-
-            return self.store.runInteraction(
+            yield self.store.add_push_actions_to_staging(
+                event.event_id, user_id, action,
+            )
+            yield self.store.runInteraction(
                 "", self.store._set_push_actions_for_event_and_users_txn,
-                event, tuples
+                event,
             )
 
         def _rotate(stream):