diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 93d5ae1a55..856f646795 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -19,6 +19,7 @@
#
#
+import logging
from typing import TYPE_CHECKING, Sequence, Tuple
import attr
@@ -41,6 +42,9 @@ if TYPE_CHECKING:
from synapse.server import HomeServer
+logger = logging.getLogger(__name__)
+
+
@attr.s(frozen=True, slots=True, auto_attribs=True)
class _EventSourcesInner:
room: RoomEventSource
@@ -139,9 +143,16 @@ class EventSources:
key
].get_max_allocated_token()
- token = token.copy_and_replace(
- key, token.room_key.bound_stream_token(max_token)
- )
+ if max_token < token_value.get_max_stream_pos():
+ logger.error(
+ "Bounding token from the future '%s': token: %s, bound: %s",
+ key,
+ token_value,
+ max_token,
+ )
+ token = token.copy_and_replace(
+ key, token_value.bound_stream_token(max_token)
+ )
else:
assert isinstance(current_value, int)
if current_value < token_value:
@@ -149,7 +160,14 @@ class EventSources:
key
].get_max_allocated_token()
- token = token.copy_and_replace(key, min(token_value, max_token))
+ if max_token < token_value:
+ logger.error(
+ "Bounding token from the future '%s': token: %s, bound: %s",
+ key,
+ token_value,
+ max_token,
+ )
+ token = token.copy_and_replace(key, max_token)
return token
|