diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 554ab59bf3..73414dbf69 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,6 +18,7 @@
#
#
import logging
+from enum import Enum
from itertools import chain
from typing import (
TYPE_CHECKING,
@@ -34,22 +35,26 @@ from typing import (
import attr
from immutabledict import immutabledict
+from typing_extensions import assert_never
from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
from synapse.events.utils import strip_event
from synapse.handlers.relations import BundledAggregations
-from synapse.logging.opentracing import start_active_span, tag_args, trace
+from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.storage.roommember import MemberSummary
from synapse.types import (
DeviceListUpdates,
JsonDict,
+ JsonMapping,
PersistedEventPosition,
Requester,
RoomStreamToken,
+ SlidingSyncStreamToken,
StateMap,
+ StrCollection,
StreamKeyType,
StreamToken,
UserID,
@@ -356,13 +361,16 @@ class SlidingSyncHandler:
self.event_sources = hs.get_event_sources()
self.relations_handler = hs.get_relations_handler()
self.device_handler = hs.get_device_handler()
+ self.push_rules_handler = hs.get_push_rules_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
+ self.connection_store = SlidingSyncConnectionStore()
+
async def wait_for_sync_for_user(
self,
requester: Requester,
sync_config: SlidingSyncConfig,
- from_token: Optional[StreamToken] = None,
+ from_token: Optional[SlidingSyncStreamToken] = None,
timeout_ms: int = 0,
) -> SlidingSyncResult:
"""
@@ -393,7 +401,7 @@ class SlidingSyncHandler:
# this returns false, it means we timed out waiting, and we should
# just return an empty response.
before_wait_ts = self.clock.time_msec()
- if not await self.notifier.wait_for_stream_token(from_token):
+ if not await self.notifier.wait_for_stream_token(from_token.stream_token):
logger.warning(
"Timed out waiting for worker to catch up. Returning empty response"
)
@@ -431,16 +439,17 @@ class SlidingSyncHandler:
sync_config.user.to_string(),
timeout_ms,
current_sync_callback,
- from_token=from_token,
+ from_token=from_token.stream_token,
)
return result
+ @trace
async def current_sync_for_user(
self,
sync_config: SlidingSyncConfig,
to_token: StreamToken,
- from_token: Optional[StreamToken] = None,
+ from_token: Optional[SlidingSyncStreamToken] = None,
) -> SlidingSyncResult:
"""
Generates the response body of a Sliding Sync result, represented as a
@@ -461,6 +470,11 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
+ await self.connection_store.mark_token_seen(
+ sync_config=sync_config,
+ from_token=from_token,
+ )
+
# Get all of the room IDs that the user should be able to see in the sync
# response
has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
@@ -473,7 +487,7 @@ class SlidingSyncHandler:
await self.get_room_membership_for_user_at_to_token(
user=sync_config.user,
to_token=to_token,
- from_token=from_token,
+ from_token=from_token.stream_token if from_token else None,
)
)
@@ -606,11 +620,56 @@ class SlidingSyncHandler:
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
+ # Filter out rooms that we have previously sent down and that haven't
+ # received any updates since.
+ if from_token:
+ rooms_should_send = set()
+
+ # First we check if there are rooms that match a list/room
+ # subscription and have updates we need to send (i.e. either because
+ # we haven't sent the room down, or we have but there are missing
+ # updates).
+ for room_id in relevant_room_map:
+ status = await self.connection_store.have_sent_room(
+ sync_config,
+ from_token.connection_position,
+ room_id,
+ )
+ if (
+ # The room was never sent down before so the client needs to know
+ # about it regardless of any updates.
+ status.status == HaveSentRoomFlag.NEVER
+ # `PREVIOUSLY` literally means the "room was sent down before *AND*
+ # there are updates we haven't sent down" so we already know this
+ # room has updates.
+ or status.status == HaveSentRoomFlag.PREVIOUSLY
+ ):
+ rooms_should_send.add(room_id)
+ elif status.status == HaveSentRoomFlag.LIVE:
+ # We know that we've sent all updates up until `from_token`,
+ # so we just need to check if there have been updates since
+ # then.
+ pass
+ else:
+ assert_never(status.status)
+
+ # We only need to check for new events since any state changes
+ # will also come down as new events.
+ rooms_that_have_updates = self.store.get_rooms_that_might_have_updates(
+ relevant_room_map.keys(), from_token.stream_token.room_key
+ )
+ rooms_should_send.update(rooms_that_have_updates)
+ relevant_room_map = {
+ room_id: room_sync_config
+ for room_id, room_sync_config in relevant_room_map.items()
+ if room_id in rooms_should_send
+ }
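+
+ # Illustrative example (hypothetical room IDs): if the client is LIVE
+ # for `!seen:example.org` (and it has no new events) but has NEVER been
+ # sent `!new:example.org`, then only `!new:example.org` survives this
+ # filtering.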
+
@trace
@tag_args
async def handle_room(room_id: str) -> None:
room_sync_result = await self.get_room_sync_data(
- user=sync_config.user,
+ sync_config=sync_config,
room_id=room_id,
room_sync_config=relevant_room_map[room_id],
room_membership_for_user_at_to_token=room_membership_for_user_map[
@@ -620,19 +679,37 @@ class SlidingSyncHandler:
to_token=to_token,
)
- rooms[room_id] = room_sync_result
+ # Filter out empty room results during incremental sync
+ if room_sync_result or not from_token:
+ rooms[room_id] = room_sync_result
- with start_active_span("sliding_sync.generate_room_entries"):
- await concurrently_execute(handle_room, relevant_room_map, 10)
+ if relevant_room_map:
+ with start_active_span("sliding_sync.generate_room_entries"):
+ await concurrently_execute(handle_room, relevant_room_map, 10)
extensions = await self.get_extensions_response(
sync_config=sync_config,
+ lists=lists,
from_token=from_token,
to_token=to_token,
)
+ if has_lists or has_room_subscriptions:
+ connection_position = await self.connection_store.record_rooms(
+ sync_config=sync_config,
+ from_token=from_token,
+ sent_room_ids=relevant_room_map.keys(),
+ # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
+ unsent_room_ids=[],
+ )
+ elif from_token:
+ connection_position = from_token.connection_position
+ else:
+ # Initial sync without a `from_token` starts at `0`
+ connection_position = 0
+
return SlidingSyncResult(
- next_pos=to_token,
+ next_pos=SlidingSyncStreamToken(to_token, connection_position),
lists=lists,
rooms=rooms,
extensions=extensions,
@@ -1086,6 +1163,7 @@ class SlidingSyncHandler:
# return None
+ @trace
async def filter_rooms(
self,
user: UserID,
@@ -1209,6 +1287,7 @@ class SlidingSyncHandler:
# Assemble a new sync room map but only with the `filtered_room_id_set`
return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
+ @trace
async def sort_rooms(
self,
sync_room_map: Dict[str, _RoomMembershipForUser],
@@ -1363,11 +1442,11 @@ class SlidingSyncHandler:
async def get_room_sync_data(
self,
- user: UserID,
+ sync_config: SlidingSyncConfig,
room_id: str,
room_sync_config: RoomSyncConfig,
room_membership_for_user_at_to_token: _RoomMembershipForUser,
- from_token: Optional[StreamToken],
+ from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
) -> SlidingSyncResult.RoomResult:
"""
@@ -1385,6 +1464,41 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
"""
+ user = sync_config.user
+
+ # Determine whether we should limit the timeline to the token range.
+ #
+ # We should return historical messages (before token range) in the
+ # following cases because we want clients to be able to show a basic
+ # screen of information:
+ # - Initial sync (because no `from_token` to limit us anyway)
+ # - When users `newly_joined`
+ # - For an incremental sync where we haven't sent it down this
+ # connection before
+ from_bound = None
+ initial = True
+ if from_token and not room_membership_for_user_at_to_token.newly_joined:
+ room_status = await self.connection_store.have_sent_room(
+ sync_config=sync_config,
+ connection_token=from_token.connection_position,
+ room_id=room_id,
+ )
+ if room_status.status == HaveSentRoomFlag.LIVE:
+ from_bound = from_token.stream_token.room_key
+ initial = False
+ elif room_status.status == HaveSentRoomFlag.PREVIOUSLY:
+ assert room_status.last_token is not None
+ from_bound = room_status.last_token
+ initial = False
+ elif room_status.status == HaveSentRoomFlag.NEVER:
+ from_bound = None
+ initial = True
+ else:
+ assert_never(room_status.status)
+
+ log_kv({"sliding_sync.room_status": room_status})
+
+ log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
# Assemble the list of timeline events
#
@@ -1411,36 +1525,23 @@ class SlidingSyncHandler:
prev_batch_token = to_token
# We're going to paginate backwards from the `to_token`
- from_bound = to_token.room_key
+ to_bound = to_token.room_key
# People shouldn't see past their leave/ban event
if room_membership_for_user_at_to_token.membership in (
Membership.LEAVE,
Membership.BAN,
):
- from_bound = (
+ to_bound = (
room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
)
- # Determine whether we should limit the timeline to the token range.
- #
- # We should return historical messages (before token range) in the
- # following cases because we want clients to be able to show a basic
- # screen of information:
- # - Initial sync (because no `from_token` to limit us anyway)
- # - When users `newly_joined`
- # - TODO: For an incremental sync where we haven't sent it down this
- # connection before
- to_bound = (
- from_token.room_key
- if from_token is not None
- and not room_membership_for_user_at_to_token.newly_joined
- else None
- )
-
timeline_events, new_room_key = await self.store.paginate_room_events(
room_id=room_id,
- from_key=from_bound,
- to_key=to_bound,
+ # The bounds are reversed so we can paginate backwards
+ # (from newer to older events), starting at `to_bound`.
+ # This ensures we fill the `limit` with the newest events first.
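+ # e.g. (hypothetical stream positions): with from_bound=s10 and
+ # to_bound=s50, we walk from s50 towards s10 and keep the newest
+ # `limit` events.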
+ from_key=to_bound,
+ to_key=from_bound,
direction=Direction.BACKWARDS,
# We add one so we can determine if there are enough events to saturate
# the limit or not (see `limited`)
@@ -1498,7 +1599,9 @@ class SlidingSyncHandler:
instance_name=timeline_event.internal_metadata.instance_name,
stream=timeline_event.internal_metadata.stream_ordering,
)
- if persisted_position.persisted_after(from_token.room_key):
+ if persisted_position.persisted_after(
+ from_token.stream_token.room_key
+ ):
num_live += 1
else:
# Since we're iterating over the timeline events in
@@ -1555,12 +1658,6 @@ class SlidingSyncHandler:
# indicate to the client that a state reset happened. Perhaps we should indicate
# this by setting `initial: True` and empty `required_state`.
- # TODO: Since we can't determine whether we've already sent a room down this
- # Sliding Sync connection before (we plan to add this optimization in the
- # future), we're always returning the requested room state instead of
- # updates.
- initial = True
-
# Check whether the room has a name set
name_state_ids = await self.get_current_state_ids_at(
room_id=room_id,
@@ -1706,9 +1803,22 @@ class SlidingSyncHandler:
to_token=to_token,
)
else:
- # TODO: Once we can figure out if we've sent a room down this connection before,
- # we can return updates instead of the full required state.
- raise NotImplementedError()
+ assert from_bound is not None
+
+ # TODO: Limit the number of state events we're about to send down
+ # for the room; if it's too many, we should change this to
+ # `initial=True`?
+ deltas = await self.store.get_current_state_deltas_for_room(
+ room_id=room_id,
+ from_token=from_bound,
+ to_token=to_token.room_key,
+ )
+ # TODO: Filter room state before fetching events
+ # TODO: Handle state resets where event_id is None
+ events = await self.store.get_events(
+ [d.event_id for d in deltas if d.event_id]
+ )
+ room_state = {(s.type, s.state_key): s for s in events.values()}
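+ # Illustrative example (hypothetical): if only ("m.room.topic", "")
+ # changed between `from_bound` and `to_token`, `room_state` here
+ # contains just the latest topic event rather than the full room
+ # state.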
required_room_state: StateMap[EventBase] = {}
if required_state_filter != StateFilter.none():
@@ -1752,8 +1862,14 @@ class SlidingSyncHandler:
bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
# But if we found a bump event, use that instead
if last_bump_event_result is not None:
- _, bump_event_pos = last_bump_event_result
- bump_stamp = bump_event_pos.stream
+ _, new_bump_event_pos = last_bump_event_result
+
+ # If we've just joined a remote room, then the last bump event may
+ # have been backfilled (and so have a negative stream ordering).
+ # These negative stream orderings can't sensibly be compared, so
+ # instead we use the membership event position.
+ if new_bump_event_pos.stream > 0:
+ bump_stamp = new_bump_event_pos.stream
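+
+ # e.g. (illustrative): a backfilled bump event could have a stream
+ # ordering of -5, which would wrongly sort the room as staler than
+ # rooms with only locally-persisted events, hence keeping the
+ # membership event position instead.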
return SlidingSyncResult.RoomResult(
name=room_name,
@@ -1782,16 +1898,19 @@ class SlidingSyncHandler:
highlight_count=0,
)
+ @trace
async def get_extensions_response(
self,
sync_config: SlidingSyncConfig,
+ lists: Dict[str, SlidingSyncResult.SlidingWindowList],
to_token: StreamToken,
- from_token: Optional[StreamToken],
+ from_token: Optional[SlidingSyncStreamToken],
) -> SlidingSyncResult.Extensions:
"""Handle extension requests.
Args:
sync_config: Sync configuration
+ lists: Sliding window API. A map of list key to list results.
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""
@@ -1816,11 +1935,23 @@ class SlidingSyncHandler:
from_token=from_token,
)
+ account_data_response = None
+ if sync_config.extensions.account_data is not None:
+ account_data_response = await self.get_account_data_extension_response(
+ sync_config=sync_config,
+ lists=lists,
+ account_data_request=sync_config.extensions.account_data,
+ to_token=to_token,
+ from_token=from_token,
+ )
+
return SlidingSyncResult.Extensions(
to_device=to_device_response,
e2ee=e2ee_response,
+ account_data=account_data_response,
)
+ @trace
async def get_to_device_extension_response(
self,
sync_config: SlidingSyncConfig,
@@ -1835,7 +1966,7 @@ class SlidingSyncHandler:
to_token: The point in the stream to sync up to.
"""
user_id = sync_config.user.to_string()
- device_id = sync_config.device_id
+ device_id = sync_config.requester.device_id
# Skip if the extension is not enabled
if not to_device_request.enabled:
@@ -1895,12 +2026,13 @@ class SlidingSyncHandler:
events=messages,
)
+ @trace
async def get_e2ee_extension_response(
self,
sync_config: SlidingSyncConfig,
e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension,
to_token: StreamToken,
- from_token: Optional[StreamToken],
+ from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]:
"""Handle E2EE device extension (MSC3884)
@@ -1911,7 +2043,7 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
"""
user_id = sync_config.user.to_string()
- device_id = sync_config.device_id
+ device_id = sync_config.requester.device_id
# Skip if the extension is not enabled
if not e2ee_request.enabled:
@@ -1922,7 +2054,7 @@ class SlidingSyncHandler:
# TODO: This should take into account the `from_token` and `to_token`
device_list_updates = await self.device_handler.get_user_ids_changed(
user_id=user_id,
- from_token=from_token,
+ from_token=from_token.stream_token,
)
device_one_time_keys_count: Mapping[str, int] = {}
@@ -1944,3 +2076,358 @@ class SlidingSyncHandler:
device_one_time_keys_count=device_one_time_keys_count,
device_unused_fallback_key_types=device_unused_fallback_key_types,
)
+
+ @trace
+ async def get_account_data_extension_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+ account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
+ to_token: StreamToken,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]:
+ """Handle Account Data extension (MSC3959)
+
+ Args:
+ sync_config: Sync configuration
+ lists: Sliding window API. A map of list key to list results.
+ account_data_request: The account_data extension from the request
+ to_token: The point in the stream to sync up to.
+ from_token: The point in the stream to sync from.
+ """
+ user_id = sync_config.user.to_string()
+
+ # Skip if the extension is not enabled
+ if not account_data_request.enabled:
+ return None
+
+ global_account_data_map: Mapping[str, JsonMapping] = {}
+ if from_token is not None:
+ global_account_data_map = (
+ await self.store.get_updated_global_account_data_for_user(
+ user_id, from_token.stream_token.account_data_key
+ )
+ )
+
+ have_push_rules_changed = await self.store.have_push_rules_changed_for_user(
+ user_id, from_token.stream_token.push_rules_key
+ )
+ if have_push_rules_changed:
+ global_account_data_map = dict(global_account_data_map)
+ global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+ await self.push_rules_handler.push_rules_for_user(sync_config.user)
+ )
+ else:
+ all_global_account_data = await self.store.get_global_account_data_for_user(
+ user_id
+ )
+
+ global_account_data_map = dict(all_global_account_data)
+ global_account_data_map[AccountDataTypes.PUSH_RULES] = (
+ await self.push_rules_handler.push_rules_for_user(sync_config.user)
+ )
+
+ # We only want to include account data for rooms that are already in the sliding
+ # sync response AND that were requested in the account data request.
+ relevant_room_ids: Set[str] = set()
+
+ # See what rooms from the room subscriptions we should get account data for
+ if (
+ account_data_request.rooms is not None
+ and sync_config.room_subscriptions is not None
+ ):
+ actual_room_ids = sync_config.room_subscriptions.keys()
+
+ for room_id in account_data_request.rooms:
+ # A wildcard means we process all rooms from the room subscriptions
+ if room_id == "*":
+ relevant_room_ids.update(sync_config.room_subscriptions.keys())
+ break
+
+ if room_id in actual_room_ids:
+ relevant_room_ids.add(room_id)
+
+ # See what rooms from the sliding window lists we should get account data for
+ if account_data_request.lists is not None:
+ for list_key in account_data_request.lists:
+ # Typed here because we reuse the variable name in multiple places
+ actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
+
+ # A wildcard means we process rooms from all lists
+ if list_key == "*":
+ for actual_list in lists.values():
+ # We only expect a single SYNC operation for any list
+ assert len(actual_list.ops) == 1
+ sync_op = actual_list.ops[0]
+ assert sync_op.op == OperationType.SYNC
+
+ relevant_room_ids.update(sync_op.room_ids)
+
+ break
+
+ actual_list = lists.get(list_key)
+ if actual_list is not None:
+ # We only expect a single SYNC operation for any list
+ assert len(actual_list.ops) == 1
+ sync_op = actual_list.ops[0]
+ assert sync_op.op == OperationType.SYNC
+
+ relevant_room_ids.update(sync_op.room_ids)
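+
+ # Illustrative request (hypothetical values): `{"lists": ["*"],
+ # "rooms": ["!a:example.org"]}` collects the rooms from every sliding
+ # window list plus the `!a:example.org` subscription, if subscribed.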
+
+ # Fetch room account data
+ account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
+ if len(relevant_room_ids) > 0:
+ if from_token is not None:
+ account_data_by_room_map = (
+ await self.store.get_updated_room_account_data_for_user(
+ user_id, from_token.stream_token.account_data_key
+ )
+ )
+ else:
+ account_data_by_room_map = (
+ await self.store.get_room_account_data_for_user(user_id)
+ )
+
+ # Filter down to the relevant rooms
+ account_data_by_room_map = {
+ room_id: account_data_map
+ for room_id, account_data_map in account_data_by_room_map.items()
+ if room_id in relevant_room_ids
+ }
+
+ return SlidingSyncResult.Extensions.AccountDataExtension(
+ global_account_data_map=global_account_data_map,
+ account_data_by_room_map=account_data_by_room_map,
+ )
+
+
+class HaveSentRoomFlag(Enum):
+ """Flag for whether we have sent the room down a sliding sync connection.
+
+ The valid state changes here are:
+ NEVER -> LIVE
+ LIVE -> PREVIOUSLY
+ PREVIOUSLY -> LIVE
+ """
+
+ # The room has never been sent down (or we have forgotten we have sent it
+ # down).
+ NEVER = 1
+
+ # We have previously sent the room down, but there are updates that we
+ # haven't sent down.
+ PREVIOUSLY = 2
+
+ # We have sent the room down and the client has received all updates.
+ LIVE = 3
+
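+# Illustrative lifecycle: a room starts as NEVER; once we send it down it
+# becomes LIVE; if it then has updates that we skip sending, it becomes
+# PREVIOUSLY with the last token we sent; sending everything since that
+# token makes it LIVE again.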
+
+@attr.s(auto_attribs=True, slots=True, frozen=True)
+class HaveSentRoom:
+ """Whether we have sent the room down a sliding sync connection.
+
+ Attributes:
+ status: Flag for whether we have sent the room down
+ last_token: If the flag is `PREVIOUSLY` then this is non-null and
+ contains the stream token of the last update we sent down for the
+ room, i.e. we still need to send everything since then to the
+ client.
+ """
+
+ status: HaveSentRoomFlag
+ last_token: Optional[RoomStreamToken]
+
+ @staticmethod
+ def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+ """Constructor for `PREVIOUSLY` flag."""
+ return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+
+
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
+HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+
+
+@attr.s(auto_attribs=True)
+class SlidingSyncConnectionStore:
+ """In-memory store of per-connection state, including what rooms we have
+ previously sent down a sliding sync connection.
+
+ Note: This is NOT safe to run in a worker setup because connection positions will
+ point to different sets of rooms on different workers. e.g. for the same connection,
+ a connection position of 5 might have totally different states on worker A and
+ worker B.
+
+ One complication that we need to deal with here is needing to handle
+ requests being resent, i.e. if we sent down a room in a response that the
+ client never received, we must consider the room *not* sent when we get the
+ request again.
+
+ This is handled by using an integer "token", which is returned to the client
+ as part of the sync token. For each connection we store a mapping from
+ tokens to the room states, and create a new entry when we send down new
+ rooms.
+
+ Note that for any given sliding sync connection we will only store a maximum
+ of two different tokens: the previous token from the request and a new token
+ sent in the response. When we receive a request with a given token, we then
+ clear out all other entries with a different token.
+
+ Attributes:
+ _connections: Mapping from `(user_id, conn_id)` to mapping of `token`
+ to mapping of room ID to `HaveSentRoom`.
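+
+ Example (illustrative, made-up positions): a request with connection
+ position 3 generates position 4 from the state stored at position 3.
+ If that response is lost and the client retries with position 3,
+ position 4 is simply regenerated, so no room is wrongly marked as
+ sent.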
+ """
+
+ # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
+ _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
+ attr.Factory(dict)
+ )
+
+ async def have_sent_room(
+ self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str
+ ) -> HaveSentRoom:
+ """For the given user_id/conn_id/token, return whether we have
+ previously sent the room down.
+ """
+
+ conn_key = self._get_connection_key(sync_config)
+ sync_statuses = self._connections.setdefault(conn_key, {})
+ room_status = sync_statuses.get(connection_token, {}).get(
+ room_id, HAVE_SENT_ROOM_NEVER
+ )
+
+ return room_status
+
+ async def record_rooms(
+ self,
+ sync_config: SlidingSyncConfig,
+ from_token: Optional[SlidingSyncStreamToken],
+ *,
+ sent_room_ids: StrCollection,
+ unsent_room_ids: StrCollection,
+ ) -> int:
+ """Record which rooms we have/haven't sent down in a new response
+
+ Args:
+ sync_config
+ from_token: The since token from the request, if any
+ sent_room_ids: The set of room IDs that we have sent down as
+ part of this request (only needs to be ones we hadn't
+ previously sent down).
+ unsent_room_ids: The set of room IDs that have had updates
+ since the `from_token`, but which were not included in
+ this request
+ """
+ prev_connection_token = 0
+ if from_token is not None:
+ prev_connection_token = from_token.connection_position
+
+ # If there are no changes then this is a noop.
+ if not sent_room_ids and not unsent_room_ids:
+ return prev_connection_token
+
+ conn_key = self._get_connection_key(sync_config)
+ sync_statuses = self._connections.setdefault(conn_key, {})
+
+ # Generate a new token, removing any existing entries in that token
+ # (which can happen if requests get resent).
+ new_store_token = prev_connection_token + 1
+ sync_statuses.pop(new_store_token, None)
+
+ # Copy over and update the room mappings.
+ new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
+
+ # Whether we have updated `new_room_statuses`; if we haven't by the
+ # end, we can treat this as a noop.
+ have_updated = False
+ for room_id in sent_room_ids:
+ new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
+ have_updated = True
+
+ # Whether we add/update the entries for unsent rooms depends on the
+ # existing entry:
+ # - LIVE: We have previously sent down everything up to
+ # `last_room_token`, so we update the entry to `PREVIOUSLY` with
+ # `last_room_token`.
+ # - PREVIOUSLY: We have previously sent down everything up to *a*
+ # given token, so we don't need to update the entry.
+ # - NEVER: We have never previously sent down the room, and we haven't
+ # sent anything down this time either so we leave it as NEVER.
+
+ # Work out the new state for unsent rooms that were `LIVE`.
+ if from_token:
+ new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
+ else:
+ new_unsent_state = HAVE_SENT_ROOM_NEVER
+
+ for room_id in unsent_room_ids:
+ prev_state = new_room_statuses.get(room_id)
+ if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
+ new_room_statuses[room_id] = new_unsent_state
+ have_updated = True
+
+ if not have_updated:
+ return prev_connection_token
+
+ sync_statuses[new_store_token] = new_room_statuses
+
+ return new_store_token
+
+ async def mark_token_seen(
+ self,
+ sync_config: SlidingSyncConfig,
+ from_token: Optional[SlidingSyncStreamToken],
+ ) -> None:
+ """We have received a request with the given token, so we can clear out
+ any other tokens associated with the connection.
+
+ If there is no `from_token` then we have started afresh, and so we
+ delete all tokens associated with the connection.
+ """
+ # Clear out any tokens for the connection that don't match the one
+ # from the request.
+
+ conn_key = self._get_connection_key(sync_config)
+ sync_statuses = self._connections.pop(conn_key, {})
+ if from_token is None:
+ return
+
+ sync_statuses = {
+ connection_token: room_statuses
+ for connection_token, room_statuses in sync_statuses.items()
+ if connection_token == from_token.connection_position
+ }
+ if sync_statuses:
+ self._connections[conn_key] = sync_statuses
+
+ @staticmethod
+ def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]:
+ """Return a unique identifier for this connection.
+
+ The first part is simply the user ID.
+
+ The second part is generally a combination of device ID and conn_id.
+ However, both of these are optional (e.g. puppet access tokens don't
+ have device IDs), so this handles those edge cases.
+
+ We use this over the raw `conn_id` to avoid clashes between different
+ clients that use the same `conn_id`. Imagine a user uses a web client
+ that uses `conn_id: main_sync_loop` and an Android client that also has
+ a `conn_id: main_sync_loop`.
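+
+ e.g. (illustrative): `("@alice:example.org", "D/DEVICEID/main")` for a
+ device-based key, or an `A/<access_token_id>/<conn_id>` suffix when no
+ device ID is available.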
+ """
+
+ user_id = sync_config.user.to_string()
+
+ # Only one sliding sync connection is allowed per given conn_id (empty
+ # or not).
+ conn_id = sync_config.conn_id or ""
+
+ if sync_config.requester.device_id:
+ return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}")
+
+ if sync_config.requester.access_token_id:
+ # If we don't have a device, then the access token ID should be a
+ # stable ID.
+ return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}")
+
+ # If we have neither then it's likely an AS or some weird token. Either
+ # way we can just fail here.
+ raise Exception("Cannot use sliding sync with access token type")
|