diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/receipts.py | 34 |
1 files changed, 26 insertions, 8 deletions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 3468f354e6..29972d5204 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -941,10 +941,14 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): receipts.""" def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: + if isinstance(self.database_engine, PostgresEngine): + ROW_ID_NAME = "ctid" + else: + ROW_ID_NAME = "rowid" + # Identify any duplicate receipts arising from # https://github.com/matrix-org/synapse/issues/14406. - # We expect the following query to use the per-thread receipt index and take - # less than a minute. + # The following query takes less than a minute on matrix.org. sql = """ SELECT MAX(stream_id), room_id, receipt_type, user_id FROM receipts_linearized @@ -956,19 +960,33 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): duplicate_keys = cast(List[Tuple[int, str, str, str]], list(txn)) # Then remove duplicate receipts, keeping the one with the highest - # `stream_id`. There should only be a single receipt with any given - # `stream_id`. - for max_stream_id, room_id, receipt_type, user_id in duplicate_keys: - sql = """ + # `stream_id`. Since there might be duplicate rows with the same + # `stream_id`, we delete by the ctid instead. + for stream_id, room_id, receipt_type, user_id in duplicate_keys: + sql = f""" + SELECT {ROW_ID_NAME} + FROM receipts_linearized + WHERE + room_id = ? AND + receipt_type = ? AND + user_id = ? AND + thread_id IS NULL AND + stream_id = ? + LIMIT 1 + """ + txn.execute(sql, (room_id, receipt_type, user_id, stream_id)) + row_id = cast(Tuple[str], txn.fetchone())[0] + + sql = f""" DELETE FROM receipts_linearized WHERE room_id = ? AND receipt_type = ? AND user_id = ? AND thread_id IS NULL AND - stream_id < ? + {ROW_ID_NAME} != ? """ - txn.execute(sql, (room_id, receipt_type, user_id, max_stream_id)) + txn.execute(sql, (room_id, receipt_type, user_id, row_id)) await self.db_pool.runInteraction( self.RECEIPTS_LINEARIZED_UNIQUE_INDEX_UPDATE_NAME, |