summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/17529.misc1
-rw-r--r--synapse/api/errors.py18
-rw-r--r--synapse/handlers/sliding_sync.py27
-rw-r--r--tests/rest/client/sliding_sync/test_rooms_required_state.py28
4 files changed, 57 insertions, 17 deletions
diff --git a/changelog.d/17529.misc b/changelog.d/17529.misc
new file mode 100644
index 0000000000..37b2ee07a4
--- /dev/null
+++ b/changelog.d/17529.misc
@@ -0,0 +1 @@
+Reset the sliding sync connection if we don't recognize the per-connection state position.
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index dd4a1ae706..99fc7eab54 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -128,6 +128,10 @@ class Codes(str, Enum):
     # MSC2677
     DUPLICATE_ANNOTATION = "M_DUPLICATE_ANNOTATION"
 
+    # MSC3575 we are telling the client they need to reset their sliding sync
+    # connection.
+    UNKNOWN_POS = "M_UNKNOWN_POS"
+
 
 class CodeMessageException(RuntimeError):
     """An exception with integer code, a message string attributes and optional headers.
@@ -847,3 +851,17 @@ class PartialStateConflictError(SynapseError):
             msg=PartialStateConflictError.message(),
             errcode=Codes.UNKNOWN,
         )
+
+
+class SlidingSyncUnknownPosition(SynapseError):
+    """An error that Synapse can return to signal to the client to expire their
+    sliding sync connection (i.e. send a new request without a `?since=`
+    param).
+    """
+
+    def __init__(self) -> None:
+        super().__init__(
+            HTTPStatus.BAD_REQUEST,
+            msg="Unknown position",
+            errcode=Codes.UNKNOWN_POS,
+        )
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8467766518..1936471345 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -47,6 +47,7 @@ from synapse.api.constants import (
     EventTypes,
     Membership,
 )
+from synapse.api.errors import SlidingSyncUnknownPosition
 from synapse.events import EventBase, StrippedStateEvent
 from synapse.events.utils import parse_stripped_state_event, strip_event
 from synapse.handlers.relations import BundledAggregations
@@ -491,6 +492,22 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        if from_token:
+            # Check that we recognize the connection position, if not tell the
+            # clients that they need to start again.
+            #
+            # If we don't do this and the client asks for the full range of
+            # rooms, we end up sending down all rooms and their state from
+            # scratch (which can be very slow). By expiring the connection we
+            # allow the client a chance to do an initial request with a smaller
+            # range of rooms to get them some results sooner but will end up
+            # taking the same amount of time (more with round-trips and
+            # re-processing) in the end to get everything again.
+            if not await self.connection_store.is_valid_token(
+                sync_config, from_token.connection_position
+            ):
+                raise SlidingSyncUnknownPosition()
+
         await self.connection_store.mark_token_seen(
             sync_config=sync_config,
             from_token=from_token,
@@ -2821,6 +2838,16 @@ class SlidingSyncConnectionStore:
         attr.Factory(dict)
     )
 
+    async def is_valid_token(
+        self, sync_config: SlidingSyncConfig, connection_token: int
+    ) -> bool:
+        """Return whether the connection token is valid/recognized"""
+        if connection_token == 0:
+            return True
+
+        conn_key = self._get_connection_key(sync_config)
+        return connection_token in self._connections.get(conn_key, {})
+
     async def have_sent_room(
         self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
     ) -> HaveSentRoom:
diff --git a/tests/rest/client/sliding_sync/test_rooms_required_state.py b/tests/rest/client/sliding_sync/test_rooms_required_state.py
index 03e36914ae..a13cad223f 100644
--- a/tests/rest/client/sliding_sync/test_rooms_required_state.py
+++ b/tests/rest/client/sliding_sync/test_rooms_required_state.py
@@ -161,10 +161,10 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
         self.assertIsNone(response_body["rooms"][room_id1].get("required_state"))
         self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
-    def test_rooms_required_state_incremental_sync_restart(self) -> None:
+    def test_rooms_incremental_sync_restart(self) -> None:
         """
-        Test `rooms.required_state` returns requested state events in the room during an
-        incremental sync, after a restart (and so the in memory caches are reset).
+        Test that after a restart (and so the in memory caches are reset) that
+        we correctly return an `M_UNKNOWN_POS`
         """
 
         user1_id = self.register_user("user1", "pass")
@@ -195,22 +195,16 @@ class SlidingSyncRoomsRequiredStateTestCase(SlidingSyncBase):
         self.hs.get_sliding_sync_handler().connection_store._connections.clear()
 
         # Make the Sliding Sync request
-        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
-
-        # If the cache has been cleared then we do expect the state to come down
-        state_map = self.get_success(
-            self.storage_controllers.state.get_current_state(room_id1)
+        channel = self.make_request(
+            method="POST",
+            path=self.sync_endpoint + f"?pos={from_token}",
+            content=sync_body,
+            access_token=user1_tok,
         )
-
-        self._assertRequiredStateIncludes(
-            response_body["rooms"][room_id1]["required_state"],
-            {
-                state_map[(EventTypes.Create, "")],
-                state_map[(EventTypes.RoomHistoryVisibility, "")],
-            },
-            exact=True,
+        self.assertEqual(channel.code, 400, channel.json_body)
+        self.assertEqual(
+            channel.json_body["errcode"], "M_UNKNOWN_POS", channel.json_body
         )
-        self.assertIsNone(response_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_wildcard(self) -> None:
         """