author    Erik Johnston <erik@matrix.org>    2020-10-13 12:07:56 +0100
committer GitHub <noreply@github.com>        2020-10-13 12:07:56 +0100
commit    b2486f6656bec2307e62de19d2830994a42b879d (patch)
tree      630ab6e5b341a53b5f6c1f7e966ab79673eb73b9 /synapse/storage/databases/main
parent    Merge branch 'master' into develop (diff)
Fix message duplication if something goes wrong after persisting the event (#8476)
Should fix #3365.
Diffstat (limited to 'synapse/storage/databases/main')
 synapse/storage/databases/main/events.py                    | 31
 synapse/storage/databases/main/events_worker.py             | 83
 synapse/storage/databases/main/registration.py              |  6
 synapse/storage/databases/main/schema/delta/58/19txn_id.sql | 40
 4 files changed, 158 insertions(+), 2 deletions(-)
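
In outline, the patch records a (room_id, user_id, token_id, txn_id) -> event_id mapping whenever an event carrying a client transaction ID is persisted, so a retried send request can be answered with the already-persisted event ID instead of creating a duplicate. A minimal sketch of that flow (hypothetical handler code, not part of this patch; `store` stands for a datastore exposing the new methods, and `requester.user_id` / `requester.access_token_id` are assumed attribute names):

# Hypothetical sketch of the deduplication flow enabled by this patch.
# `store` is assumed to expose the lookup added in events_worker.py below;
# `requester` and `build_and_persist` are stand-ins for the real send path.

async def handle_send(store, requester, room_id: str, txn_id: str, build_and_persist):
    # If this (room, user, token, txn_id) tuple was already persisted, return
    # the original event ID rather than creating a duplicate event.
    existing_id = await store.get_event_id_from_transaction_id(
        room_id, requester.user_id, requester.access_token_id, txn_id
    )
    if existing_id:
        return existing_id

    # Otherwise build and persist the event as normal; the storage layer
    # writes the event_txn_id row in the same transaction as the event.
    event = await build_and_persist()
    return event.event_id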
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b19c424ba9..fdb17745f6 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -361,6 +361,8 @@ class PersistEventsStore:
 
         self._store_event_txn(txn, events_and_contexts=events_and_contexts)
 
+        self._persist_transaction_ids_txn(txn, events_and_contexts)
+
         # Insert into event_to_state_groups.
         self._store_event_state_mappings_txn(txn, events_and_contexts)
 
@@ -405,6 +407,35 @@ class PersistEventsStore:
         # room_memberships, where applicable.
         self._update_current_state_txn(txn, state_delta_for_room, min_stream_order)
 
+    def _persist_transaction_ids_txn(
+        self,
+        txn: LoggingTransaction,
+        events_and_contexts: List[Tuple[EventBase, EventContext]],
+    ):
+        """Persist the mapping from transaction IDs to event IDs (if defined).
+        """
+
+        to_insert = []
+        for event, _ in events_and_contexts:
+            token_id = getattr(event.internal_metadata, "token_id", None)
+            txn_id = getattr(event.internal_metadata, "txn_id", None)
+            if token_id and txn_id:
+                to_insert.append(
+                    {
+                        "event_id": event.event_id,
+                        "room_id": event.room_id,
+                        "user_id": event.sender,
+                        "token_id": token_id,
+                        "txn_id": txn_id,
+                        "inserted_ts": self._clock.time_msec(),
+                    }
+                )
+
+        if to_insert:
+            self.db_pool.simple_insert_many_txn(
+                txn, table="event_txn_id", values=to_insert,
+            )
+
     def _update_current_state_txn(
         self,
         txn: LoggingTransaction,
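
Only events whose internal metadata carries both a token_id (the access token row ID) and a txn_id (the client-supplied transaction ID) produce a row in event_txn_id; events received over federation, or sent without a transaction ID, are skipped. A standalone illustration of that filter, using plain namespaces in place of Synapse's EventBase and internal-metadata classes:

from types import SimpleNamespace
from typing import List

def rows_to_insert(events: List[SimpleNamespace], now_ms: int) -> List[dict]:
    """Mirror of the filter in _persist_transaction_ids_txn, using plain
    namespaces in place of EventBase/internal_metadata."""
    to_insert = []
    for event in events:
        token_id = getattr(event.internal_metadata, "token_id", None)
        txn_id = getattr(event.internal_metadata, "txn_id", None)
        if token_id and txn_id:
            to_insert.append(
                {
                    "event_id": event.event_id,
                    "room_id": event.room_id,
                    "user_id": event.sender,
                    "token_id": token_id,
                    "txn_id": txn_id,
                    "inserted_ts": now_ms,
                }
            )
    return to_insert

# A locally-sent event (has token_id and txn_id) is recorded; a federated
# event (no client metadata) is not.
local = SimpleNamespace(
    event_id="$local", room_id="!r", sender="@a:hs",
    internal_metadata=SimpleNamespace(token_id=1, txn_id="abc"),
)
remote = SimpleNamespace(
    event_id="$remote", room_id="!r", sender="@b:other",
    internal_metadata=SimpleNamespace(),
)
assert [r["event_id"] for r in rows_to_insert([local, remote], 0)] == ["$local"]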
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 4e74fafe43..3ec4d1d9c2 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import itertools
 import logging
 import threading
@@ -137,6 +136,15 @@ class EventsWorkerStore(SQLBaseStore):
                     db_conn, "events", "stream_ordering", step=-1
                 )
 
+        if not hs.config.worker.worker_app:
+            # We periodically clean out old transaction ID mappings
+            self._clock.looping_call(
+                run_as_background_process,
+                5 * 60 * 1000,
+                "_cleanup_old_transaction_ids",
+                self._cleanup_old_transaction_ids,
+            )
+
         self._get_event_cache = Cache(
             "*getEvent*",
             keylen=3,
@@ -1308,3 +1316,76 @@ class EventsWorkerStore(SQLBaseStore):
         return await self.db_pool.runInteraction(
             desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
         )
+
+    async def get_event_id_from_transaction_id(
+        self, room_id: str, user_id: str, token_id: int, txn_id: str
+    ) -> Optional[str]:
+        """Look up if we have already persisted an event for the transaction ID,
+        returning the event ID if so.
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            table="event_txn_id",
+            keyvalues={
+                "room_id": room_id,
+                "user_id": user_id,
+                "token_id": token_id,
+                "txn_id": txn_id,
+            },
+            retcol="event_id",
+            allow_none=True,
+            desc="get_event_id_from_transaction_id",
+        )
+
+    async def get_already_persisted_events(
+        self, events: Iterable[EventBase]
+    ) -> Dict[str, str]:
+        """Look up if we have already persisted an event for the transaction ID,
+        returning a mapping from event ID in the given list to the event ID of
+        an existing event.
+
+        Also checks for duplicates within the given events; any duplicates are
+        mapped to the *first* event.
+        """
+
+        mapping = {}
+        txn_id_to_event = {}  # type: Dict[Tuple[str, int, str], str]
+
+        for event in events:
+            token_id = getattr(event.internal_metadata, "token_id", None)
+            txn_id = getattr(event.internal_metadata, "txn_id", None)
+
+            if token_id and txn_id:
+                # Check if this is a duplicate of an event in the given events.
+                existing = txn_id_to_event.get((event.room_id, token_id, txn_id))
+                if existing:
+                    mapping[event.event_id] = existing
+                    continue
+
+                # Check if this is a duplicate of an event we've already
+                # persisted.
+                existing = await self.get_event_id_from_transaction_id(
+                    event.room_id, event.sender, token_id, txn_id
+                )
+                if existing:
+                    mapping[event.event_id] = existing
+                    txn_id_to_event[(event.room_id, token_id, txn_id)] = existing
+                else:
+                    txn_id_to_event[(event.room_id, token_id, txn_id)] = event.event_id
+
+        return mapping
+
+    async def _cleanup_old_transaction_ids(self):
+        """Cleans out transaction id mappings older than 24hrs.
+        """
+
+        def _cleanup_old_transaction_ids_txn(txn):
+            sql = """
+                DELETE FROM event_txn_id
+                WHERE inserted_ts < ?
+            """
+            one_day_ago = self._clock.time_msec() - 24 * 60 * 60 * 1000
+            txn.execute(sql, (one_day_ago,))
+
+        return await self.db_pool.runInteraction(
+            "_cleanup_old_transaction_ids", _cleanup_old_transaction_ids_txn,
+        )
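
get_already_persisted_events deduplicates in two places: against rows already in event_txn_id, and within the batch it was handed, where later events that reuse a (room_id, token_id, txn_id) triple are mapped to the first occurrence. A standalone sketch of the in-batch half (plain tuples instead of EventBase, and without the database lookup):

from typing import Dict, Iterable, Tuple

def collapse_batch_duplicates(
    events: Iterable[Tuple[str, str, int, str]]
) -> Dict[str, str]:
    """Sketch of the in-batch part of get_already_persisted_events.

    Each event is (event_id, room_id, token_id, txn_id); later events that
    reuse the same (room_id, token_id, txn_id) are mapped to the first one.
    (The real method also consults the event_txn_id table for events that
    were persisted on an earlier request.)
    """
    mapping: Dict[str, str] = {}
    seen: Dict[Tuple[str, int, str], str] = {}
    for event_id, room_id, token_id, txn_id in events:
        key = (room_id, token_id, txn_id)
        if key in seen:
            mapping[event_id] = seen[key]
        else:
            seen[key] = event_id
    return mapping

# Two events sent with the same transaction ID collapse to the first.
assert collapse_batch_duplicates(
    [("$e1", "!r", 1, "t1"), ("$e2", "!r", 1, "t1")]
) == {"$e2": "$e1"}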
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 236d3cdbe3..9a003e30f9 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -1003,7 +1003,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
         token: str,
         device_id: Optional[str],
         valid_until_ms: Optional[int],
-    ) -> None:
+    ) -> int:
         """Adds an access token for the given user.
 
         Args:
@@ -1013,6 +1013,8 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
             valid_until_ms: when the token is valid until. None for no expiry.
         Raises:
             StoreError if there was a problem adding this.
+        Returns:
+            The token ID
         """
         next_id = self._access_tokens_id_gen.get_next()
 
@@ -1028,6 +1030,8 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
             desc="add_access_token_to_user",
         )
 
+        return next_id
+
     def _set_device_for_access_token_txn(self, txn, token: str, device_id: str) -> str:
         old_device_id = self.db_pool.simple_select_one_onecol_txn(
             txn, "access_tokens", {"token": token}, "device_id"
diff --git a/synapse/storage/databases/main/schema/delta/58/19txn_id.sql b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql
new file mode 100644
index 0000000000..b2454121a8
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/19txn_id.sql
@@ -0,0 +1,40 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- A map of recent events persisted with transaction IDs. Used to deduplicate
+-- send event requests with the same transaction ID.
+--
+-- Note: transaction IDs are scoped to the room ID/user ID/access token that was
+-- used to make the request.
+--
+-- Note: The foreign key constraints are ON DELETE CASCADE, as if we delete the
+-- events or access token we don't want to try and de-duplicate the event.
+CREATE TABLE IF NOT EXISTS event_txn_id (
+    event_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    token_id BIGINT NOT NULL,
+    txn_id TEXT NOT NULL,
+    inserted_ts BIGINT NOT NULL,
+    FOREIGN KEY (event_id)
+        REFERENCES events (event_id) ON DELETE CASCADE,
+    FOREIGN KEY (token_id)
+        REFERENCES access_tokens (id) ON DELETE CASCADE
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_event_id ON event_txn_id(event_id);
+CREATE UNIQUE INDEX IF NOT EXISTS event_txn_id_txn_id ON event_txn_id(room_id, user_id, token_id, txn_id);
+CREATE INDEX IF NOT EXISTS event_txn_id_ts ON event_txn_id(inserted_ts);
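
A standalone SQLite sketch of what the second unique index buys: for a given (room_id, user_id, token_id, txn_id) only one row can exist, so a retried transaction cannot be mapped to a second event, and the deduplication lookup returns the originally persisted event ID. The foreign keys to events and access_tokens are omitted here to keep the example self-contained:

import sqlite3

# Simplified copy of event_txn_id (no foreign keys) plus its unique indices.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE event_txn_id (
        event_id TEXT NOT NULL,
        room_id TEXT NOT NULL,
        user_id TEXT NOT NULL,
        token_id BIGINT NOT NULL,
        txn_id TEXT NOT NULL,
        inserted_ts BIGINT NOT NULL
    );
    CREATE UNIQUE INDEX event_txn_id_event_id ON event_txn_id(event_id);
    CREATE UNIQUE INDEX event_txn_id_txn_id
        ON event_txn_id(room_id, user_id, token_id, txn_id);
    """
)

row = ("$event1", "!room:hs", "@alice:hs", 1, "txn-abc", 0)
conn.execute("INSERT INTO event_txn_id VALUES (?, ?, ?, ?, ?, ?)", row)

try:
    # Same (room, user, token, txn_id) but a different event ID: rejected.
    conn.execute(
        "INSERT INTO event_txn_id VALUES (?, ?, ?, ?, ?, ?)",
        ("$event2", "!room:hs", "@alice:hs", 1, "txn-abc", 0),
    )
except sqlite3.IntegrityError as e:
    print("duplicate transaction rejected:", e)

# The lookup used for deduplication returns the originally persisted event.
cur = conn.execute(
    "SELECT event_id FROM event_txn_id"
    " WHERE room_id = ? AND user_id = ? AND token_id = ? AND txn_id = ?",
    ("!room:hs", "@alice:hs", 1, "txn-abc"),
)
print(cur.fetchone())  # ('$event1',)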