diff --git a/changelog.d/17389.misc b/changelog.d/17389.misc
new file mode 100644
index 0000000000..7022ed93d9
--- /dev/null
+++ b/changelog.d/17389.misc
@@ -0,0 +1 @@
+Fix building debian package for debian sid.
diff --git a/changelog.d/17391.bugfix b/changelog.d/17391.bugfix
new file mode 100644
index 0000000000..9686b5c276
--- /dev/null
+++ b/changelog.d/17391.bugfix
@@ -0,0 +1 @@
+Fix bug where `/sync` requests could get blocked indefinitely after an upgrade from Synapse versions before v1.109.0.
diff --git a/docker/Dockerfile-dhvirtualenv b/docker/Dockerfile-dhvirtualenv
index b7679924c2..f000144567 100644
--- a/docker/Dockerfile-dhvirtualenv
+++ b/docker/Dockerfile-dhvirtualenv
@@ -73,6 +73,8 @@ RUN apt-get update -qq -o Acquire::Languages=none \
curl \
debhelper \
devscripts \
+ # Required for building cffi from source.
+ libffi-dev \
libsystemd-dev \
lsb-release \
pkg-config \
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 93d5ae1a55..856f646795 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -19,6 +19,7 @@
#
#
+import logging
from typing import TYPE_CHECKING, Sequence, Tuple
import attr
@@ -41,6 +42,9 @@ if TYPE_CHECKING:
from synapse.server import HomeServer
+logger = logging.getLogger(__name__)
+
+
@attr.s(frozen=True, slots=True, auto_attribs=True)
class _EventSourcesInner:
room: RoomEventSource
@@ -139,9 +143,16 @@ class EventSources:
key
].get_max_allocated_token()
- token = token.copy_and_replace(
- key, token.room_key.bound_stream_token(max_token)
- )
+ if max_token < token_value.get_max_stream_pos():
+ logger.error(
+ "Bounding token from the future '%s': token: %s, bound: %s",
+ key,
+ token_value,
+ max_token,
+ )
+ token = token.copy_and_replace(
+ key, token_value.bound_stream_token(max_token)
+ )
else:
assert isinstance(current_value, int)
if current_value < token_value:
@@ -149,7 +160,14 @@ class EventSources:
key
].get_max_allocated_token()
- token = token.copy_and_replace(key, min(token_value, max_token))
+ if max_token < token_value:
+ logger.error(
+ "Bounding token from the future '%s': token: %s, bound: %s",
+ key,
+ token_value,
+ max_token,
+ )
+ token = token.copy_and_replace(key, max_token)
return token
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 5319928c28..674dd4fb54 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -36,7 +36,14 @@ from synapse.handlers.sync import SyncConfig, SyncRequestKey, SyncResult, SyncVe
from synapse.rest import admin
from synapse.rest.client import knock, login, room
from synapse.server import HomeServer
-from synapse.types import JsonDict, StreamKeyType, UserID, create_requester
+from synapse.types import (
+ JsonDict,
+ MultiWriterStreamToken,
+ RoomStreamToken,
+ StreamKeyType,
+ UserID,
+ create_requester,
+)
from synapse.util import Clock
import tests.unittest
@@ -999,7 +1006,13 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
self.get_success(sync_d, by=1.0)
- def test_wait_for_invalid_future_sync_token(self) -> None:
+ @parameterized.expand(
+ [(key,) for key in StreamKeyType.__members__.values()],
+ name_func=lambda func, _, param: f"{func.__name__}_{param.args[0].name}",
+ )
+ def test_wait_for_invalid_future_sync_token(
+ self, stream_key: StreamKeyType
+ ) -> None:
"""Like the previous test, except we give a token that has a stream
position ahead of what is in the DB, i.e. its invalid and we shouldn't
wait for the stream to advance (as it may never do so).
@@ -1010,11 +1023,23 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
"""
user = self.register_user("alice", "password")
- # Create a token and arbitrarily advance one of the streams.
+ # Create a token and advance one of the streams.
current_token = self.hs.get_event_sources().get_current_token()
- since_token = current_token.copy_and_advance(
- StreamKeyType.PRESENCE, current_token.presence_key + 1
- )
+ token_value = current_token.get_field(stream_key)
+
+ # How we advance the streams depends on the type.
+ if isinstance(token_value, int):
+ since_token = current_token.copy_and_advance(stream_key, token_value + 1)
+ elif isinstance(token_value, MultiWriterStreamToken):
+ since_token = current_token.copy_and_advance(
+ stream_key, MultiWriterStreamToken(stream=token_value.stream + 1)
+ )
+ elif isinstance(token_value, RoomStreamToken):
+ since_token = current_token.copy_and_advance(
+ stream_key, RoomStreamToken(stream=token_value.stream + 1)
+ )
+ else:
+ raise Exception("Unreachable")
sync_d = defer.ensureDeferred(
self.sync_handler.wait_for_sync_for_user(
|