diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 554ab59bf3..36665db8e1 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -49,6 +49,7 @@ from synapse.types import (
PersistedEventPosition,
Requester,
RoomStreamToken,
+ SlidingSyncStreamToken,
StateMap,
StreamKeyType,
StreamToken,
@@ -362,7 +363,7 @@ class SlidingSyncHandler:
self,
requester: Requester,
sync_config: SlidingSyncConfig,
- from_token: Optional[StreamToken] = None,
+ from_token: Optional[SlidingSyncStreamToken] = None,
timeout_ms: int = 0,
) -> SlidingSyncResult:
"""
@@ -393,7 +394,7 @@ class SlidingSyncHandler:
# this returns false, it means we timed out waiting, and we should
# just return an empty response.
before_wait_ts = self.clock.time_msec()
- if not await self.notifier.wait_for_stream_token(from_token):
+ if not await self.notifier.wait_for_stream_token(from_token.stream_token):
logger.warning(
"Timed out waiting for worker to catch up. Returning empty response"
)
@@ -431,7 +432,7 @@ class SlidingSyncHandler:
sync_config.user.to_string(),
timeout_ms,
current_sync_callback,
- from_token=from_token,
+ from_token=from_token.stream_token,
)
return result
@@ -440,7 +441,7 @@ class SlidingSyncHandler:
self,
sync_config: SlidingSyncConfig,
to_token: StreamToken,
- from_token: Optional[StreamToken] = None,
+ from_token: Optional[SlidingSyncStreamToken] = None,
) -> SlidingSyncResult:
"""
Generates the response body of a Sliding Sync result, represented as a
@@ -473,7 +474,7 @@ class SlidingSyncHandler:
await self.get_room_membership_for_user_at_to_token(
user=sync_config.user,
to_token=to_token,
- from_token=from_token,
+ from_token=from_token.stream_token if from_token else None,
)
)
@@ -631,8 +632,11 @@ class SlidingSyncHandler:
to_token=to_token,
)
+ # TODO: Update this when we implement per-connection state
+ connection_token = 0
+
return SlidingSyncResult(
- next_pos=to_token,
+ next_pos=SlidingSyncStreamToken(to_token, connection_token),
lists=lists,
rooms=rooms,
extensions=extensions,
@@ -1367,7 +1371,7 @@ class SlidingSyncHandler:
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
- from_token: Optional[StreamToken],
+ from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
) -> SlidingSyncResult.RoomResult:
"""
@@ -1431,7 +1435,7 @@ class SlidingSyncHandler:
# - TODO: For an incremental sync where we haven't sent it down this
# connection before
to_bound = (
- from_token.room_key
+ from_token.stream_token.room_key
if from_token is not None
and not room_membership_for_user_at_to_token.newly_joined
else None
@@ -1498,7 +1502,9 @@ class SlidingSyncHandler:
instance_name=timeline_event.internal_metadata.instance_name,
stream=timeline_event.internal_metadata.stream_ordering,
)
- if persisted_position.persisted_after(from_token.room_key):
+ if persisted_position.persisted_after(
+ from_token.stream_token.room_key
+ ):
num_live += 1
else:
# Since we're iterating over the timeline events in
@@ -1786,7 +1792,7 @@ class SlidingSyncHandler:
self,
sync_config: SlidingSyncConfig,
to_token: StreamToken,
- from_token: Optional[StreamToken],
+ from_token: Optional[SlidingSyncStreamToken],
) -> SlidingSyncResult.Extensions:
"""Handle extension requests.
@@ -1900,7 +1906,7 @@ class SlidingSyncHandler:
sync_config: SlidingSyncConfig,
e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension,
to_token: StreamToken,
- from_token: Optional[StreamToken],
+ from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]:
"""Handle E2EE device extension (MSC3884)
@@ -1922,7 +1928,7 @@ class SlidingSyncHandler:
# TODO: This should take into account the `from_token` and `to_token`
device_list_updates = await self.device_handler.get_user_ids_changed(
user_id=user_id,
- from_token=from_token,
+ from_token=from_token.stream_token,
)
device_one_time_keys_count: Mapping[str, int] = {}
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 93fe1d439e..d72dfa2b10 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -54,7 +54,7 @@ from synapse.http.servlet import (
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace_with_opname
from synapse.rest.admin.experimental_features import ExperimentalFeature
-from synapse.types import JsonDict, Requester, StreamToken
+from synapse.types import JsonDict, Requester, SlidingSyncStreamToken, StreamToken
from synapse.types.rest.client import SlidingSyncBody
from synapse.util import json_decoder
from synapse.util.caches.lrucache import LruCache
@@ -889,7 +889,9 @@ class SlidingSyncRestServlet(RestServlet):
from_token = None
if from_token_string is not None:
- from_token = await StreamToken.from_string(self.store, from_token_string)
+ from_token = await SlidingSyncStreamToken.from_string(
+ self.store, from_token_string
+ )
# TODO: We currently don't know whether we're going to use sticky params or
# maybe some filters like sync v2 where they are built up once and referenced
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index c0d30ac2a3..5259550f1c 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -1161,6 +1161,49 @@ StreamToken.START = StreamToken(
@attr.s(slots=True, frozen=True, auto_attribs=True)
+class SlidingSyncStreamToken:
+ """The same as a `StreamToken`, but includes an extra field at the start for
+ the sliding sync connection token (separated by a '/'). This is used to
+ store per-connection state.
+
+ This then looks something like:
+ 5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
+
+ Attributes:
+ stream_token: Token representing the position of all the standard
+ streams.
+ connection_position: Token used by sliding sync to track updates to any
+ per-connection state stored by Synapse.
+ """
+
+ stream_token: StreamToken
+ connection_position: int
+
+ @staticmethod
+ @cancellable
+ async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken":
+ """Creates a SlidingSyncStreamToken from its textual representation."""
+ try:
+ connection_position_str, stream_token_str = string.split("/", 1)
+ connection_position = int(connection_position_str)
+ stream_token = await StreamToken.from_string(store, stream_token_str)
+
+ return SlidingSyncStreamToken(
+ stream_token=stream_token,
+ connection_position=connection_position,
+ )
+ except CancelledError:
+ raise
+ except Exception:
+ raise SynapseError(400, "Invalid stream token")
+
+ async def to_string(self, store: "DataStore") -> str:
+ """Serializes the token to a string"""
+ stream_token_str = await self.stream_token.to_string(store)
+ return f"{self.connection_position}/{stream_token_str}"
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
class PersistedPosition:
"""Position of a newly persisted row with instance that persisted it."""
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 4c6c42db04..59eb0963ee 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -31,7 +31,14 @@ else:
from pydantic import Extra
from synapse.events import EventBase
-from synapse.types import DeviceListUpdates, JsonDict, JsonMapping, StreamToken, UserID
+from synapse.types import (
+ DeviceListUpdates,
+ JsonDict,
+ JsonMapping,
+ SlidingSyncStreamToken,
+ StreamToken,
+ UserID,
+)
from synapse.types.rest.client import SlidingSyncBody
if TYPE_CHECKING:
@@ -329,7 +336,7 @@ class SlidingSyncResult:
def __bool__(self) -> bool:
return bool(self.to_device or self.e2ee)
- next_pos: StreamToken
+ next_pos: SlidingSyncStreamToken
lists: Dict[str, SlidingWindowList]
rooms: Dict[str, RoomResult]
extensions: Extensions
@@ -342,7 +349,7 @@ class SlidingSyncResult:
return bool(self.lists or self.rooms or self.extensions)
@staticmethod
- def empty(next_pos: StreamToken) -> "SlidingSyncResult":
+ def empty(next_pos: SlidingSyncStreamToken) -> "SlidingSyncResult":
"Return a new empty result"
return SlidingSyncResult(
next_pos=next_pos,
|