summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-16 12:13:40 +0100
committerErik Johnston <erik@matrix.org>2024-07-17 12:14:28 +0100
commit1ad1cce3f2137f6be2c8d48121de27ea194fef4f (patch)
treeeb7037b5e070bd80c8ab3fa13b98cf62db8944ad /synapse
parentAdd SlidingSyncStreamToken (diff)
downloadsynapse-1ad1cce3f2137f6be2c8d48121de27ea194fef4f.tar.xz
Pass throught SlidingSyncStreamToken
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sliding_sync.py24
-rw-r--r--synapse/rest/client/sync.py6
-rw-r--r--synapse/types/handlers/__init__.py12
3 files changed, 28 insertions, 14 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 1b5262d667..3aa139ff1c 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -36,6 +36,7 @@ from synapse.types import (
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
+    SlidingSyncStreamToken,
     StateMap,
     StreamKeyType,
     StreamToken,
@@ -343,7 +344,7 @@ class SlidingSyncHandler:
         self,
         requester: Requester,
         sync_config: SlidingSyncConfig,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[SlidingSyncStreamToken] = None,
         timeout_ms: int = 0,
     ) -> SlidingSyncResult:
         """
@@ -378,7 +379,7 @@ class SlidingSyncHandler:
             # this returns false, it means we timed out waiting, and we should
             # just return an empty response.
             before_wait_ts = self.clock.time_msec()
-            if not await self.notifier.wait_for_stream_token(from_token):
+            if not await self.notifier.wait_for_stream_token(from_token.stream_token):
                 logger.warning(
                     "Timed out waiting for worker to catch up. Returning empty response"
                 )
@@ -416,7 +417,7 @@ class SlidingSyncHandler:
                 sync_config.user.to_string(),
                 timeout_ms,
                 current_sync_callback,
-                from_token=from_token,
+                from_token=from_token.stream_token,
             )
 
         return result
@@ -425,7 +426,7 @@ class SlidingSyncHandler:
         self,
         sync_config: SlidingSyncConfig,
         to_token: StreamToken,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[SlidingSyncStreamToken] = None,
     ) -> SlidingSyncResult:
         """
         Generates the response body of a Sliding Sync result, represented as a
@@ -458,7 +459,7 @@ class SlidingSyncHandler:
                 await self.get_room_membership_for_user_at_to_token(
                     user=sync_config.user,
                     to_token=to_token,
-                    from_token=from_token,
+                    from_token=from_token.stream_token if from_token else None,
                 )
             )
 
@@ -609,8 +610,11 @@ class SlidingSyncHandler:
             sync_config=sync_config, to_token=to_token
         )
 
+        # TODO: Update this when we implement per-connection state
+        connection_token = 0
+
         return SlidingSyncResult(
-            next_pos=to_token,
+            next_pos=SlidingSyncStreamToken(to_token, connection_token),
             lists=lists,
             rooms=rooms,
             extensions=extensions,
@@ -1346,7 +1350,7 @@ class SlidingSyncHandler:
         room_id: str,
         room_sync_config: RoomSyncConfig,
         room_membership_for_user_at_to_token: _RoomMembershipForUser,
-        from_token: Optional[StreamToken],
+        from_token: Optional[SlidingSyncStreamToken],
         to_token: StreamToken,
     ) -> SlidingSyncResult.RoomResult:
         """
@@ -1410,7 +1414,7 @@ class SlidingSyncHandler:
             #  - TODO: For an incremental sync where we haven't sent it down this
             #    connection before
             to_bound = (
-                from_token.room_key
+                from_token.stream_token.room_key
                 if from_token is not None
                 and not room_membership_for_user_at_to_token.newly_joined
                 else None
@@ -1477,7 +1481,9 @@ class SlidingSyncHandler:
                         instance_name=timeline_event.internal_metadata.instance_name,
                         stream=timeline_event.internal_metadata.stream_ordering,
                     )
-                    if persisted_position.persisted_after(from_token.room_key):
+                    if persisted_position.persisted_after(
+                        from_token.stream_token.room_key
+                    ):
                         num_live += 1
                     else:
                         # Since we're iterating over the timeline events in
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 1d8cbfdf00..b3967db18d 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -54,7 +54,7 @@ from synapse.http.servlet import (
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import trace_with_opname
 from synapse.rest.admin.experimental_features import ExperimentalFeature
-from synapse.types import JsonDict, Requester, StreamToken
+from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
 from synapse.types.rest.client import SlidingSyncBody
 from synapse.util import json_decoder
 from synapse.util.caches.lrucache import LruCache
@@ -889,7 +889,9 @@ class SlidingSyncRestServlet(RestServlet):
 
         from_token = None
         if from_token_string is not None:
-            from_token = await StreamToken.from_string(self.store, from_token_string)
+            from_token = await SlidingSyncStreamToken.from_string(
+                self.store, from_token_string
+            )
 
         # TODO: We currently don't know whether we're going to use sticky params or
         # maybe some filters like sync v2  where they are built up once and referenced
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 409120470a..642e5483a9 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -31,7 +31,13 @@ else:
     from pydantic import Extra
 
 from synapse.events import EventBase
-from synapse.types import JsonDict, JsonMapping, StreamToken, UserID
+from synapse.types import (
+    JsonDict,
+    JsonMapping,
+    SlidingSyncStreamToken,
+    StreamToken,
+    UserID,
+)
 from synapse.types.rest.client import SlidingSyncBody
 
 if TYPE_CHECKING:
@@ -287,7 +293,7 @@ class SlidingSyncResult:
         def __bool__(self) -> bool:
             return bool(self.to_device)
 
-    next_pos: StreamToken
+    next_pos: SlidingSyncStreamToken
     lists: Dict[str, SlidingWindowList]
     rooms: Dict[str, RoomResult]
     extensions: Extensions
@@ -300,7 +306,7 @@ class SlidingSyncResult:
         return bool(self.lists or self.rooms or self.extensions)
 
     @staticmethod
-    def empty(next_pos: StreamToken) -> "SlidingSyncResult":
+    def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
         "Return a new empty result"
         return SlidingSyncResult(
             next_pos=next_pos,