summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-07-29 22:45:48 +0100
committerGitHub <noreply@github.com>2024-07-29 22:45:48 +0100
commitbe4a16ff445c9dfba04aeaed695afb3a56e204f7 (patch)
tree62de0ad9059e727a072ea06743afbf21765991ae /synapse/handlers/sliding_sync.py
parentRefactor Sliding Sync tests to better utilize the `SlidingSyncBase.do_sync(..... (diff)
downloadsynapse-be4a16ff445c9dfba04aeaed695afb3a56e204f7.tar.xz
Sliding Sync: Track whether we have sent rooms down to clients (#17447)
The basic idea is that we introduce a new token for a sliding sync
connection, which stores the mapping of room to room "status" (i.e. have
we sent the room down?). This token allows us to handle duplicate
requests properly. In future it can be used to store more
"per-connection" information safely.

In future this should be migrated into the DB, so its important that we
try to reduce the number of syncs where we need to update the
per-connection information. In this PoC this only happens when we: a)
send down a set of room for the first time, or b) we have previously
sent down a room and there are updates but we are not sending the room
down the sync (due to not falling in a list range)

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
Diffstat (limited to 'synapse/handlers/sliding_sync.py')
-rw-r--r--synapse/handlers/sliding_sync.py350
1 files changed, 314 insertions, 36 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 3231574402..2b74f1c9c9 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,6 +18,7 @@
 #
 #
 import logging
+from enum import Enum
 from itertools import chain
 from typing import (
     TYPE_CHECKING,
@@ -34,6 +35,7 @@ from typing import (
 
 import attr
 from immutabledict import immutabledict
+from typing_extensions import assert_never
 
 from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
 from synapse.events import EventBase
@@ -52,6 +54,7 @@ from synapse.types import (
     RoomStreamToken,
     SlidingSyncStreamToken,
     StateMap,
+    StrCollection,
     StreamKeyType,
     StreamToken,
     UserID,
@@ -361,6 +364,8 @@ class SlidingSyncHandler:
         self.push_rules_handler = hs.get_push_rules_handler()
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
+        self.connection_store = SlidingSyncConnectionStore()
+
     async def wait_for_sync_for_user(
         self,
         requester: Requester,
@@ -464,6 +469,11 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        await self.connection_store.mark_token_seen(
+            sync_config=sync_config,
+            from_token=from_token,
+        )
+
         # Get all of the room IDs that the user should be able to see in the sync
         # response
         has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@@ -613,7 +623,7 @@ class SlidingSyncHandler:
         @tag_args
         async def handle_room(room_id: str) -> None:
             room_sync_result = await self.get_room_sync_data(
-                user=sync_config.user,
+                sync_config=sync_config,
                 room_id=room_id,
                 room_sync_config=relevant_room_map[room_id],
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -635,11 +645,22 @@ class SlidingSyncHandler:
             to_token=to_token,
         )
 
-        # TODO: Update this when we implement per-connection state
-        connection_token = 0
+        if has_lists or has_room_subscriptions:
+            connection_position = await self.connection_store.record_rooms(
+                sync_config=sync_config,
+                from_token=from_token,
+                sent_room_ids=relevant_room_map.keys(),
+                # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
+                unsent_room_ids=[],
+            )
+        elif from_token:
+            connection_position = from_token.connection_position
+        else:
+            # Initial sync without a `from_token` starts at `0`
+            connection_position = 0
 
         return SlidingSyncResult(
-            next_pos=SlidingSyncStreamToken(to_token, connection_token),
+            next_pos=SlidingSyncStreamToken(to_token, connection_position),
             lists=lists,
             rooms=rooms,
             extensions=extensions,
@@ -1370,7 +1391,7 @@ class SlidingSyncHandler:
 
     async def get_room_sync_data(
         self,
-        user: UserID,
+        sync_config: SlidingSyncConfig,
         room_id: str,
         room_sync_config: RoomSyncConfig,
         room_membership_for_user_at_to_token: _RoomMembershipForUser,
@@ -1392,6 +1413,37 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
             to_token: The point in the stream to sync up to.
         """
+        user = sync_config.user
+
+        # Determine whether we should limit the timeline to the token range.
+        #
+        # We should return historical messages (before token range) in the
+        # following cases because we want clients to be able to show a basic
+        # screen of information:
+        #  - Initial sync (because no `from_token` to limit us anyway)
+        #  - When users `newly_joined`
+        #  - For an incremental sync where we haven't sent it down this
+        #    connection before
+        from_bound = None
+        initial = True
+        if from_token and not room_membership_for_user_at_to_token.newly_joined:
+            room_status = await self.connection_store.have_sent_room(
+                sync_config=sync_config,
+                connection_token=from_token.connection_position,
+                room_id=room_id,
+            )
+            if room_status.status == HaveSentRoomFlag.LIVE:
+                from_bound = from_token.stream_token.room_key
+                initial = False
+            elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
+                assert room_status.last_token is not None
+                from_bound = room_status.last_token
+                initial = False
+            elif room_status.status == HaveSentRoomFlag.NEVER:
+                from_bound = None
+                initial = True
+            else:
+                assert_never(room_status.status)
 
         # Assemble the list of timeline events
         #
@@ -1418,36 +1470,23 @@ class SlidingSyncHandler:
             prev_batch_token = to_token
 
             # We're going to paginate backwards from the `to_token`
-            from_bound = to_token.room_key
+            to_bound = to_token.room_key
             # People shouldn't see past their leave/ban event
             if room_membership_for_user_at_to_token.membership in (
                 Membership.LEAVE,
                 Membership.BAN,
             ):
-                from_bound = (
+                to_bound = (
                     room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
                 )
 
-            # Determine whether we should limit the timeline to the token range.
-            #
-            # We should return historical messages (before token range) in the
-            # following cases because we want clients to be able to show a basic
-            # screen of information:
-            #  - Initial sync (because no `from_token` to limit us anyway)
-            #  - When users `newly_joined`
-            #  - TODO: For an incremental sync where we haven't sent it down this
-            #    connection before
-            to_bound = (
-                from_token.stream_token.room_key
-                if from_token is not None
-                and not room_membership_for_user_at_to_token.newly_joined
-                else None
-            )
-
             timeline_events, new_room_key = await self.store.paginate_room_events(
                 room_id=room_id,
-                from_key=from_bound,
-                to_key=to_bound,
+                # The bounds are reversed so we can paginate backwards
+                # (from newer to older events) starting at to_bound.
+                # This ensures we fill the `limit` with the newest events first,
+                from_key=to_bound,
+                to_key=from_bound,
                 direction=Direction.BACKWARDS,
                 # We add one so we can determine if there are enough events to saturate
                 # the limit or not (see `limited`)
@@ -1564,12 +1603,6 @@ class SlidingSyncHandler:
         # indicate to the client that a state reset happened. Perhaps we should indicate
         # this by setting `initial: True` and empty `required_state`.
 
-        # TODO: Since we can't determine whether we've already sent a room down this
-        # Sliding Sync connection before (we plan to add this optimization in the
-        # future), we're always returning the requested room state instead of
-        # updates.
-        initial = True
-
         # Check whether the room has a name set
         name_state_ids = await self.get_current_state_ids_at(
             room_id=room_id,
@@ -1715,9 +1748,22 @@ class SlidingSyncHandler:
                 to_token=to_token,
             )
         else:
-            # TODO: Once we can figure out if we've sent a room down this connection before,
-            # we can return updates instead of the full required state.
-            raise NotImplementedError()
+            assert from_bound is not None
+
+            # TODO: Limit the number of state events we're about to send down
+            # the room, if its too many we should change this to an
+            # `initial=True`?
+            deltas = await self.store.get_current_state_deltas_for_room(
+                room_id=room_id,
+                from_token=from_bound,
+                to_token=to_token.room_key,
+            )
+            # TODO: Filter room state before fetching events
+            # TODO: Handle state resets where event_id is None
+            events = await self.store.get_events(
+                [d.event_id for d in deltas if d.event_id]
+            )
+            room_state = {(s.type, s.state_key): s for s in events.values()}
 
         required_room_state: StateMap[EventBase] = {}
         if required_state_filter != StateFilter.none():
@@ -1863,7 +1909,7 @@ class SlidingSyncHandler:
             to_token: The point in the stream to sync up to.
         """
         user_id = sync_config.user.to_string()
-        device_id = sync_config.device_id
+        device_id = sync_config.requester.device_id
 
         # Skip if the extension is not enabled
         if not to_device_request.enabled:
@@ -1939,7 +1985,7 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
         """
         user_id = sync_config.user.to_string()
-        device_id = sync_config.device_id
+        device_id = sync_config.requester.device_id
 
         # Skip if the extension is not enabled
         if not e2ee_request.enabled:
@@ -2094,3 +2140,235 @@ class SlidingSyncHandler:
             global_account_data_map=global_account_data_map,
             account_data_by_room_map=account_data_by_room_map,
         )
+
+
+class HaveSentRoomFlag(Enum):
+    """Flag for whether we have sent the room down a sliding sync connection.
+
+    The valid state changes here are:
+        NEVER -> LIVE
+        LIVE -> PREVIOUSLY
+        PREVIOUSLY -> LIVE
+    """
+
+    # The room has never been sent down (or we have forgotten we have sent it
+    # down).
+    NEVER = 1
+
+    # We have previously sent the room down, but there are updates that we
+    # haven't sent down.
+    PREVIOUSLY = 2
+
+    # We have sent the room down and the client has received all updates.
+    LIVE = 3
+
+
+@attr.s(auto_attribs=True, slots=True, frozen=True)
+class HaveSentRoom:
+    """Whether we have sent the room down a sliding sync connection.
+
+    Attributes:
+        status: Flag of if we have or haven't sent down the room
+        last_token: If the flag is `PREVIOUSLY` then this is non-null and
+            contains the last stream token of the last updates we sent down
+            the room, i.e. we still need to send everything since then to the
+            client.
+    """
+
+    status: HaveSentRoomFlag
+    last_token: Optional[RoomStreamToken]
+
+    @staticmethod
+    def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+        """Constructor for `PREVIOUSLY` flag."""
+        return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+
+
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
+HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+
+
+@attr.s(auto_attribs=True)
+class SlidingSyncConnectionStore:
+    """In-memory store of per-connection state, including what rooms we have
+    previously sent down a sliding sync connection.
+
+    Note: This is NOT safe to run in a worker setup because connection positions will
+    point to different sets of rooms on different workers. e.g. for the same connection,
+    a connection position of 5 might have totally different states on worker A and
+    worker B.
+
+     One complication that we need to deal with here is needing to handle requests being
+    resent, i.e. if we sent down a room in a response that the client received, we must
+    consider the room *not* sent when we get the request again.
+
+    This is handled by using an integer "token", which is returned to the client
+    as part of the sync token. For each connection we store a mapping from
+    tokens to the room states, and create a new entry when we send down new
+    rooms.
+
+    Note that for any given sliding sync connection we will only store a maximum
+    of two different tokens: the previous token from the request and a new token
+    sent in the response. When we receive a request with a given token, we then
+    clear out all other entries with a different token.
+
+    Attributes:
+        _connections: Mapping from `(user_id, conn_id)` to mapping of `token`
+            to mapping of room ID to `HaveSentRoom`.
+    """
+
+    # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
+    _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
+        attr.Factory(dict)
+    )
+
+    async def have_sent_room(
+        self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
+    ) -> HaveSentRoom:
+        """For the given user_id/conn_id/token, return whether we have
+        previously sent the room down
+        """
+
+        conn_key = self._get_connection_key(sync_config)
+        sync_statuses = self._connections.setdefault(conn_key, {})
+        room_status = sync_statuses.get(connection_token, {}).get(
+            room_id, HAVE_SENT_ROOM_NEVER
+        )
+
+        return room_status
+
+    async def record_rooms(
+        self,
+        sync_config: SlidingSyncConfig,
+        from_token: Optional[SlidingSyncStreamToken],
+        *,
+        sent_room_ids: StrCollection,
+        unsent_room_ids: StrCollection,
+    ) -> int:
+        """Record which rooms we have/haven't sent down in a new response
+
+        Attributes:
+            sync_config
+            from_token: The since token from the request, if any
+            sent_room_ids: The set of room IDs that we have sent down as
+                part of this request (only needs to be ones we didn't
+                previously sent down).
+            unsent_room_ids: The set of room IDs that have had updates
+                since the `from_token`, but which were not included in
+                this request
+        """
+        prev_connection_token = 0
+        if from_token is not None:
+            prev_connection_token = from_token.connection_position
+
+        # If there are no changes then this is a noop.
+        if not sent_room_ids and not unsent_room_ids:
+            return prev_connection_token
+
+        conn_key = self._get_connection_key(sync_config)
+        sync_statuses = self._connections.setdefault(conn_key, {})
+
+        # Generate a new token, removing any existing entries in that token
+        # (which can happen if requests get resent).
+        new_store_token = prev_connection_token + 1
+        sync_statuses.pop(new_store_token, None)
+
+        # Copy over and update the room mappings.
+        new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
+
+        # Whether we have updated the `new_room_statuses`, if we don't by the
+        # end we can treat this as a noop.
+        have_updated = False
+        for room_id in sent_room_ids:
+            new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
+            have_updated = True
+
+        # Whether we add/update the entries for unsent rooms depends on the
+        # existing entry:
+        #   - LIVE: We have previously sent down everything up to
+        #     `last_room_token, so we update the entry to be `PREVIOUSLY` with
+        #     `last_room_token`.
+        #   - PREVIOUSLY: We have previously sent down everything up to *a*
+        #     given token, so we don't need to update the entry.
+        #   - NEVER: We have never previously sent down the room, and we haven't
+        #     sent anything down this time either so we leave it as NEVER.
+
+        # Work out the new state for unsent rooms that were `LIVE`.
+        if from_token:
+            new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
+        else:
+            new_unsent_state = HAVE_SENT_ROOM_NEVER
+
+        for room_id in unsent_room_ids:
+            prev_state = new_room_statuses.get(room_id)
+            if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
+                new_room_statuses[room_id] = new_unsent_state
+                have_updated = True
+
+        if not have_updated:
+            return prev_connection_token
+
+        sync_statuses[new_store_token] = new_room_statuses
+
+        return new_store_token
+
+    async def mark_token_seen(
+        self,
+        sync_config: SlidingSyncConfig,
+        from_token: Optional[SlidingSyncStreamToken],
+    ) -> None:
+        """We have received a request with the given token, so we can clear out
+        any other tokens associated with the connection.
+
+        If there is no from token then we have started afresh, and so we delete
+        all tokens associated with the device.
+        """
+        # Clear out any tokens for the connection that doesn't match the one
+        # from the request.
+
+        conn_key = self._get_connection_key(sync_config)
+        sync_statuses = self._connections.pop(conn_key, {})
+        if from_token is None:
+            return
+
+        sync_statuses = {
+            connection_token: room_statuses
+            for connection_token, room_statuses in sync_statuses.items()
+            if connection_token == from_token.connection_position
+        }
+        if sync_statuses:
+            self._connections[conn_key] = sync_statuses
+
+    @staticmethod
+    def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
+        """Return a unique identifier for this connection.
+
+        The first part is simply the user ID.
+
+        The second part is generally a combination of device ID and conn_id.
+        However, both these two are optional (e.g. puppet access tokens don't
+        have device IDs), so this handles those edge cases.
+
+        We use this over the raw `conn_id` to avoid clashes between different
+        clients that use the same `conn_id`. Imagine a user uses a web client
+        that uses `conn_id: main_sync_loop` and an Android client that also has
+        a `conn_id: main_sync_loop`.
+        """
+
+        user_id = sync_config.user.to_string()
+
+        # Only one sliding sync connection is allowed per given conn_id (empty
+        # or not).
+        conn_id = sync_config.conn_id or ""
+
+        if sync_config.requester.device_id:
+            return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
+
+        if sync_config.requester.access_token_id:
+            # If we don't have a device, then the access token ID should be a
+            # stable ID.
+            return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
+
+        # If we have neither then its likely an AS or some weird token. Either
+        # way we can just fail here.
+        raise Exception("Cannot use sliding sync with access token type")