diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 554ab59bf3..f1f6f30b95 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -49,6 +49,7 @@ from synapse.types import (
PersistedEventPosition,
Requester,
RoomStreamToken,
+ SlidingSyncStreamToken,
StateMap,
StreamKeyType,
StreamToken,
@@ -362,7 +363,7 @@ class SlidingSyncHandler:
self,
requester: Requester,
sync_config: SlidingSyncConfig,
- from_token: Optional[StreamToken] = None,
+ from_token: Optional[SlidingSyncStreamToken] = None,
timeout_ms: int = 0,
) -> SlidingSyncResult:
"""
@@ -393,7 +394,7 @@ class SlidingSyncHandler:
# this returns false, it means we timed out waiting, and we should
# just return an empty response.
before_wait_ts = self.clock.time_msec()
- if not await self.notifier.wait_for_stream_token(from_token):
+ if not await self.notifier.wait_for_stream_token(from_token.stream_token):
logger.warning(
"Timed out waiting for worker to catch up. Returning empty response"
)
@@ -431,7 +432,7 @@ class SlidingSyncHandler:
sync_config.user.to_string(),
timeout_ms,
current_sync_callback,
- from_token=from_token,
+ from_token=from_token.stream_token,
)
return result
@@ -440,7 +441,7 @@ class SlidingSyncHandler:
self,
sync_config: SlidingSyncConfig,
to_token: StreamToken,
- from_token: Optional[StreamToken] = None,
+ from_token: Optional[SlidingSyncStreamToken] = None,
) -> SlidingSyncResult:
"""
Generates the response body of a Sliding Sync result, represented as a
@@ -473,7 +474,7 @@ class SlidingSyncHandler:
await self.get_room_membership_for_user_at_to_token(
user=sync_config.user,
to_token=to_token,
- from_token=from_token,
+ from_token=from_token.stream_token if from_token else None,
)
)
@@ -631,8 +632,11 @@ class SlidingSyncHandler:
to_token=to_token,
)
+ # TODO: Update this when we implement per-connection state
+ connection_token = 0
+
return SlidingSyncResult(
- next_pos=to_token,
+ next_pos=SlidingSyncStreamToken(to_token, connection_token),
lists=lists,
rooms=rooms,
extensions=extensions,
@@ -1367,7 +1371,7 @@ class SlidingSyncHandler:
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
- from_token: Optional[StreamToken],
+ from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
) -> SlidingSyncResult.RoomResult:
"""
@@ -1431,7 +1435,7 @@ class SlidingSyncHandler:
# - TODO: For an incremental sync where we haven't sent it down this
# connection before
to_bound = (
- from_token.room_key
+ from_token.stream_token.room_key
if from_token is not None
and not room_membership_for_user_at_to_token.newly_joined
else None
@@ -1498,7 +1502,9 @@ class SlidingSyncHandler:
instance_name=timeline_event.internal_metadata.instance_name,
stream=timeline_event.internal_metadata.stream_ordering,
)
- if persisted_position.persisted_after(from_token.room_key):
+ if persisted_position.persisted_after(
+ from_token.stream_token.room_key
+ ):
num_live += 1
else:
# Since we're iterating over the timeline events in
@@ -1752,8 +1758,14 @@ class SlidingSyncHandler:
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# But if we found a bump event, use that instead
if last_bump_event_result is not None:
- _, bump_event_pos = last_bump_event_result
- bump_stamp = bump_event_pos.stream
+ _, new_bump_event_pos = last_bump_event_result
+
+ # If we've just joined a remote room, then the last bump event may
+ # have been backfilled (and so have a negative stream ordering).
+ # These negative stream orderings can't sensibly be compared, so
+ # instead we use the membership event position.
+ if new_bump_event_pos.stream > 0:
+ bump_stamp = new_bump_event_pos.stream
return SlidingSyncResult.RoomResult(
name=room_name,
@@ -1786,7 +1798,7 @@ class SlidingSyncHandler:
self,
sync_config: SlidingSyncConfig,
to_token: StreamToken,
- from_token: Optional[StreamToken],
+ from_token: Optional[SlidingSyncStreamToken],
) -> SlidingSyncResult.Extensions:
"""Handle extension requests.
@@ -1900,7 +1912,7 @@ class SlidingSyncHandler:
sync_config: SlidingSyncConfig,
e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension,
to_token: StreamToken,
- from_token: Optional[StreamToken],
+ from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]:
"""Handle E2EE device extension (MSC3884)
@@ -1922,7 +1934,7 @@ class SlidingSyncHandler:
# TODO: This should take into account the `from_token` and `to_token`
device_list_updates = await self.device_handler.get_user_ids_changed(
user_id=user_id,
- from_token=from_token,
+ from_token=from_token.stream_token,
)
device_one_time_keys_count: Mapping[str, int] = {}
|