summary refs log tree commit diff
path: root/synapse/streams
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-07-02 14:06:36 +0100
committerGitHub <noreply@github.com>2024-07-02 14:06:36 +0100
commitb905ae27caac4bb27262d9d7ac6e834de5694f10 (patch)
tree819a4517b51e842bbc447d84b64e3b5f24344bc3 /synapse/streams
parentFix sync waiting for an invalid token from the "future" (#17386) (diff)
downloadsynapse-b905ae27caac4bb27262d9d7ac6e834de5694f10.tar.xz
Fix regression when bounding future tokens (#17391)
Fix bug added in #17386, where we accidentally used `room_key` for the
receipts stream. See first commit.

Reviewable commit-by-commit
Diffstat (limited to 'synapse/streams')
-rw-r--r--synapse/streams/events.py26
1 files changed, 22 insertions, 4 deletions
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 93d5ae1a55..856f646795 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -19,6 +19,7 @@
 #
 #
 
+import logging
 from typing import TYPE_CHECKING, Sequence, Tuple
 
 import attr
@@ -41,6 +42,9 @@ if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
+logger = logging.getLogger(__name__)
+
+
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class _EventSourcesInner:
     room: RoomEventSource
@@ -139,9 +143,16 @@ class EventSources:
                         key
                     ].get_max_allocated_token()
 
-                    token = token.copy_and_replace(
-                        key, token.room_key.bound_stream_token(max_token)
-                    )
+                    if max_token < token_value.get_max_stream_pos():
+                        logger.error(
+                            "Bounding token from the future '%s': token: %s, bound: %s",
+                            key,
+                            token_value,
+                            max_token,
+                        )
+                        token = token.copy_and_replace(
+                            key, token_value.bound_stream_token(max_token)
+                        )
             else:
                 assert isinstance(current_value, int)
                 if current_value < token_value:
@@ -149,7 +160,14 @@ class EventSources:
                         key
                     ].get_max_allocated_token()
 
-                    token = token.copy_and_replace(key, min(token_value, max_token))
+                    if max_token < token_value:
+                        logger.error(
+                            "Bounding token from the future '%s': token: %s, bound: %s",
+                            key,
+                            token_value,
+                            max_token,
+                        )
+                        token = token.copy_and_replace(key, max_token)
 
         return token