diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 3468f354e6..29972d5204 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -941,10 +941,14 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
receipts."""
def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
+ if isinstance(self.database_engine, PostgresEngine):
+ ROW_ID_NAME = "ctid"
+ else:
+ ROW_ID_NAME = "rowid"
+
# Identify any duplicate receipts arising from
# https://github.com/matrix-org/synapse/issues/14406.
- # We expect the following query to use the per-thread receipt index and take
- # less than a minute.
+ # The following query takes less than a minute on matrix.org.
sql = """
SELECT MAX(stream_id), room_id, receipt_type, user_id
FROM receipts_linearized
@@ -956,19 +960,33 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn))
# Then remove duplicate receipts, keeping the one with the highest
- # `stream_id`. There should only be a single receipt with any given
- # `stream_id`.
- for max_stream_id, room_id, receipt_type, user_id in duplicate_keys:
- sql = """
+ # `stream_id`. Since there might be duplicate rows with the same
+ # `stream_id`, we delete by the ctid instead.
+ for stream_id, room_id, receipt_type, user_id in duplicate_keys:
+ sql = f"""
+ SELECT {ROW_ID_NAME}
+ FROM receipts_linearized
+ WHERE
+ room_id = ? AND
+ receipt_type = ? AND
+ user_id = ? AND
+ thread_id IS NULL AND
+ stream_id = ?
+ LIMIT 1
+ """
+ txn.execute(sql, (room_id, receipt_type, user_id, stream_id))
+ row_id = cast(Tuple[str], txn.fetchone())[0]
+
+ sql = f"""
DELETE FROM receipts_linearized
WHERE
room_id = ? AND
receipt_type = ? AND
user_id = ? AND
thread_id IS NULL AND
- stream_id < ?
+ {ROW_ID_NAME} != ?
"""
- txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id))
+ txn.execute(sql, (room_id, receipt_type, user_id, row_id))
await self.db_pool.runInteraction(
self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME,
|