summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/receipts.py34
1 files changed, 26 insertions, 8 deletions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3468f354e6..29972d5204 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -941,10 +941,14 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
         receipts."""
 
         def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
+            if isinstance(self.database_engine, PostgresEngine):
+                ROW_ID_NAME = "ctid"
+            else:
+                ROW_ID_NAME = "rowid"
+
             # Identify any duplicate receipts arising from
             # https://github.com/matrix-org/synapse/issues/14406.
-            # We expect the following query to use the per-thread receipt index and take
-            # less than a minute.
+            # The following query takes less than a minute on matrix.org.
             sql = """
                 SELECT MAX(stream_id), room_id, receipt_type, user_id
                 FROM receipts_linearized
@@ -956,19 +960,33 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
             duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn))
 
             # Then remove duplicate receipts, keeping the one with the highest
-            # `stream_id`. There should only be a single receipt with any given
-            # `stream_id`.
-            for max_stream_id, room_id, receipt_type, user_id in duplicate_keys:
-                sql = """
+            # `stream_id`. Since there might be duplicate rows with the same
+            # `stream_id`, we delete by the ctid instead.
+            for stream_id, room_id, receipt_type, user_id in duplicate_keys:
+                sql = f"""
+                SELECT {ROW_ID_NAME}
+                FROM receipts_linearized
+                WHERE
+                    room_id = ? AND
+                    receipt_type = ? AND
+                    user_id = ? AND
+                    thread_id IS NULL AND
+                    stream_id = ?
+                LIMIT 1
+                """
+                txn.execute(sql, (room_id, receipt_type, user_id, stream_id))
+                row_id = cast(Tuple[str], txn.fetchone())[0]
+
+                sql = f"""
                     DELETE FROM receipts_linearized
                     WHERE
                         room_id = ? AND
                         receipt_type = ? AND
                         user_id = ? AND
                         thread_id IS NULL AND
-                        stream_id < ?
+                        {ROW_ID_NAME} != ?
                 """
-                txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id))
+                txn.execute(sql, (room_id, receipt_type, user_id, row_id))
 
         await self.db_pool.runInteraction(
             self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,