diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py
new file mode 100644
index 0000000000..d24fccf76f
--- /dev/null
+++ b/synapse/handlers/sliding_sync/store.py
@@ -0,0 +1,128 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2023 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+
+import logging
+from typing import TYPE_CHECKING, Optional
+
+import attr
+
+from synapse.logging.opentracing import trace
+from synapse.storage.databases.main import DataStore
+from synapse.types import SlidingSyncStreamToken
+from synapse.types.handlers.sliding_sync import (
+ MutablePerConnectionState,
+ PerConnectionState,
+ SlidingSyncConfig,
+)
+
+if TYPE_CHECKING:
+ pass
+
+logger = logging.getLogger(__name__)
+
+
+@attr.s(auto_attribs=True)
+class SlidingSyncConnectionStore:
+ """In-memory store of per-connection state, including what rooms we have
+ previously sent down a sliding sync connection.
+
+ Note: This is NOT safe to run in a worker setup because connection positions will
+ point to different sets of rooms on different workers. e.g. for the same connection,
+ a connection position of 5 might have totally different states on worker A and
+ worker B.
+
+ One complication that we need to deal with here is needing to handle requests being
+ resent, i.e. if we sent down a room in a response that the client received, we must
+ consider the room *not* sent when we get the request again.
+
+ This is handled by using an integer "token", which is returned to the client
+ as part of the sync token. For each connection we store a mapping from
+ tokens to the room states, and create a new entry when we send down new
+ rooms.
+
+ Note that for any given sliding sync connection we will only store a maximum
+ of two different tokens: the previous token from the request and a new token
+ sent in the response. When we receive a request with a given token, we then
+ clear out all other entries with a different token.
+
+ Attributes:
+ _connections: Mapping from `(user_id, conn_id)` to mapping of `token`
+ to mapping of room ID to `HaveSentRoom`.
+ """
+
+ store: "DataStore"
+
+ async def get_and_clear_connection_positions(
+ self,
+ sync_config: SlidingSyncConfig,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> PerConnectionState:
+ """Fetch the per-connection state for the token.
+
+ Raises:
+ SlidingSyncUnknownPosition if the connection_token is unknown
+ """
+ # If this is our first request, there is no previous connection state to fetch out of the database
+ if from_token is None or from_token.connection_position == 0:
+ return PerConnectionState()
+
+ conn_id = sync_config.conn_id or ""
+
+ device_id = sync_config.requester.device_id
+ assert device_id is not None
+
+ return await self.store.get_and_clear_connection_positions(
+ sync_config.user.to_string(),
+ device_id,
+ conn_id,
+ from_token.connection_position,
+ )
+
+ @trace
+ async def record_new_state(
+ self,
+ sync_config: SlidingSyncConfig,
+ from_token: Optional[SlidingSyncStreamToken],
+ new_connection_state: MutablePerConnectionState,
+ ) -> int:
+ """Record updated per-connection state, returning the connection
+ position associated with the new state.
+ If there are no changes to the state this may return the same token as
+ the existing per-connection state.
+ """
+ if not new_connection_state.has_updates():
+ if from_token is not None:
+ return from_token.connection_position
+ else:
+ return 0
+
+ # A from token with a zero connection position means there was no
+ # previously stored connection state, so we treat a zero the same as
+ # there being no previous position.
+ previous_connection_position = None
+ if from_token is not None and from_token.connection_position != 0:
+ previous_connection_position = from_token.connection_position
+
+ conn_id = sync_config.conn_id or ""
+
+ device_id = sync_config.requester.device_id
+ assert device_id is not None
+
+ return await self.store.persist_per_connection_state(
+ sync_config.user.to_string(),
+ device_id,
+ conn_id,
+ previous_connection_position,
+ new_connection_state,
+ )
|