summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erikj@jki.re>2018-02-26 14:28:35 +0000
committerGitHub <noreply@github.com>2018-02-26 14:28:35 +0000
commit73fe86684736b5509ad2bb27b74e4e99c0db7570 (patch)
tree63c99e09fabe0a21f9465b1c3bf14cbef493d708
parentMerge pull request #2900 from matrix-org/erikj/split_event_push_actions (diff)
parentMerge branch 'develop' of github.com:matrix-org/synapse into erikj/handle_unp... (diff)
downloadsynapse-73fe86684736b5509ad2bb27b74e4e99c0db7570.tar.xz
Merge pull request #2894 from matrix-org/erikj/handle_unpersisted_events_push
Ensure all push actions are deleted from staging
-rw-r--r--synapse/storage/event_push_actions.py73
-rw-r--r--synapse/storage/events.py22
-rw-r--r--tests/storage/test_event_push_actions.py2
3 files changed, 62 insertions, 35 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 4cabf70ad0..ca0d71e6b8 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -407,11 +407,21 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             self._rotate_notifs, 30 * 60 * 1000
         )
 
-    def _set_push_actions_for_event_and_users_txn(self, txn, event):
-        """
+    def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
+                                                  all_events_and_contexts):
+        """Handles moving push actions from staging table to main
+        event_push_actions table for all events in `events_and_contexts`.
+
+        Also ensures that all events in `all_events_and_contexts` are removed
+        from the push action staging area.
+
         Args:
-            event: the event set actions for
-            tuples: list of tuples of (user_id, actions)
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            all_events_and_contexts (list[(EventBase, EventContext)]): all
+                events that we were going to persist. This includes events
+                we've already persisted, etc, that wouldn't appear in
+                events_and_context.
         """
 
         sql = """
@@ -424,33 +434,40 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             WHERE event_id = ?
         """
 
-        txn.execute(sql, (
-            event.room_id, event.internal_metadata.stream_ordering,
-            event.depth, event.event_id,
-        ))
-
-        user_ids = self._simple_select_onecol_txn(
-            txn,
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event.event_id,
-            },
-            retcol="user_id",
-        )
+        if events_and_contexts:
+            txn.executemany(sql, (
+                (
+                    event.room_id, event.internal_metadata.stream_ordering,
+                    event.depth, event.event_id,
+                )
+                for event, _ in events_and_contexts
+            ))
+
+        for event, _ in events_and_contexts:
+            user_ids = self._simple_select_onecol_txn(
+                txn,
+                table="event_push_actions_staging",
+                keyvalues={
+                    "event_id": event.event_id,
+                },
+                retcol="user_id",
+            )
 
-        self._simple_delete_txn(
-            txn,
-            table="event_push_actions_staging",
-            keyvalues={
-                "event_id": event.event_id,
-            },
-        )
+            for uid in user_ids:
+                txn.call_after(
+                    self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+                    (event.room_id, uid,)
+                )
 
-        for uid in user_ids:
-            txn.call_after(
-                self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-                (event.room_id, uid,)
+        # Now we delete the staging area for *all* events that were being
+        # persisted.
+        txn.executemany(
+            "DELETE FROM event_push_actions_staging WHERE event_id = ?",
+            (
+                (event.event_id,)
+                for event, _ in all_events_and_contexts
             )
+        )
 
     @defer.inlineCallbacks
     def get_push_actions_for_user(self, user_id, before=None, limit=50,
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 99d6cca585..b63392a6cd 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -627,6 +627,8 @@ class EventsStore(EventsWorkerStore):
                 list of the event ids which are the forward extremities.
 
         """
+        all_events_and_contexts = events_and_contexts
+
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
 
         self._update_current_state_txn(txn, state_delta_for_room, max_stream_order)
@@ -689,6 +691,7 @@ class EventsStore(EventsWorkerStore):
         self._update_metadata_tables_txn(
             txn,
             events_and_contexts=events_and_contexts,
+            all_events_and_contexts=all_events_and_contexts,
             backfilled=backfilled,
         )
 
@@ -1086,26 +1089,33 @@ class EventsStore(EventsWorkerStore):
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]
 
-    def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+    def _update_metadata_tables_txn(self, txn, events_and_contexts,
+                                    all_events_and_contexts, backfilled):
         """Update all the miscellaneous tables for new events
 
         Args:
             txn (twisted.enterprise.adbapi.Connection): db connection
             events_and_contexts (list[(EventBase, EventContext)]): events
                 we are persisting
+            all_events_and_contexts (list[(EventBase, EventContext)]): all
+                events that we were going to persist. This includes events
+                we've already persisted, etc, that wouldn't appear in
+                events_and_context.
             backfilled (bool): True if the events were backfilled
         """
 
+        # Insert all the push actions into the event_push_actions table.
+        self._set_push_actions_for_event_and_users_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            all_events_and_contexts=all_events_and_contexts,
+        )
+
         if not events_and_contexts:
             # nothing to do here
             return
 
         for event, context in events_and_contexts:
-            # Insert all the push actions into the event_push_actions table.
-            self._set_push_actions_for_event_and_users_txn(
-                txn, event,
-            )
-
             if event.type == EventTypes.Redaction and event.redacts is not None:
                 # Remove the entries in the event_push_actions table for the
                 # redacted event.
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index d483e7cf9e..dc90e5c243 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -75,7 +75,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
             )
             yield self.store.runInteraction(
                 "", self.store._set_push_actions_for_event_and_users_txn,
-                event,
+                [(event, None)], [(event, None)],
             )
 
         def _rotate(stream):