diff options
author | Erik Johnston <erik@matrix.org> | 2020-10-13 12:07:56 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-13 12:07:56 +0100 |
commit | b2486f6656bec2307e62de19d2830994a42b879d (patch) | |
tree | 630ab6e5b341a53b5f6c1f7e966ab79673eb73b9 /synapse/storage/databases/main | |
parent | Merge branch 'master' into develop (diff) | |
download | synapse-b2486f6656bec2307e62de19d2830994a42b879d.tar.xz |
Fix message duplication if something goes wrong after persisting the event (#8476)
Should fix #3365.
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r-- | synapse/storage/databases/main/events.py | 31 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 83 | ||||
-rw-r--r-- | synapse/storage/databases/main/registration.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/schema/delta/58/19txn_id.sql | 40 |
4 files changed, 158 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index b19c424ba9..fdb17745f6 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -361,6 +361,8 @@ class PersistEventsStore: self._store_event_txn(txn, events_and_contexts=events_and_contexts) + self._persist_transaction_ids_txn(txn, events_and_contexts) + # Insert into event_to_state_groups. self._store_event_state_mappings_txn(txn, events_and_contexts) @@ -405,6 +407,35 @@ class PersistEventsStore: # room_memberships, where applicable. self._update_current_state_txn(txn, state_delta_for_room, min_stream_order) + def _persist_transaction_ids_txn( + self, + txn: LoggingTransaction, + events_and_contexts: List[Tuple[EventBase, EventContext]], + ): + """Persist the mapping from transaction IDs to event IDs (if defined). + """ + + to_insert = [] + for event, _ in events_and_contexts: + token_id = getattr(event.internal_metadata, "token_id", None) + txn_id = getattr(event.internal_metadata, "txn_id", None) + if token_id and txn_id: + to_insert.append( + { + "event_id": event.event_id, + "room_id": event.room_id, + "user_id": event.sender, + "token_id": token_id, + "txn_id": txn_id, + "inserted_ts": self._clock.time_msec(), + } + ) + + if to_insert: + self.db_pool.simple_insert_many_txn( + txn, table="event_txn_id", values=to_insert, + ) + def _update_current_state_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 4e74fafe43..3ec4d1d9c2 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import itertools import logging import threading @@ -137,6 +136,15 @@ class EventsWorkerStore(SQLBaseStore): db_conn, "events", "stream_ordering", step=-1 ) + if not hs.config.worker.worker_app: + # We periodically clean out old transaction ID mappings + self._clock.looping_call( + run_as_background_process, + 5 * 60 * 1000, + "_cleanup_old_transaction_ids", + self._cleanup_old_transaction_ids, + ) + self._get_event_cache = Cache( "*getEvent*", keylen=3, @@ -1308,3 +1316,76 @@ class EventsWorkerStore(SQLBaseStore): return await self.db_pool.runInteraction( desc="get_next_event_to_expire", func=get_next_event_to_expire_txn ) + + async def get_event_id_from_transaction_id( + self, room_id: str, user_id: str, token_id: int, txn_id: str + ) -> Optional[str]: + """Look up if we have already persisted an event for the transaction ID, + returning the event ID if so. + """ + return await self.db_pool.simple_select_one_onecol( + table="event_txn_id", + keyvalues={ + "room_id": room_id, + "user_id": user_id, + "token_id": token_id, + "txn_id": txn_id, + }, + retcol="event_id", + allow_none=True, + desc="get_event_id_from_transaction_id", + ) + + async def get_already_persisted_events( + self, events: Iterable[EventBase] + ) -> Dict[str, str]: + """Look up if we have already persisted an event for the transaction ID, + returning a mapping from event ID in the given list to the event ID of + an existing event. + + Also checks if there are duplicates in the given events, if there are + will map duplicates to the *first* event. + """ + + mapping = {} + txn_id_to_event = {} # type: Dict[Tuple[str, int, str], str] + + for event in events: + token_id = getattr(event.internal_metadata, "token_id", None) + txn_id = getattr(event.internal_metadata, "txn_id", None) + + if token_id and txn_id: + # Check if this is a duplicate of an event in the given events. + existing = txn_id_to_event.get((event.room_id, token_id, txn_id)) + if existing: + mapping[event.event_id] = existing + continue + + # Check if this is a duplicate of an event we've already + # persisted. + existing = await self.get_event_id_from_transaction_id( + event.room_id, event.sender, token_id, txn_id + ) + if existing: + mapping[event.event_id] = existing + txn_id_to_event[(event.room_id, token_id, txn_id)] = existing + else: + txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id + + return mapping + + async def _cleanup_old_transaction_ids(self): + """Cleans out transaction id mappings older than 24hrs. + """ + + def _cleanup_old_transaction_ids_txn(txn): + sql = """ + DELETE FROM event_txn_id + WHERE inserted_ts < ? + """ + one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000 + txn.execute(sql, (one_day_ago,)) + + return await self.db_pool.runInteraction( + "_cleanup_old_transaction_ids", _cleanup_old_transaction_ids_txn, + ) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 236d3cdbe3..9a003e30f9 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1003,7 +1003,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): token: str, device_id: Optional[str], valid_until_ms: Optional[int], - ) -> None: + ) -> int: """Adds an access token for the given user. Args: @@ -1013,6 +1013,8 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): valid_until_ms: when the token is valid until. None for no expiry. Raises: StoreError if there was a problem adding this. + Returns: + The token ID """ next_id = self._access_tokens_id_gen.get_next() @@ -1028,6 +1030,8 @@ class RegistrationStore(RegistrationBackgroundUpdateStore): desc="add_access_token_to_user", ) + return next_id + def _set_device_for_access_token_txn(self, txn, token: str, device_id: str) -> str: old_device_id = self.db_pool.simple_select_one_onecol_txn( txn, "access_tokens", {"token": token}, "device_id" diff --git a/synapse/storage/databases/main/schema/delta/58/19txn_id.sql b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql new file mode 100644 index 0000000000..b2454121a8 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql @@ -0,0 +1,40 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +-- A map of recent events persisted with transaction IDs. Used to deduplicate +-- send event requests with the same transaction ID. +-- +-- Note: transaction IDs are scoped to the room ID/user ID/access token that was +-- used to make the request. +-- +-- Note: The foreign key constraints are ON DELETE CASCADE, as if we delete the +-- events or access token we don't want to try and de-duplicate the event. +CREATE TABLE IF NOT EXISTS event_txn_id ( + event_id TEXT NOT NULL, + room_id TEXT NOT NULL, + user_id TEXT NOT NULL, + token_id BIGINT NOT NULL, + txn_id TEXT NOT NULL, + inserted_ts BIGINT NOT NULL, + FOREIGN KEY (event_id) + REFERENCES events (event_id) ON DELETE CASCADE, + FOREIGN KEY (token_id) + REFERENCES access_tokens (id) ON DELETE CASCADE +); + +CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id); +CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id); +CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts); |