1 files changed, 7 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index f432d578b5..bb6e104d71 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -982,7 +982,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
stream_row = txn.fetchone()
if stream_row:
(offset_stream_ordering,) = stream_row
- rotate_to_stream_ordering = offset_stream_ordering
+
+ # We need to bound by the current token to ensure that we handle
+ # out-of-order writes correctly.
+ rotate_to_stream_ordering = min(
+ offset_stream_ordering, self._stream_id_gen.get_current_token()
+ )
caught_up = False
else:
rotate_to_stream_ordering = self._stream_id_gen.get_current_token()
@@ -1014,7 +1019,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
SELECT user_id, room_id, count(*) as cnt,
max(stream_ordering) as stream_ordering
FROM event_push_actions
- WHERE ? <= stream_ordering AND stream_ordering < ?
+ WHERE ? < stream_ordering AND stream_ordering <= ?
AND %s = 1
GROUP BY user_id, room_id
) AS upd
|