summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-17 11:20:23 +0100
committerErik Johnston <erik@matrix.org>2024-07-17 12:14:28 +0100
commit30263b43c29620244e277f613b9bdcff9c7ed00b (patch)
tree7c75443302d83cd5bb8d574930c6714353850196
parentBump mypy from 1.9.0 to 1.10.1 (#17445) (diff)
downloadsynapse-30263b43c29620244e277f613b9bdcff9c7ed00b.tar.xz
Add SlidingSyncStreamToken
-rw-r--r--synapse/types/__init__.py37
1 files changed, 37 insertions, 0 deletions
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 3962ecc996..23ac1842f8 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -1138,6 +1138,43 @@ StreamToken.START = StreamToken(
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
+class SlidingSyncStreamToken:
+    """The same as a `StreamToken`, but includes an extra field at the start for
+    the sliding sync connection token (separated by a '/'). This is used to
+    store per-connection state.
+
+    This then looks something like:
+        5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
+    """
+
+    stream_token: StreamToken
+    connection_token: int
+
+    @staticmethod
+    @cancellable
+    async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken":
+        """Creates a SlidingSyncStreamToken from its textual representation."""
+        try:
+            connection_token_str, stream_token_str = string.split("/", 1)
+            connection_token = int(connection_token_str)
+            stream_token = await StreamToken.from_string(store, stream_token_str)
+
+            return SlidingSyncStreamToken(
+                stream_token=stream_token,
+                connection_token=connection_token,
+            )
+        except CancelledError:
+            raise
+        except Exception:
+            raise SynapseError(400, "Invalid stream token")
+
+    async def to_string(self, store: "DataStore") -> str:
+        """Serializes the token to a string"""
+        stream_token_str = await self.stream_token.to_string(store)
+        return f"{self.connection_token}/{stream_token_str}"
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class PersistedPosition:
     """Position of a newly persisted row with instance that persisted it."""