diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 40bf000e9c..bdd0781c48 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -385,7 +385,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
WITH all_receipts AS (
SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
FROM receipts_linearized
- LEFT JOIN events USING (room_id, event_id)
WHERE
{receipt_types_clause}
AND user_id = ?
@@ -621,13 +620,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
SELECT notif_count, COALESCE(unread_count, 0), thread_id
FROM event_push_summary
LEFT JOIN (
- SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+ SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
- LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
- AND stream_ordering > ?
+ AND event_stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
@@ -659,13 +657,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
sql = f"""
SELECT COUNT(*), thread_id FROM event_push_actions
LEFT JOIN (
- SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+ SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
- LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
- AND stream_ordering > ?
+ AND event_stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
@@ -738,13 +735,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
thread_id
FROM event_push_actions
LEFT JOIN (
- SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+ SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
- LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
- AND stream_ordering > ?
+ AND event_stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
@@ -910,9 +906,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# given this function generally gets called with only one room and
# thread ID.
sql = f"""
- SELECT room_id, thread_id, MAX(stream_ordering)
+ SELECT room_id, thread_id, MAX(event_stream_ordering)
FROM receipts_linearized
- INNER JOIN events USING (room_id, event_id)
WHERE {receipt_types_clause}
AND {thread_ids_clause}
AND {room_ids_clause}
@@ -1442,9 +1437,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
)
sql = """
- SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
+ SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, r.event_stream_ordering
FROM receipts_linearized AS r
- INNER JOIN events AS e USING (event_id)
WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
ORDER BY r.stream_id ASC
LIMIT ?
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index d513c42530..13387a3839 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -178,14 +178,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
)
sql = f"""
- SELECT event_id, stream_ordering
+ SELECT event_id, event_stream_ordering
FROM receipts_linearized
- INNER JOIN events USING (room_id, event_id)
WHERE {clause}
AND user_id = ?
AND room_id = ?
AND thread_id IS NULL
- ORDER BY stream_ordering DESC
+ ORDER BY event_stream_ordering DESC
LIMIT 1
"""
@@ -735,10 +734,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
thread_clause = "r.thread_id = ?"
thread_args = (thread_id,)
+ # If the receipt doesn't have a stream ordering it is because we
+ # don't have the associated event, and so must be a remote receipt.
+ # Hence it's safe to just allow new receipts to clobber it.
sql = f"""
- SELECT stream_ordering, event_id FROM events
- INNER JOIN receipts_linearized AS r USING (event_id, room_id)
- WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
+ SELECT r.event_stream_ordering, r.event_id FROM receipts_linearized AS r
+ WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?
+ AND r.event_stream_ordering IS NOT NULL AND {thread_clause}
"""
txn.execute(
sql,
|