diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 4cabf70ad0..ca0d71e6b8 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -407,11 +407,21 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
self._rotate_notifs, 30 * 60 * 1000
)
- def _set_push_actions_for_event_and_users_txn(self, txn, event):
- """
+ def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
+ all_events_and_contexts):
+ """Handles moving push actions from staging table to main
+ event_push_actions table for all events in `events_and_contexts`.
+
+ Also ensures that all events in `all_events_and_contexts` are removed
+ from the push action staging area.
+
Args:
- event: the event set actions for
- tuples: list of tuples of (user_id, actions)
+ events_and_contexts (list[(EventBase, EventContext)]): events
+ we are persisting
+ all_events_and_contexts (list[(EventBase, EventContext)]): all
+ events that we were going to persist. This includes events
+ we've already persisted, etc, that wouldn't appear in
+ events_and_context.
"""
sql = """
@@ -424,33 +434,40 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
WHERE event_id = ?
"""
- txn.execute(sql, (
- event.room_id, event.internal_metadata.stream_ordering,
- event.depth, event.event_id,
- ))
-
- user_ids = self._simple_select_onecol_txn(
- txn,
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event.event_id,
- },
- retcol="user_id",
- )
+ if events_and_contexts:
+ txn.executemany(sql, (
+ (
+ event.room_id, event.internal_metadata.stream_ordering,
+ event.depth, event.event_id,
+ )
+ for event, _ in events_and_contexts
+ ))
+
+ for event, _ in events_and_contexts:
+ user_ids = self._simple_select_onecol_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ retcol="user_id",
+ )
- self._simple_delete_txn(
- txn,
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event.event_id,
- },
- )
+ for uid in user_ids:
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (event.room_id, uid,)
+ )
- for uid in user_ids:
- txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid,)
+ # Now we delete the staging area for *all* events that were being
+ # persisted.
+ txn.executemany(
+ "DELETE FROM event_push_actions_staging WHERE event_id = ?",
+ (
+ (event.event_id,)
+ for event, _ in all_events_and_contexts
)
+ )
@defer.inlineCallbacks
def get_push_actions_for_user(self, user_id, before=None, limit=50,
|