summary refs log tree commit diff
path: root/synapse/streams/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/streams/events.py')
-rw-r--r--synapse/streams/events.py26
1 files changed, 22 insertions, 4 deletions
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 93d5ae1a55..856f646795 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -19,6 +19,7 @@
 #
 #
 
+import logging
 from typing import TYPE_CHECKING, Sequence, Tuple
 
 import attr
@@ -41,6 +42,9 @@ if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
+logger = logging.getLogger(__name__)
+
+
 @attr.s(frozen=True, slots=True, auto_attribs=True)
 class _EventSourcesInner:
     room: RoomEventSource
@@ -139,9 +143,16 @@ class EventSources:
                         key
                     ].get_max_allocated_token()
 
-                    token = token.copy_and_replace(
-                        key, token.room_key.bound_stream_token(max_token)
-                    )
+                    if max_token < token_value.get_max_stream_pos():
+                        logger.error(
+                            "Bounding token from the future '%s': token: %s, bound: %s",
+                            key,
+                            token_value,
+                            max_token,
+                        )
+                        token = token.copy_and_replace(
+                            key, token_value.bound_stream_token(max_token)
+                        )
             else:
                 assert isinstance(current_value, int)
                 if current_value < token_value:
@@ -149,7 +160,14 @@ class EventSources:
                         key
                     ].get_max_allocated_token()
 
-                    token = token.copy_and_replace(key, min(token_value, max_token))
+                    if max_token < token_value:
+                        logger.error(
+                            "Bounding token from the future '%s': token: %s, bound: %s",
+                            key,
+                            token_value,
+                            max_token,
+                        )
+                        token = token.copy_and_replace(key, max_token)
 
         return token