summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/event_push_actions.py9
1 files changed, 7 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index f432d578b5..bb6e104d71 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -982,7 +982,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         stream_row = txn.fetchone()
         if stream_row:
             (offset_stream_ordering,) = stream_row
-            rotate_to_stream_ordering = offset_stream_ordering
+
+            # We need to bound by the current token to ensure that we handle
+            # out-of-order writes correctly.
+            rotate_to_stream_ordering = min(
+                offset_stream_ordering, self._stream_id_gen.get_current_token()
+            )
             caught_up = False
         else:
             rotate_to_stream_ordering = self._stream_id_gen.get_current_token()
@@ -1014,7 +1019,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 SELECT user_id, room_id, count(*) as cnt,
                     max(stream_ordering) as stream_ordering
                 FROM event_push_actions
-                WHERE ? <= stream_ordering AND stream_ordering < ?
+                WHERE ? < stream_ordering AND stream_ordering <= ?
                     AND %s = 1
                 GROUP BY user_id, room_id
             ) AS upd