diff --git a/changelog.d/17529.misc b/changelog.d/17529.misc
new file mode 100644
index 0000000000..37b2ee07a4
--- /dev/null
+++ b/changelog.d/17529.misc
@@ -0,0 +1 @@
+Reset the sliding sync connection if we don't recognize the per-connection state position.
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index dd4a1ae706..99fc7eab54 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -128,6 +128,10 @@ class Codes(str, Enum):
# MSC2677
DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION"
+ # MSC3575 we are telling the client they need to reset their sliding sync
+ # connection.
+ UNKNOWN_POS = "M_UNKNOWN_POS"
+
class CodeMessageException(RuntimeError):
"""An exception with integer code, a message string attributes and optional headers.
@@ -847,3 +851,17 @@ class PartialStateConflictError(SynapseError):
msg=PartialStateConflictError.message(),
errcode=Codes.UNKNOWN,
)
+
+
+class SlidingSyncUnknownPosition(SynapseError):
+ """An error that Synapse can return to signal to the client to expire their
+ sliding sync connection (i.e. send a new request without a `?since=`
+ param).
+ """
+
+ def __init__(self) -> None:
+ super().__init__(
+ HTTPStatus.BAD_REQUEST,
+ msg="Unknown position",
+ errcode=Codes.UNKNOWN_POS,
+ )
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8467766518..1936471345 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -47,6 +47,7 @@ from synapse.api.constants import (
EventTypes,
Membership,
)
+from synapse.api.errors import SlidingSyncUnknownPosition
from synapse.events import EventBase, StrippedStateEvent
from synapse.events.utils import parse_stripped_state_event, strip_event
from synapse.handlers.relations import BundledAggregations
@@ -491,6 +492,22 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
+ if from_token:
+ # Check that we recognize the connection position, if not tell the
+ # clients that they need to start again.
+ #
+ # If we don't do this and the client asks for the full range of
+ # rooms, we end up sending down all rooms and their state from
+ # scratch (which can be very slow). By expiring the connection we
+ # allow the client a chance to do an initial request with a smaller
+ # range of rooms to get them some results sooner but will end up
+ # taking the same amount of time (more with round-trips and
+ # re-processing) in the end to get everything again.
+ if not await self.connection_store.is_valid_token(
+ sync_config, from_token.connection_position
+ ):
+ raise SlidingSyncUnknownPosition()
+
await self.connection_store.mark_token_seen(
sync_config=sync_config,
from_token=from_token,
@@ -2821,6 +2838,16 @@ class SlidingSyncConnectionStore:
attr.Factory(dict)
)
+ async def is_valid_token(
+ self, sync_config: SlidingSyncConfig, connection_token: int
+ ) -> bool:
+ """Return whether the connection token is valid/recognized"""
+ if connection_token == 0:
+ return True
+
+ conn_key = self._get_connection_key(sync_config)
+ return connection_token in self._connections.get(conn_key, {})
+
async def have_sent_room(
self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
) -> HaveSentRoom:
diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py
index 03e36914ae..a13cad223f 100644
--- a/tests/rest/client/sliding_sync/test_rooms_required_state.py
+++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py
@@ -161,10 +161,10 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
- def test_rooms_required_state_incremental_sync_restart(self) -> None:
+ def test_rooms_incremental_sync_restart(self) -> None:
"""
- Test `rooms.required_state` returns requested state events in the room during an
- incremental sync, after a restart (and so the in memory caches are reset).
+ Test that after a restart (and so the in memory caches are reset) that
+ we correctly return an `M_UNKNOWN_POS`
"""
user1_id = self.register_user("user1", "pass")
@@ -195,22 +195,16 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
self.hs.get_sliding_sync_handler().connection_store._connections.clear()
# Make the Sliding Sync request
- response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
-
- # If the cache has been cleared then we do expect the state to come down
- state_map = self.get_success(
- self.storage_controllers.state.get_current_state(room_id1)
+ channel = self.make_request(
+ method="POST",
+ path=self.sync_endpoint + f"?pos={from_token}",
+ content=sync_body,
+ access_token=user1_tok,
)
-
- self._assertRequiredStateIncludes(
- response_body["rooms"][room_id1]["required_state"],
- {
- state_map[(EventTypes.Create, "")],
- state_map[(EventTypes.RoomHistoryVisibility, "")],
- },
- exact=True,
+ self.assertEqual(channel.code, 400, channel.json_body)
+ self.assertEqual(
+ channel.json_body["errcode"], "M_UNKNOWN_POS", channel.json_body
)
- self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
def test_rooms_required_state_wildcard(self) -> None:
"""
|