diff options
author | Erik Johnston <erik@matrix.org> | 2020-10-13 12:07:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-13 12:07:56 +0100 |
commit | b2486f6656bec2307e62de19d2830994a42b879d (patch) | |
tree | 630ab6e5b341a53b5f6c1f7e966ab79673eb73b9 /synapse/storage/databases/main/events.py | |
parent | Merge branch 'master' into develop (diff) | |
download | synapse-b2486f6656bec2307e62de19d2830994a42b879d.tar.xz |
Fix message duplication if something goes wrong after persisting the event (#8476)
Should fix #3365.
Diffstat (limited to 'synapse/storage/databases/main/events.py')
-rw-r--r-- | synapse/storage/databases/main/events.py | 31 |
1 files changed, 31 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b19c424ba9..fdb17745f6 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -361,6 +361,8 @@ class PersistEventsStore: self._store_event_txn(txn, events_and_contexts=events_and_contexts) + self._persist_transaction_ids_txn(txn, events_and_contexts) + # Insert into event_to_state_groups. self._store_event_state_mappings_txn(txn, events_and_contexts) @@ -405,6 +407,35 @@ class PersistEventsStore: # room_memberships, where applicable. self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) + def _persist_transaction_ids_txn( + self, + txn: LoggingTransaction, + events_and_contexts: List[Tuple[EventBase, EventContext]], + ): + """Persist the mapping from transaction IDs to event IDs (if defined). + """ + + to_insert = [] + for event, _ in events_and_contexts: + token_id = getattr(event.internal_metadata, "token_id", None) + txn_id = getattr(event.internal_metadata, "txn_id", None) + if token_id and txn_id: + to_insert.append( + { + "event_id": event.event_id, + "room_id": event.room_id, + "user_id": event.sender, + "token_id": token_id, + "txn_id": txn_id, + "inserted_ts": self._clock.time_msec(), + } + ) + + if to_insert: + self.db_pool.simple_insert_many_txn( + txn, table="event_txn_id", values=to_insert, + ) + def _update_current_state_txn( self, txn: LoggingTransaction, |