summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-16 12:13:40 +0100
committerErik Johnston <erik@matrix.org>2024-07-17 12:14:28 +0100
commit1ad1cce3f2137f6be2c8d48121de27ea194fef4f (patch)
treeeb7037b5e070bd80c8ab3fa13b98cf62db8944ad /synapse/handlers
parentAdd SlidingSyncStreamToken (diff)
downloadsynapse-1ad1cce3f2137f6be2c8d48121de27ea194fef4f.tar.xz
Pass throught SlidingSyncStreamToken
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/sliding_sync.py24
1 files changed, 15 insertions, 9 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 1b5262d667..3aa139ff1c 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -36,6 +36,7 @@ from synapse.types import (
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
+    SlidingSyncStreamToken,
     StateMap,
     StreamKeyType,
     StreamToken,
@@ -343,7 +344,7 @@ class SlidingSyncHandler:
         self,
         requester: Requester,
         sync_config: SlidingSyncConfig,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[SlidingSyncStreamToken] = None,
         timeout_ms: int = 0,
     ) -> SlidingSyncResult:
         """
@@ -378,7 +379,7 @@ class SlidingSyncHandler:
             # this returns false, it means we timed out waiting, and we should
             # just return an empty response.
             before_wait_ts = self.clock.time_msec()
-            if not await self.notifier.wait_for_stream_token(from_token):
+            if not await self.notifier.wait_for_stream_token(from_token.stream_token):
                 logger.warning(
                     "Timed out waiting for worker to catch up. Returning empty response"
                 )
@@ -416,7 +417,7 @@ class SlidingSyncHandler:
                 sync_config.user.to_string(),
                 timeout_ms,
                 current_sync_callback,
-                from_token=from_token,
+                from_token=from_token.stream_token,
             )
 
         return result
@@ -425,7 +426,7 @@ class SlidingSyncHandler:
         self,
         sync_config: SlidingSyncConfig,
         to_token: StreamToken,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[SlidingSyncStreamToken] = None,
     ) -> SlidingSyncResult:
         """
         Generates the response body of a Sliding Sync result, represented as a
@@ -458,7 +459,7 @@ class SlidingSyncHandler:
                 await self.get_room_membership_for_user_at_to_token(
                     user=sync_config.user,
                     to_token=to_token,
-                    from_token=from_token,
+                    from_token=from_token.stream_token if from_token else None,
                 )
             )
 
@@ -609,8 +610,11 @@ class SlidingSyncHandler:
             sync_config=sync_config, to_token=to_token
         )
 
+        # TODO: Update this when we implement per-connection state
+        connection_token = 0
+
         return SlidingSyncResult(
-            next_pos=to_token,
+            next_pos=SlidingSyncStreamToken(to_token, connection_token),
             lists=lists,
             rooms=rooms,
             extensions=extensions,
@@ -1346,7 +1350,7 @@ class SlidingSyncHandler:
         room_id: str,
         room_sync_config: RoomSyncConfig,
         room_membership_for_user_at_to_token: _RoomMembershipForUser,
-        from_token: Optional[StreamToken],
+        from_token: Optional[SlidingSyncStreamToken],
         to_token: StreamToken,
     ) -> SlidingSyncResult.RoomResult:
         """
@@ -1410,7 +1414,7 @@ class SlidingSyncHandler:
             #  - TODO: For an incremental sync where we haven't sent it down this
             #    connection before
             to_bound = (
-                from_token.room_key
+                from_token.stream_token.room_key
                 if from_token is not None
                 and not room_membership_for_user_at_to_token.newly_joined
                 else None
@@ -1477,7 +1481,9 @@ class SlidingSyncHandler:
                         instance_name=timeline_event.internal_metadata.instance_name,
                         stream=timeline_event.internal_metadata.stream_ordering,
                     )
-                    if persisted_position.persisted_after(from_token.room_key):
+                    if persisted_position.persisted_after(
+                        from_token.stream_token.room_key
+                    ):
                         num_live += 1
                     else:
                         # Since we're iterating over the timeline events in