diff options
author | Erik Johnston <erikj@jki.re> | 2018-02-26 14:28:35 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-26 14:28:35 +0000 |
commit | 73fe86684736b5509ad2bb27b74e4e99c0db7570 (patch) | |
tree | 63c99e09fabe0a21f9465b1c3bf14cbef493d708 /synapse/storage/event_push_actions.py | |
parent | Merge pull request #2900 from matrix-org/erikj/split_event_push_actions (diff) | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into erikj/handle_unp... (diff) | |
download | synapse-73fe86684736b5509ad2bb27b74e4e99c0db7570.tar.xz |
Merge pull request #2894 from matrix-org/erikj/handle_unpersisted_events_push
Ensure all push actions are deleted from staging
Diffstat (limited to 'synapse/storage/event_push_actions.py')
-rw-r--r-- | synapse/storage/event_push_actions.py | 73 |
1 files changed, 45 insertions, 28 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 4cabf70ad0..ca0d71e6b8 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -407,11 +407,21 @@ class EventPushActionsStore(EventPushActionsWorkerStore): self._rotate_notifs, 30 * 60 * 1000 ) - def _set_push_actions_for_event_and_users_txn(self, txn, event): - """ + def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts, + all_events_and_contexts): + """Handles moving push actions from staging table to main + event_push_actions table for all events in `events_and_contexts`. + + Also ensures that all events in `all_events_and_contexts` are removed + from the push action staging area. + Args: - event: the event set actions for - tuples: list of tuples of (user_id, actions) + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + all_events_and_contexts (list[(EventBase, EventContext)]): all + events that we were going to persist. This includes events + we've already persisted, etc, that wouldn't appear in + events_and_context. """ sql = """ @@ -424,33 +434,40 @@ class EventPushActionsStore(EventPushActionsWorkerStore): WHERE event_id = ? """ - txn.execute(sql, ( - event.room_id, event.internal_metadata.stream_ordering, - event.depth, event.event_id, - )) - - user_ids = self._simple_select_onecol_txn( - txn, - table="event_push_actions_staging", - keyvalues={ - "event_id": event.event_id, - }, - retcol="user_id", - ) + if events_and_contexts: + txn.executemany(sql, ( + ( + event.room_id, event.internal_metadata.stream_ordering, + event.depth, event.event_id, + ) + for event, _ in events_and_contexts + )) + + for event, _ in events_and_contexts: + user_ids = self._simple_select_onecol_txn( + txn, + table="event_push_actions_staging", + keyvalues={ + "event_id": event.event_id, + }, + retcol="user_id", + ) - self._simple_delete_txn( - txn, - table="event_push_actions_staging", - keyvalues={ - "event_id": event.event_id, - }, - ) + for uid in user_ids: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid,) + ) - for uid in user_ids: - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid,) + # Now we delete the staging area for *all* events that were being + # persisted. + txn.executemany( + "DELETE FROM event_push_actions_staging WHERE event_id = ?", + ( + (event.event_id,) + for event, _ in all_events_and_contexts ) + ) @defer.inlineCallbacks def get_push_actions_for_user(self, user_id, before=None, limit=50, |