1 files changed, 7 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 7d4754b3d3..505616e210 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -972,7 +972,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBas
stream_row = txn.fetchone()
if stream_row:
(offset_stream_ordering,) = stream_row
- rotate_to_stream_ordering = offset_stream_ordering
+
+ # We need to bound by the current token to ensure that we handle
+ # out-of-order writes correctly.
+ rotate_to_stream_ordering = min(
+ offset_stream_ordering, self._stream_id_gen.get_current_token()
+ )
caught_up = False
else:
rotate_to_stream_ordering = self._stream_id_gen.get_current_token()
@@ -1004,7 +1009,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, EventsWorkerStore, SQLBas
SELECT user_id, room_id, count(*) as cnt,
max(stream_ordering) as stream_ordering
FROM event_push_actions
- WHERE ? <= stream_ordering AND stream_ordering < ?
+ WHERE ? < stream_ordering AND stream_ordering <= ?
AND %s = 1
GROUP BY user_id, room_id
) AS upd
|