diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 3aa139ff1c..1c2ed95660 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,6 +18,7 @@
#
#
import logging
+from enum import Enum
from itertools import chain
from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple
@@ -38,6 +39,7 @@ from synapse.types import (
RoomStreamToken,
SlidingSyncStreamToken,
StateMap,
+ StrCollection,
StreamKeyType,
StreamToken,
UserID,
@@ -1861,3 +1863,198 @@ class SlidingSyncHandler:
next_batch=f"{stream_id}",
events=messages,
)
+
+
+class HaveSentRoomFlag(Enum):
+ """Flag for whether we have sent the room down a sliding sync connection.
+
+ The valid state changes here are:
+ NEVER -> LIVE
+ LIVE -> PREVIOUSLY
+ PREVIOUSLY -> LIVE
+ """
+
+ # The room has never been sent down (or we have forgotten we have sent it
+ # down).
+ NEVER = 1
+
+ # We have previously sent the room down, but there are updates that we
+ # haven't sent down.
+ PREVIOUSLY = 2
+
+ # We have sent the room down and the client has received all updates.
+ LIVE = 3
+
+
+@attr.s(auto_attribs=True, slots=True, frozen=True)
+class HaveSentRoom:
+ """Whether we have sent the room down a sliding sync connection.
+
+ Attributes:
+ status: Flag of if we have or haven't sent down the room
+ last_token: If the flag is `PREVIOUSLY` then this is non-null and
+ contains the last stream token of the last updates we sent down
+ the room, i.e. we still need to send everything since then to the
+ client.
+ """
+
+ status: HaveSentRoomFlag
+ last_token: Optional[RoomStreamToken]
+
+ @staticmethod
+ def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+ """Constructor for `PREVIOUSLY` flag."""
+ return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+
+
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
+HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+
+
+@attr.s(auto_attribs=True)
+class SlidingSyncConnectionStore:
+ """In-memory store of per-connection state, including what rooms we have
+ previously sent down a sliding sync connection.
+
+ Note: This is NOT safe to run in a worker setup.
+
+ The complication here is that we need to handle requests being resent, i.e.
+ if we sent down a room in a response that the client received, we must
+ consider the room *not* sent when we get the request again.
+
+ This is handled by using an integer "token", which is returned to the client
+ as part of the sync token. For each connection we store a mapping from
+ tokens to the room states, and create a new entry when we send down new
+ rooms.
+
+ Note that for any given sliding sync connection we will only store a maximum
+ of two different tokens: the previous token from the request and a new token
+ sent in the response. When we receive a request with a given token, we then
+ clear out all other entries with a different token.
+
+ Attributes:
+ _connections: Mapping from `(user_id, conn_id)` to mapping of `token`
+ to mapping of room ID to `HaveSentRoom`.
+ """
+
+ # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
+ _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
+ attr.Factory(dict)
+ )
+
+ async def have_sent_room(
+ self, user_id: str, conn_id: str, connection_token: int, room_id: str
+ ) -> HaveSentRoom:
+ """Whether for the given user_id/conn_id/token, return whether we have
+ previously sent the room down
+ """
+
+ sync_statuses = self._connections.setdefault((user_id, conn_id), {})
+ room_status = sync_statuses.get(connection_token, {}).get(
+ room_id, HAVE_SENT_ROOM_NEVER
+ )
+
+ return room_status
+
+ async def record_rooms(
+ self,
+ user_id: str,
+ conn_id: str,
+ from_token: Optional[SlidingSyncStreamToken],
+ *,
+ sent_room_ids: StrCollection,
+ unsent_room_ids: StrCollection,
+ ) -> int:
+ """Record which rooms we have/haven't sent down in a new response
+
+ Attributes:
+ user_id
+ conn_id
+ from_token: The since token from the request, if any
+ sent_room_ids: The set of room IDs that we have sent down as
+ part of this request (only needs to be ones we didn't
+ previously sent down).
+ unsent_room_ids: The set of room IDs that have had updates
+ since the `last_room_token`, but which were not included in
+ this request
+ """
+ prev_connection_token = 0
+ if from_token is not None:
+ prev_connection_token = from_token.connection_token
+
+ # If there are no changes then this is a noop.
+ if not sent_room_ids and not unsent_room_ids:
+ return prev_connection_token
+
+ sync_statuses = self._connections.setdefault((user_id, conn_id), {})
+
+ # Generate a new token, removing any existing entries in that token
+ # (which can happen if requests get resent).
+ new_store_token = prev_connection_token + 1
+ sync_statuses.pop(new_store_token, None)
+
+ # Copy over and update the room mappings.
+ new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
+
+ # Whether we have updated the `new_room_statuses`, if we don't by the
+ # end we can treat this as a noop.
+ have_updated = False
+ for room_id in sent_room_ids:
+ new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
+ have_updated = True
+
+ # Whether we add/update the entries for unsent rooms depends on the
+ # existing entry:
+ # - LIVE: We have previously sent down everything up to
+ # `last_room_token, so we update the entry to be `PREVIOUSLY` with
+ # `last_room_token`.
+ # - PREVIOUSLY: We have previously sent down everything up to *a*
+ # given token, so we don't need to update the entry.
+ # - NEVER: We have never previously sent down the room, and we haven't
+ # sent anything down this time either so we leave it as NEVER.
+
+ # Work out the new state for unsent rooms that were `LIVE`.
+ if from_token:
+ new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
+ else:
+ new_unsent_state = HAVE_SENT_ROOM_NEVER
+
+ for room_id in unsent_room_ids:
+ prev_state = new_room_statuses.get(room_id)
+ if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
+ new_room_statuses[room_id] = new_unsent_state
+ have_updated = True
+
+ if not have_updated:
+ return prev_connection_token
+
+ sync_statuses[new_store_token] = new_room_statuses
+
+ return new_store_token
+
+ async def mark_token_seen(
+ self,
+ user_id: str,
+ conn_id: str,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> None:
+ """We have received a request with the given token, so we can clear out
+ any other tokens associated with the connection.
+
+ If there is no from token then we have started afresh, and so we delete
+ all tokens associated with the device.
+ """
+ # Clear out any tokens for the connection that doesn't match the one
+ # from the request.
+
+ sync_statuses = self._connections.pop((user_id, conn_id), {})
+ if from_token is None:
+ return
+
+ sync_statuses = {
+ i: room_statuses
+ for i, room_statuses in sync_statuses.items()
+ if i == from_token.connection_token
+ }
+ if sync_statuses:
+ self._connections[(user_id, conn_id)] = sync_statuses
|