diff options
author | Erik Johnston <erikj@jki.re> | 2018-02-26 14:28:35 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-02-26 14:28:35 +0000 |
commit | 73fe86684736b5509ad2bb27b74e4e99c0db7570 (patch) | |
tree | 63c99e09fabe0a21f9465b1c3bf14cbef493d708 /synapse | |
parent | Merge pull request #2900 from matrix-org/erikj/split_event_push_actions (diff) | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into erikj/handle_unp... (diff) | |
download | synapse-73fe86684736b5509ad2bb27b74e4e99c0db7570.tar.xz |
Merge pull request #2894 from matrix-org/erikj/handle_unpersisted_events_push
Ensure all push actions are deleted from staging
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/event_push_actions.py | 73 | ||||
-rw-r--r-- | synapse/storage/events.py | 22 |
2 files changed, 61 insertions, 34 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 4cabf70ad0..ca0d71e6b8 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -407,11 +407,21 @@ class EventPushActionsStore(EventPushActionsWorkerStore): self._rotate_notifs, 30 * 60 * 1000 ) - def _set_push_actions_for_event_and_users_txn(self, txn, event): - """ + def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts, + all_events_and_contexts): + """Handles moving push actions from staging table to main + event_push_actions table for all events in `events_and_contexts`. + + Also ensures that all events in `all_events_and_contexts` are removed + from the push action staging area. + Args: - event: the event set actions for - tuples: list of tuples of (user_id, actions) + events_and_contexts (list[(EventBase, EventContext)]): events + we are persisting + all_events_and_contexts (list[(EventBase, EventContext)]): all + events that we were going to persist. This includes events + we've already persisted, etc, that wouldn't appear in + events_and_context. """ sql = """ @@ -424,33 +434,40 @@ class EventPushActionsStore(EventPushActionsWorkerStore): WHERE event_id = ? """ - txn.execute(sql, ( - event.room_id, event.internal_metadata.stream_ordering, - event.depth, event.event_id, - )) - - user_ids = self._simple_select_onecol_txn( - txn, - table="event_push_actions_staging", - keyvalues={ - "event_id": event.event_id, - }, - retcol="user_id", - ) + if events_and_contexts: + txn.executemany(sql, ( + ( + event.room_id, event.internal_metadata.stream_ordering, + event.depth, event.event_id, + ) + for event, _ in events_and_contexts + )) + + for event, _ in events_and_contexts: + user_ids = self._simple_select_onecol_txn( + txn, + table="event_push_actions_staging", + keyvalues={ + "event_id": event.event_id, + }, + retcol="user_id", + ) - self._simple_delete_txn( - txn, - table="event_push_actions_staging", - keyvalues={ - "event_id": event.event_id, - }, - ) + for uid in user_ids: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid,) + ) - for uid in user_ids: - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid,) + # Now we delete the staging area for *all* events that were being + # persisted. + txn.executemany( + "DELETE FROM event_push_actions_staging WHERE event_id = ?", + ( + (event.event_id,) + for event, _ in all_events_and_contexts ) + ) @defer.inlineCallbacks def get_push_actions_for_user(self, user_id, before=None, limit=50, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 99d6cca585..b63392a6cd 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -627,6 +627,8 @@ class EventsStore(EventsWorkerStore): list of the event ids which are the forward extremities. """ + all_events_and_contexts = events_and_contexts + max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering self._update_current_state_txn(txn, state_delta_for_room, max_stream_order) @@ -689,6 +691,7 @@ class EventsStore(EventsWorkerStore): self._update_metadata_tables_txn( txn, events_and_contexts=events_and_contexts, + all_events_and_contexts=all_events_and_contexts, backfilled=backfilled, ) @@ -1086,26 +1089,33 @@ class EventsStore(EventsWorkerStore): ec for ec in events_and_contexts if ec[0] not in to_remove ] - def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled): + def _update_metadata_tables_txn(self, txn, events_and_contexts, + all_events_and_contexts, backfilled): """Update all the miscellaneous tables for new events Args: txn (twisted.enterprise.adbapi.Connection): db connection events_and_contexts (list[(EventBase, EventContext)]): events we are persisting + all_events_and_contexts (list[(EventBase, EventContext)]): all + events that we were going to persist. This includes events + we've already persisted, etc, that wouldn't appear in + events_and_context. backfilled (bool): True if the events were backfilled """ + # Insert all the push actions into the event_push_actions table. + self._set_push_actions_for_event_and_users_txn( + txn, + events_and_contexts=events_and_contexts, + all_events_and_contexts=all_events_and_contexts, + ) + if not events_and_contexts: # nothing to do here return for event, context in events_and_contexts: - # Insert all the push actions into the event_push_actions table. - self._set_push_actions_for_event_and_users_txn( - txn, event, - ) - if event.type == EventTypes.Redaction and event.redacts is not None: # Remove the entries in the event_push_actions table for the # redacted event. |