summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/sliding_sync.py3158
-rw-r--r--synapse/handlers/sliding_sync/__init__.py1691
-rw-r--r--synapse/handlers/sliding_sync/extensions.py879
-rw-r--r--synapse/handlers/sliding_sync/room_lists.py2304
-rw-r--r--synapse/handlers/sliding_sync/store.py128
5 files changed, 5002 insertions, 3158 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
deleted file mode 100644

index 18a96843be..0000000000 --- a/synapse/handlers/sliding_sync.py +++ /dev/null
@@ -1,3158 +0,0 @@ -# -# This file is licensed under the Affero General Public License (AGPL) version 3. -# -# Copyright (C) 2024 New Vector, Ltd -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# See the GNU Affero General Public License for more details: -# <https://www.gnu.org/licenses/agpl-3.0.html>. -# -# Originally licensed under the Apache License, Version 2.0: -# <http://www.apache.org/licenses/LICENSE-2.0>. -# -# [This file includes modifications made by New Vector Limited] -# -# -import enum -import logging -from enum import Enum -from itertools import chain -from typing import ( - TYPE_CHECKING, - Any, - Dict, - Final, - List, - Literal, - Mapping, - Optional, - Sequence, - Set, - Tuple, - Union, -) - -import attr -from immutabledict import immutabledict -from typing_extensions import assert_never - -from synapse.api.constants import ( - AccountDataTypes, - Direction, - EventContentFields, - EventTypes, - Membership, -) -from synapse.api.errors import SlidingSyncUnknownPosition -from synapse.events import EventBase, StrippedStateEvent -from synapse.events.utils import parse_stripped_state_event, strip_event -from synapse.handlers.relations import BundledAggregations -from synapse.logging.opentracing import ( - SynapseTags, - log_kv, - set_tag, - start_active_span, - tag_args, - trace, -) -from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary -from synapse.storage.databases.main.state import ( - ROOM_UNKNOWN_SENTINEL, - Sentinel as StateSentinel, -) -from synapse.storage.databases.main.stream import ( - CurrentStateDeltaMembership, - PaginateFunction, -) -from synapse.storage.roommember import MemberSummary -from synapse.types import ( - DeviceListUpdates, - JsonDict, - JsonMapping, - MultiWriterStreamToken, - MutableStateMap, - PersistedEventPosition, - Requester, - RoomStreamToken, - SlidingSyncStreamToken, - StateMap, - StrCollection, - StreamKeyType, - StreamToken, - UserID, -) -from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult -from synapse.types.state import StateFilter -from synapse.util.async_helpers import concurrently_execute -from synapse.visibility import filter_events_for_client - -if TYPE_CHECKING: - from synapse.server import HomeServer - -logger = logging.getLogger(__name__) - - -class Sentinel(enum.Enum): - # defining a sentinel in this way allows mypy to correctly handle the - # type of a dictionary lookup and subsequent type narrowing. - UNSET_SENTINEL = object() - - -# The event types that clients should consider as new activity. -DEFAULT_BUMP_EVENT_TYPES = { - EventTypes.Create, - EventTypes.Message, - EventTypes.Encrypted, - EventTypes.Sticker, - EventTypes.CallInvite, - EventTypes.PollStart, - EventTypes.LiveLocationShareStart, -} - - -@attr.s(slots=True, frozen=True, auto_attribs=True) -class _RoomMembershipForUser: - """ - Attributes: - room_id: The room ID of the membership event - event_id: The event ID of the membership event - event_pos: The stream position of the membership event - membership: The membership state of the user in the room - sender: The person who sent the membership event - newly_joined: Whether the user newly joined the room during the given token - range and is still joined to the room at the end of this range. - newly_left: Whether the user newly left (or kicked) the room during the given - token range and is still "leave" at the end of this range. - is_dm: Whether this user considers this room as a direct-message (DM) room - """ - - room_id: str - # Optional because state resets can affect room membership without a corresponding event. - event_id: Optional[str] - # Even during a state reset which removes the user from the room, we expect this to - # be set because `current_state_delta_stream` will note the position that the reset - # happened. - event_pos: PersistedEventPosition - # Even during a state reset which removes the user from the room, we expect this to - # be set to `LEAVE` because we can make that assumption based on the situaton (see - # `get_current_state_delta_membership_changes_for_user(...)`) - membership: str - # Optional because state resets can affect room membership without a corresponding event. - sender: Optional[str] - newly_joined: bool - newly_left: bool - is_dm: bool - - def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": - return attr.evolve(self, **kwds) - - -def filter_membership_for_sync( - *, user_id: str, room_membership_for_user: _RoomMembershipForUser -) -> bool: - """ - Returns True if the membership event should be included in the sync response, - otherwise False. - - Attributes: - user_id: The user ID that the membership applies to - room_membership_for_user: Membership information for the user in the room - """ - - membership = room_membership_for_user.membership - sender = room_membership_for_user.sender - newly_left = room_membership_for_user.newly_left - - # We want to allow everything except rooms the user has left unless `newly_left` - # because we want everything that's *still* relevant to the user. We include - # `newly_left` rooms because the last event that the user should see is their own - # leave event. - # - # A leave != kick. This logic includes kicks (leave events where the sender is not - # the same user). - # - # When `sender=None`, it means that a state reset happened that removed the user - # from the room without a corresponding leave event. We can just remove the rooms - # since they are no longer relevant to the user but will still appear if they are - # `newly_left`. - return ( - # Anything except leave events - membership != Membership.LEAVE - # Unless... - or newly_left - # Allow kicks - or (membership == Membership.LEAVE and sender not in (user_id, None)) - ) - - -# We can't freeze this class because we want to update it in place with the -# de-duplicated data. -@attr.s(slots=True, auto_attribs=True) -class RoomSyncConfig: - """ - Holds the config for what data we should fetch for a room in the sync response. - - Attributes: - timeline_limit: The maximum number of events to return in the timeline. - - required_state_map: Map from state event type to state_keys requested for the - room. The values are close to `StateKey` but actually use a syntax where you - can provide `*` wildcard and `$LAZY` for lazy-loading room members. - """ - - timeline_limit: int - required_state_map: Dict[str, Set[str]] - - @classmethod - def from_room_config( - cls, - room_params: SlidingSyncConfig.CommonRoomParameters, - ) -> "RoomSyncConfig": - """ - Create a `RoomSyncConfig` from a `SlidingSyncList`/`RoomSubscription` config. - - Args: - room_params: `SlidingSyncConfig.SlidingSyncList` or `SlidingSyncConfig.RoomSubscription` - """ - required_state_map: Dict[str, Set[str]] = {} - for ( - state_type, - state_key, - ) in room_params.required_state: - # If we already have a wildcard for this specific `state_key`, we don't need - # to add it since the wildcard already covers it. - if state_key in required_state_map.get(StateValues.WILDCARD, set()): - continue - - # If we already have a wildcard `state_key` for this `state_type`, we don't need - # to add anything else - if StateValues.WILDCARD in required_state_map.get(state_type, set()): - continue - - # If we're getting wildcards for the `state_type` and `state_key`, that's - # all that matters so get rid of any other entries - if state_type == StateValues.WILDCARD and state_key == StateValues.WILDCARD: - required_state_map = {StateValues.WILDCARD: {StateValues.WILDCARD}} - # We can break, since we don't need to add anything else - break - - # If we're getting a wildcard for the `state_type`, get rid of any other - # entries with the same `state_key`, since the wildcard will cover it already. - elif state_type == StateValues.WILDCARD: - # Get rid of any entries that match the `state_key` - # - # Make a copy so we don't run into an error: `dictionary changed size - # during iteration`, when we remove items - for ( - existing_state_type, - existing_state_key_set, - ) in list(required_state_map.items()): - # Make a copy so we don't run into an error: `Set changed size during - # iteration`, when we filter out and remove items - for existing_state_key in existing_state_key_set.copy(): - if existing_state_key == state_key: - existing_state_key_set.remove(state_key) - - # If we've the left the `set()` empty, remove it from the map - if existing_state_key_set == set(): - required_state_map.pop(existing_state_type, None) - - # If we're getting a wildcard `state_key`, get rid of any other state_keys - # for this `state_type` since the wildcard will cover it already. - if state_key == StateValues.WILDCARD: - required_state_map[state_type] = {state_key} - # Otherwise, just add it to the set - else: - if required_state_map.get(state_type) is None: - required_state_map[state_type] = {state_key} - else: - required_state_map[state_type].add(state_key) - - return cls( - timeline_limit=room_params.timeline_limit, - required_state_map=required_state_map, - ) - - def deep_copy(self) -> "RoomSyncConfig": - required_state_map: Dict[str, Set[str]] = { - state_type: state_key_set.copy() - for state_type, state_key_set in self.required_state_map.items() - } - - return RoomSyncConfig( - timeline_limit=self.timeline_limit, - required_state_map=required_state_map, - ) - - def combine_room_sync_config( - self, other_room_sync_config: "RoomSyncConfig" - ) -> None: - """ - Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the - superset union of the two. - """ - # Take the highest timeline limit - if self.timeline_limit < other_room_sync_config.timeline_limit: - self.timeline_limit = other_room_sync_config.timeline_limit - - # Union the required state - for ( - state_type, - state_key_set, - ) in other_room_sync_config.required_state_map.items(): - # If we already have a wildcard for everything, we don't need to add - # anything else - if StateValues.WILDCARD in self.required_state_map.get( - StateValues.WILDCARD, set() - ): - break - - # If we already have a wildcard `state_key` for this `state_type`, we don't need - # to add anything else - if StateValues.WILDCARD in self.required_state_map.get(state_type, set()): - continue - - # If we're getting wildcards for the `state_type` and `state_key`, that's - # all that matters so get rid of any other entries - if ( - state_type == StateValues.WILDCARD - and StateValues.WILDCARD in state_key_set - ): - self.required_state_map = {state_type: {StateValues.WILDCARD}} - # We can break, since we don't need to add anything else - break - - for state_key in state_key_set: - # If we already have a wildcard for this specific `state_key`, we don't need - # to add it since the wildcard already covers it. - if state_key in self.required_state_map.get( - StateValues.WILDCARD, set() - ): - continue - - # If we're getting a wildcard for the `state_type`, get rid of any other - # entries with the same `state_key`, since the wildcard will cover it already. - if state_type == StateValues.WILDCARD: - # Get rid of any entries that match the `state_key` - # - # Make a copy so we don't run into an error: `dictionary changed size - # during iteration`, when we remove items - for existing_state_type, existing_state_key_set in list( - self.required_state_map.items() - ): - # Make a copy so we don't run into an error: `Set changed size during - # iteration`, when we filter out and remove items - for existing_state_key in existing_state_key_set.copy(): - if existing_state_key == state_key: - existing_state_key_set.remove(state_key) - - # If we've the left the `set()` empty, remove it from the map - if existing_state_key_set == set(): - self.required_state_map.pop(existing_state_type, None) - - # If we're getting a wildcard `state_key`, get rid of any other state_keys - # for this `state_type` since the wildcard will cover it already. - if state_key == StateValues.WILDCARD: - self.required_state_map[state_type] = {state_key} - break - # Otherwise, just add it to the set - else: - if self.required_state_map.get(state_type) is None: - self.required_state_map[state_type] = {state_key} - else: - self.required_state_map[state_type].add(state_key) - - -class StateValues: - """ - Understood values of the (type, state_key) tuple in `required_state`. - """ - - # Include all state events of the given type - WILDCARD: Final = "*" - # Lazy-load room membership events (include room membership events for any event - # `sender` in the timeline). We only give special meaning to this value when it's a - # `state_key`. - LAZY: Final = "$LAZY" - # Subsitute with the requester's user ID. Typically used by clients to get - # the user's membership. - ME: Final = "$ME" - - -class SlidingSyncHandler: - def __init__(self, hs: "HomeServer"): - self.clock = hs.get_clock() - self.store = hs.get_datastores().main - self.storage_controllers = hs.get_storage_controllers() - self.auth_blocking = hs.get_auth_blocking() - self.notifier = hs.get_notifier() - self.event_sources = hs.get_event_sources() - self.relations_handler = hs.get_relations_handler() - self.device_handler = hs.get_device_handler() - self.push_rules_handler = hs.get_push_rules_handler() - self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync - - self.connection_store = SlidingSyncConnectionStore() - - async def wait_for_sync_for_user( - self, - requester: Requester, - sync_config: SlidingSyncConfig, - from_token: Optional[SlidingSyncStreamToken] = None, - timeout_ms: int = 0, - ) -> SlidingSyncResult: - """ - Get the sync for a client if we have new data for it now. Otherwise - wait for new data to arrive on the server. If the timeout expires, then - return an empty sync result. - - Args: - requester: The user making the request - sync_config: Sync configuration - from_token: The point in the stream to sync from. Token of the end of the - previous batch. May be `None` if this is the initial sync request. - timeout_ms: The time in milliseconds to wait for new data to arrive. If 0, - we will immediately but there might not be any new data so we just return an - empty response. - """ - # If the user is not part of the mau group, then check that limits have - # not been exceeded (if not part of the group by this point, almost certain - # auth_blocking will occur) - await self.auth_blocking.check_auth_blocking(requester=requester) - - # If we're working with a user-provided token, we need to make sure to wait for - # this worker to catch up with the token so we don't skip past any incoming - # events or future events if the user is nefariously, manually modifying the - # token. - if from_token is not None: - # We need to make sure this worker has caught up with the token. If - # this returns false, it means we timed out waiting, and we should - # just return an empty response. - before_wait_ts = self.clock.time_msec() - if not await self.notifier.wait_for_stream_token(from_token.stream_token): - logger.warning( - "Timed out waiting for worker to catch up. Returning empty response" - ) - return SlidingSyncResult.empty(from_token) - - # If we've spent significant time waiting to catch up, take it off - # the timeout. - after_wait_ts = self.clock.time_msec() - if after_wait_ts - before_wait_ts > 1_000: - timeout_ms -= after_wait_ts - before_wait_ts - timeout_ms = max(timeout_ms, 0) - - # We're going to respond immediately if the timeout is 0 or if this is an - # initial sync (without a `from_token`) so we can avoid calling - # `notifier.wait_for_events()`. - if timeout_ms == 0 or from_token is None: - now_token = self.event_sources.get_current_token() - result = await self.current_sync_for_user( - sync_config, - from_token=from_token, - to_token=now_token, - ) - else: - # Otherwise, we wait for something to happen and report it to the user. - async def current_sync_callback( - before_token: StreamToken, after_token: StreamToken - ) -> SlidingSyncResult: - return await self.current_sync_for_user( - sync_config, - from_token=from_token, - to_token=after_token, - ) - - result = await self.notifier.wait_for_events( - sync_config.user.to_string(), - timeout_ms, - current_sync_callback, - from_token=from_token.stream_token, - ) - - return result - - @trace - async def current_sync_for_user( - self, - sync_config: SlidingSyncConfig, - to_token: StreamToken, - from_token: Optional[SlidingSyncStreamToken] = None, - ) -> SlidingSyncResult: - """ - Generates the response body of a Sliding Sync result, represented as a - `SlidingSyncResult`. - - We fetch data according to the token range (> `from_token` and <= `to_token`). - - Args: - sync_config: Sync configuration - to_token: The point in the stream to sync up to. - from_token: The point in the stream to sync from. Token of the end of the - previous batch. May be `None` if this is the initial sync request. - """ - user_id = sync_config.user.to_string() - app_service = self.store.get_app_service_by_user_id(user_id) - if app_service: - # We no longer support AS users using /sync directly. - # See https://github.com/matrix-org/matrix-doc/issues/1144 - raise NotImplementedError() - - if from_token: - # Check that we recognize the connection position, if not tell the - # clients that they need to start again. - # - # If we don't do this and the client asks for the full range of - # rooms, we end up sending down all rooms and their state from - # scratch (which can be very slow). By expiring the connection we - # allow the client a chance to do an initial request with a smaller - # range of rooms to get them some results sooner but will end up - # taking the same amount of time (more with round-trips and - # re-processing) in the end to get everything again. - if not await self.connection_store.is_valid_token( - sync_config, from_token.connection_position - ): - raise SlidingSyncUnknownPosition() - - await self.connection_store.mark_token_seen( - sync_config=sync_config, - from_token=from_token, - ) - - # Get all of the room IDs that the user should be able to see in the sync - # response - has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 - has_room_subscriptions = ( - sync_config.room_subscriptions is not None - and len(sync_config.room_subscriptions) > 0 - ) - if has_lists or has_room_subscriptions: - room_membership_for_user_map = ( - await self.get_room_membership_for_user_at_to_token( - user=sync_config.user, - to_token=to_token, - from_token=from_token.stream_token if from_token else None, - ) - ) - - # Assemble sliding window lists - lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} - # Keep track of the rooms that we can display and need to fetch more info about - relevant_room_map: Dict[str, RoomSyncConfig] = {} - # The set of room IDs of all rooms that could appear in any list. These - # include rooms that are outside the list ranges. - all_rooms: Set[str] = set() - if has_lists and sync_config.lists is not None: - with start_active_span("assemble_sliding_window_lists"): - sync_room_map = await self.filter_rooms_relevant_for_sync( - user=sync_config.user, - room_membership_for_user_map=room_membership_for_user_map, - ) - - for list_key, list_config in sync_config.lists.items(): - # Apply filters - filtered_sync_room_map = sync_room_map - if list_config.filters is not None: - filtered_sync_room_map = await self.filter_rooms( - sync_config.user, - sync_room_map, - list_config.filters, - to_token, - ) - - # Find which rooms are partially stated and may need to be filtered out - # depending on the `required_state` requested (see below). - partial_state_room_map = ( - await self.store.is_partial_state_room_batched( - filtered_sync_room_map.keys() - ) - ) - - # Since creating the `RoomSyncConfig` takes some work, let's just do it - # once and make a copy whenever we need it. - room_sync_config = RoomSyncConfig.from_room_config(list_config) - membership_state_keys = room_sync_config.required_state_map.get( - EventTypes.Member - ) - # Also see `StateFilter.must_await_full_state(...)` for comparison - lazy_loading = ( - membership_state_keys is not None - and StateValues.LAZY in membership_state_keys - ) - - if not lazy_loading: - # Exclude partially-stated rooms unless the `required_state` - # only has `["m.room.member", "$LAZY"]` for membership - # (lazy-loading room members). - filtered_sync_room_map = { - room_id: room - for room_id, room in filtered_sync_room_map.items() - if not partial_state_room_map.get(room_id) - } - - all_rooms.update(filtered_sync_room_map) - - # Sort the list - sorted_room_info = await self.sort_rooms( - filtered_sync_room_map, to_token - ) - - ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] - if list_config.ranges: - for range in list_config.ranges: - room_ids_in_list: List[str] = [] - - # We're going to loop through the sorted list of rooms starting - # at the range start index and keep adding rooms until we fill - # up the range or run out of rooms. - # - # Both sides of range are inclusive so we `+ 1` - max_num_rooms = range[1] - range[0] + 1 - for room_membership in sorted_room_info[range[0] :]: - room_id = room_membership.room_id - - if len(room_ids_in_list) >= max_num_rooms: - break - - # Take the superset of the `RoomSyncConfig` for each room. - # - # Update our `relevant_room_map` with the room we're going - # to display and need to fetch more info about. - existing_room_sync_config = relevant_room_map.get( - room_id - ) - if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config( - room_sync_config - ) - else: - # Make a copy so if we modify it later, it doesn't - # affect all references. - relevant_room_map[room_id] = ( - room_sync_config.deep_copy() - ) - - room_ids_in_list.append(room_id) - - ops.append( - SlidingSyncResult.SlidingWindowList.Operation( - op=OperationType.SYNC, - range=range, - room_ids=room_ids_in_list, - ) - ) - - lists[list_key] = SlidingSyncResult.SlidingWindowList( - count=len(sorted_room_info), - ops=ops, - ) - - # Handle room subscriptions - if has_room_subscriptions and sync_config.room_subscriptions is not None: - with start_active_span("assemble_room_subscriptions"): - for ( - room_id, - room_subscription, - ) in sync_config.room_subscriptions.items(): - room_membership_for_user_at_to_token = ( - await self.check_room_subscription_allowed_for_user( - room_id=room_id, - room_membership_for_user_map=room_membership_for_user_map, - to_token=to_token, - ) - ) - - # Skip this room if the user isn't allowed to see it - if not room_membership_for_user_at_to_token: - continue - - all_rooms.add(room_id) - - room_membership_for_user_map[room_id] = ( - room_membership_for_user_at_to_token - ) - - # Take the superset of the `RoomSyncConfig` for each room. - # - # Update our `relevant_room_map` with the room we're going to display - # and need to fetch more info about. - room_sync_config = RoomSyncConfig.from_room_config( - room_subscription - ) - existing_room_sync_config = relevant_room_map.get(room_id) - if existing_room_sync_config is not None: - existing_room_sync_config.combine_room_sync_config( - room_sync_config - ) - else: - relevant_room_map[room_id] = room_sync_config - - # Fetch room data - rooms: Dict[str, SlidingSyncResult.RoomResult] = {} - - # Filter out rooms that haven't received updates and we've sent down - # previously. - # Keep track of the rooms that we're going to display and need to fetch more info about - relevant_rooms_to_send_map = relevant_room_map - with start_active_span("filter_relevant_rooms_to_send"): - if from_token: - rooms_should_send = set() - - # First we check if there are rooms that match a list/room - # subscription and have updates we need to send (i.e. either because - # we haven't sent the room down, or we have but there are missing - # updates). - for room_id in relevant_room_map: - status = await self.connection_store.have_sent_room( - sync_config, - from_token.connection_position, - room_id, - ) - if ( - # The room was never sent down before so the client needs to know - # about it regardless of any updates. - status.status == HaveSentRoomFlag.NEVER - # `PREVIOUSLY` literally means the "room was sent down before *AND* - # there are updates we haven't sent down" so we already know this - # room has updates. - or status.status == HaveSentRoomFlag.PREVIOUSLY - ): - rooms_should_send.add(room_id) - elif status.status == HaveSentRoomFlag.LIVE: - # We know that we've sent all updates up until `from_token`, - # so we just need to check if there have been updates since - # then. - pass - else: - assert_never(status.status) - - # We only need to check for new events since any state changes - # will also come down as new events. - rooms_that_have_updates = self.store.get_rooms_that_might_have_updates( - relevant_room_map.keys(), from_token.stream_token.room_key - ) - rooms_should_send.update(rooms_that_have_updates) - relevant_rooms_to_send_map = { - room_id: room_sync_config - for room_id, room_sync_config in relevant_room_map.items() - if room_id in rooms_should_send - } - - @trace - @tag_args - async def handle_room(room_id: str) -> None: - room_sync_result = await self.get_room_sync_data( - sync_config=sync_config, - room_id=room_id, - room_sync_config=relevant_rooms_to_send_map[room_id], - room_membership_for_user_at_to_token=room_membership_for_user_map[ - room_id - ], - from_token=from_token, - to_token=to_token, - ) - - # Filter out empty room results during incremental sync - if room_sync_result or not from_token: - rooms[room_id] = room_sync_result - - if relevant_rooms_to_send_map: - with start_active_span("sliding_sync.generate_room_entries"): - await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10) - - extensions = await self.get_extensions_response( - sync_config=sync_config, - actual_lists=lists, - # We're purposely using `relevant_room_map` instead of - # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could - # send regardless of whether they have an event update or not. The - # extensions care about more than just normal events in the rooms (like - # account data, read receipts, typing indicators, to-device messages, etc). - actual_room_ids=set(relevant_room_map.keys()), - actual_room_response_map=rooms, - from_token=from_token, - to_token=to_token, - ) - - if has_lists or has_room_subscriptions: - # We now calculate if any rooms outside the range have had updates, - # which we are not sending down. - # - # We *must* record rooms that have had updates, but it is also fine - # to record rooms as having updates even if there might not actually - # be anything new for the user (e.g. due to event filters, events - # having happened after the user left, etc). - unsent_room_ids = [] - if from_token: - # The set of rooms that the client (may) care about, but aren't - # in any list range (or subscribed to). - missing_rooms = all_rooms - relevant_room_map.keys() - - # We now just go and try fetching any events in the above rooms - # to see if anything has happened since the `from_token`. - # - # TODO: Replace this with something faster. When we land the - # sliding sync tables that record the most recent event - # positions we can use that. - missing_event_map_by_room = ( - await self.store.get_room_events_stream_for_rooms( - room_ids=missing_rooms, - from_key=to_token.room_key, - to_key=from_token.stream_token.room_key, - limit=1, - ) - ) - unsent_room_ids = list(missing_event_map_by_room) - - connection_position = await self.connection_store.record_rooms( - sync_config=sync_config, - from_token=from_token, - sent_room_ids=relevant_rooms_to_send_map.keys(), - unsent_room_ids=unsent_room_ids, - ) - elif from_token: - connection_position = from_token.connection_position - else: - # Initial sync without a `from_token` starts at `0` - connection_position = 0 - - sliding_sync_result = SlidingSyncResult( - next_pos=SlidingSyncStreamToken(to_token, connection_position), - lists=lists, - rooms=rooms, - extensions=extensions, - ) - - # Make it easy to find traces for syncs that aren't empty - set_tag(SynapseTags.RESULT_PREFIX + "result", bool(sliding_sync_result)) - set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id) - - return sliding_sync_result - - @trace - async def get_room_membership_for_user_at_to_token( - self, - user: UserID, - to_token: StreamToken, - from_token: Optional[StreamToken], - ) -> Dict[str, _RoomMembershipForUser]: - """ - Fetch room IDs that the user has had membership in (the full room list including - long-lost left rooms that will be filtered, sorted, and sliced). - - We're looking for rooms where the user has had any sort of membership in the - token range (> `from_token` and <= `to_token`) - - In order for bans/kicks to not show up, you need to `/forget` those rooms. This - doesn't modify the event itself though and only adds the `forgotten` flag to the - `room_memberships` table in Synapse. There isn't a way to tell when a room was - forgotten at the moment so we can't factor it into the token range. - - Args: - user: User to fetch rooms for - to_token: The token to fetch rooms up to. - from_token: The point in the stream to sync from. - - Returns: - A dictionary of room IDs that the user has had membership in along with - membership information in that room at the time of `to_token`. - """ - user_id = user.to_string() - - # First grab a current snapshot rooms for the user - # (also handles forgotten rooms) - room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is( - user_id=user_id, - # We want to fetch any kind of membership (joined and left rooms) in order - # to get the `event_pos` of the latest room membership event for the - # user. - membership_list=Membership.LIST, - excluded_rooms=self.rooms_to_exclude_globally, - ) - - # If the user has never joined any rooms before, we can just return an empty list - if not room_for_user_list: - return {} - - # Our working list of rooms that can show up in the sync response - sync_room_id_set = { - # Note: The `room_for_user` we're assigning here will need to be fixed up - # (below) because they are potentially from the current snapshot time - # instead from the time of the `to_token`. - room_for_user.room_id: _RoomMembershipForUser( - room_id=room_for_user.room_id, - event_id=room_for_user.event_id, - event_pos=room_for_user.event_pos, - membership=room_for_user.membership, - sender=room_for_user.sender, - # We will update these fields below to be accurate - newly_joined=False, - newly_left=False, - is_dm=False, - ) - for room_for_user in room_for_user_list - } - - # Get the `RoomStreamToken` that represents the spot we queried up to when we got - # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`. - # - # First, we need to get the max stream_ordering of each event persister instance - # that we queried events from. - instance_to_max_stream_ordering_map: Dict[str, int] = {} - for room_for_user in room_for_user_list: - instance_name = room_for_user.event_pos.instance_name - stream_ordering = room_for_user.event_pos.stream - - current_instance_max_stream_ordering = ( - instance_to_max_stream_ordering_map.get(instance_name) - ) - if ( - current_instance_max_stream_ordering is None - or stream_ordering > current_instance_max_stream_ordering - ): - instance_to_max_stream_ordering_map[instance_name] = stream_ordering - - # Then assemble the `RoomStreamToken` - min_stream_pos = min(instance_to_max_stream_ordering_map.values()) - membership_snapshot_token = RoomStreamToken( - # Minimum position in the `instance_map` - stream=min_stream_pos, - instance_map=immutabledict( - { - instance_name: stream_pos - for instance_name, stream_pos in instance_to_max_stream_ordering_map.items() - if stream_pos > min_stream_pos - } - ), - ) - - # Since we fetched the users room list at some point in time after the from/to - # tokens, we need to revert/rewind some membership changes to match the point in - # time of the `to_token`. In particular, we need to make these fixups: - # - # - 1a) Remove rooms that the user joined after the `to_token` - # - 1b) Add back rooms that the user left after the `to_token` - # - 1c) Update room membership events to the point in time of the `to_token` - # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) - # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) - # - 4) Figure out which rooms are DM's - - # 1) Fetch membership changes that fall in the range from `to_token` up to - # `membership_snapshot_token` - # - # If our `to_token` is already the same or ahead of the latest room membership - # for the user, we don't need to do any "2)" fix-ups and can just straight-up - # use the room list from the snapshot as a base (nothing has changed) - current_state_delta_membership_changes_after_to_token = [] - if not membership_snapshot_token.is_before_or_eq(to_token.room_key): - current_state_delta_membership_changes_after_to_token = ( - await self.store.get_current_state_delta_membership_changes_for_user( - user_id, - from_key=to_token.room_key, - to_key=membership_snapshot_token, - excluded_room_ids=self.rooms_to_exclude_globally, - ) - ) - - # 1) Assemble a list of the first membership event after the `to_token` so we can - # step backward to the previous membership that would apply to the from/to - # range. - first_membership_change_by_room_id_after_to_token: Dict[ - str, CurrentStateDeltaMembership - ] = {} - for membership_change in current_state_delta_membership_changes_after_to_token: - # Only set if we haven't already set it - first_membership_change_by_room_id_after_to_token.setdefault( - membership_change.room_id, membership_change - ) - - # 1) Fixup - # - # Since we fetched a snapshot of the users room list at some point in time after - # the from/to tokens, we need to revert/rewind some membership changes to match - # the point in time of the `to_token`. - for ( - room_id, - first_membership_change_after_to_token, - ) in first_membership_change_by_room_id_after_to_token.items(): - # 1a) Remove rooms that the user joined after the `to_token` - if first_membership_change_after_to_token.prev_event_id is None: - sync_room_id_set.pop(room_id, None) - # 1b) 1c) From the first membership event after the `to_token`, step backward to the - # previous membership that would apply to the from/to range. - else: - # We don't expect these fields to be `None` if we have a `prev_event_id` - # but we're being defensive since it's possible that the prev event was - # culled from the database. - if ( - first_membership_change_after_to_token.prev_event_pos is not None - and first_membership_change_after_to_token.prev_membership - is not None - ): - sync_room_id_set[room_id] = _RoomMembershipForUser( - room_id=room_id, - event_id=first_membership_change_after_to_token.prev_event_id, - event_pos=first_membership_change_after_to_token.prev_event_pos, - membership=first_membership_change_after_to_token.prev_membership, - sender=first_membership_change_after_to_token.prev_sender, - # We will update these fields below to be accurate - newly_joined=False, - newly_left=False, - is_dm=False, - ) - else: - # If we can't find the previous membership event, we shouldn't - # include the room in the sync response since we can't determine the - # exact membership state and shouldn't rely on the current snapshot. - sync_room_id_set.pop(room_id, None) - - # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token` - current_state_delta_membership_changes_in_from_to_range = [] - if from_token: - current_state_delta_membership_changes_in_from_to_range = ( - await self.store.get_current_state_delta_membership_changes_for_user( - user_id, - from_key=from_token.room_key, - to_key=to_token.room_key, - excluded_room_ids=self.rooms_to_exclude_globally, - ) - ) - - # 2) Assemble a list of the last membership events in some given ranges. Someone - # could have left and joined multiple times during the given range but we only - # care about end-result so we grab the last one. - last_membership_change_by_room_id_in_from_to_range: Dict[ - str, CurrentStateDeltaMembership - ] = {} - # We also want to assemble a list of the first membership events during the token - # range so we can step backward to the previous membership that would apply to - # before the token range to see if we have `newly_joined` the room. - first_membership_change_by_room_id_in_from_to_range: Dict[ - str, CurrentStateDeltaMembership - ] = {} - # Keep track if the room has a non-join event in the token range so we can later - # tell if it was a `newly_joined` room. If the last membership event in the - # token range is a join and there is also some non-join in the range, we know - # they `newly_joined`. - has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {} - for ( - membership_change - ) in current_state_delta_membership_changes_in_from_to_range: - room_id = membership_change.room_id - - last_membership_change_by_room_id_in_from_to_range[room_id] = ( - membership_change - ) - # Only set if we haven't already set it - first_membership_change_by_room_id_in_from_to_range.setdefault( - room_id, membership_change - ) - - if membership_change.membership != Membership.JOIN: - has_non_join_event_by_room_id_in_from_to_range[room_id] = True - - # 2) Fixup - # - # 3) We also want to assemble a list of possibly newly joined rooms. Someone - # could have left and joined multiple times during the given range but we only - # care about whether they are joined at the end of the token range so we are - # working with the last membership even in the token range. - possibly_newly_joined_room_ids = set() - for ( - last_membership_change_in_from_to_range - ) in last_membership_change_by_room_id_in_from_to_range.values(): - room_id = last_membership_change_in_from_to_range.room_id - - # 3) - if last_membership_change_in_from_to_range.membership == Membership.JOIN: - possibly_newly_joined_room_ids.add(room_id) - - # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`). - if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - # 2) Mark this room as `newly_left` - - # If we're seeing a membership change here, we should expect to already - # have it in our snapshot but if a state reset happens, it wouldn't have - # shown up in our snapshot but appear as a change here. - existing_sync_entry = sync_room_id_set.get(room_id) - if existing_sync_entry is not None: - # Normal expected case - sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace( - newly_left=True - ) - else: - # State reset! - logger.warn( - "State reset detected for room_id %s with %s who is no longer in the room", - room_id, - user_id, - ) - # Even though a state reset happened which removed the person from - # the room, we still add it the list so the user knows they left the - # room. Downstream code can check for a state reset by looking for - # `event_id=None and membership is not None`. - sync_room_id_set[room_id] = _RoomMembershipForUser( - room_id=room_id, - event_id=last_membership_change_in_from_to_range.event_id, - event_pos=last_membership_change_in_from_to_range.event_pos, - membership=last_membership_change_in_from_to_range.membership, - sender=last_membership_change_in_from_to_range.sender, - newly_joined=False, - newly_left=True, - is_dm=False, - ) - - # 3) Figure out `newly_joined` - for room_id in possibly_newly_joined_room_ids: - has_non_join_in_from_to_range = ( - has_non_join_event_by_room_id_in_from_to_range.get(room_id, False) - ) - # If the last membership event in the token range is a join and there is - # also some non-join in the range, we know they `newly_joined`. - if has_non_join_in_from_to_range: - # We found a `newly_joined` room (we left and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - newly_joined=True - ) - else: - prev_event_id = first_membership_change_by_room_id_in_from_to_range[ - room_id - ].prev_event_id - prev_membership = first_membership_change_by_room_id_in_from_to_range[ - room_id - ].prev_membership - - if prev_event_id is None: - # We found a `newly_joined` room (we are joining the room for the - # first time within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) - # Last resort, we need to step back to the previous membership event - # just before the token range to see if we're joined then or not. - elif prev_membership != Membership.JOIN: - # We found a `newly_joined` room (we left before the token range - # and joined within the token range) - sync_room_id_set[room_id] = sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) - - # 4) Figure out which rooms the user considers to be direct-message (DM) rooms - # - # We're using global account data (`m.direct`) instead of checking for - # `is_direct` on membership events because that property only appears for - # the invitee membership event (doesn't show up for the inviter). - # - # We're unable to take `to_token` into account for global account data since - # we only keep track of the latest account data for the user. - dm_map = await self.store.get_global_account_data_by_type_for_user( - user_id, AccountDataTypes.DIRECT - ) - - # Flatten out the map. Account data is set by the client so it needs to be - # scrutinized. - dm_room_id_set = set() - if isinstance(dm_map, dict): - for room_ids in dm_map.values(): - # Account data should be a list of room IDs. Ignore anything else - if isinstance(room_ids, list): - for room_id in room_ids: - if isinstance(room_id, str): - dm_room_id_set.add(room_id) - - # 4) Fixup - for room_id in sync_room_id_set: - sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( - is_dm=room_id in dm_room_id_set - ) - - return sync_room_id_set - - @trace - async def filter_rooms_relevant_for_sync( - self, - user: UserID, - room_membership_for_user_map: Dict[str, _RoomMembershipForUser], - ) -> Dict[str, _RoomMembershipForUser]: - """ - Filter room IDs that should/can be listed for this user in the sync response (the - full room list that will be further filtered, sorted, and sliced). - - We're looking for rooms where the user has the following state in the token - range (> `from_token` and <= `to_token`): - - - `invite`, `join`, `knock`, `ban` membership events - - Kicks (`leave` membership events where `sender` is different from the - `user_id`/`state_key`) - - `newly_left` (rooms that were left during the given token range) - - In order for bans/kicks to not show up in sync, you need to `/forget` those - rooms. This doesn't modify the event itself though and only adds the - `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way - to tell when a room was forgotten at the moment so we can't factor it into the - from/to range. - - Args: - user: User that is syncing - room_membership_for_user_map: Room membership for the user - - Returns: - A dictionary of room IDs that should be listed in the sync response along - with membership information in that room at the time of `to_token`. - """ - user_id = user.to_string() - - # Filter rooms to only what we're interested to sync with - filtered_sync_room_map = { - room_id: room_membership_for_user - for room_id, room_membership_for_user in room_membership_for_user_map.items() - if filter_membership_for_sync( - user_id=user_id, - room_membership_for_user=room_membership_for_user, - ) - } - - return filtered_sync_room_map - - async def check_room_subscription_allowed_for_user( - self, - room_id: str, - room_membership_for_user_map: Dict[str, _RoomMembershipForUser], - to_token: StreamToken, - ) -> Optional[_RoomMembershipForUser]: - """ - Check whether the user is allowed to see the room based on whether they have - ever had membership in the room or if the room is `world_readable`. - - Similar to `check_user_in_room_or_world_readable(...)` - - Args: - room_id: Room to check - room_membership_for_user_map: Room membership for the user at the time of - the `to_token` (<= `to_token`). - to_token: The token to fetch rooms up to. - - Returns: - The room membership for the user if they are allowed to subscribe to the - room else `None`. - """ - - # We can first check if they are already allowed to see the room based - # on our previous work to assemble the `room_membership_for_user_map`. - # - # If they have had any membership in the room over time (up to the `to_token`), - # let them subscribe and see what they can. - existing_membership_for_user = room_membership_for_user_map.get(room_id) - if existing_membership_for_user is not None: - return existing_membership_for_user - - # TODO: Handle `world_readable` rooms - return None - - # If the room is `world_readable`, it doesn't matter whether they can join, - # everyone can see the room. - # not_in_room_membership_for_user = _RoomMembershipForUser( - # room_id=room_id, - # event_id=None, - # event_pos=None, - # membership=None, - # sender=None, - # newly_joined=False, - # newly_left=False, - # is_dm=False, - # ) - # room_state = await self.get_current_state_at( - # room_id=room_id, - # room_membership_for_user_at_to_token=not_in_room_membership_for_user, - # state_filter=StateFilter.from_types( - # [(EventTypes.RoomHistoryVisibility, "")] - # ), - # to_token=to_token, - # ) - - # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, "")) - # if ( - # visibility_event is not None - # and visibility_event.content.get("history_visibility") - # == HistoryVisibility.WORLD_READABLE - # ): - # return not_in_room_membership_for_user - - # return None - - @trace - async def _bulk_get_stripped_state_for_rooms_from_sync_room_map( - self, - room_ids: StrCollection, - sync_room_map: Dict[str, _RoomMembershipForUser], - ) -> Dict[str, Optional[StateMap[StrippedStateEvent]]]: - """ - Fetch stripped state for a list of room IDs. Stripped state is only - applicable to invite/knock rooms. Other rooms will have `None` as their - stripped state. - - For invite rooms, we pull from `unsigned.invite_room_state`. - For knock rooms, we pull from `unsigned.knock_room_state`. - - Args: - room_ids: Room IDs to fetch stripped state for - sync_room_map: Dictionary of room IDs to sort along with membership - information in the room at the time of `to_token`. - - Returns: - Mapping from room_id to mapping of (type, state_key) to stripped state - event. - """ - room_id_to_stripped_state_map: Dict[ - str, Optional[StateMap[StrippedStateEvent]] - ] = {} - - # Fetch what we haven't before - room_ids_to_fetch = [ - room_id - for room_id in room_ids - if room_id not in room_id_to_stripped_state_map - ] - - # Gather a list of event IDs we can grab stripped state from - invite_or_knock_event_ids: List[str] = [] - for room_id in room_ids_to_fetch: - if sync_room_map[room_id].membership in ( - Membership.INVITE, - Membership.KNOCK, - ): - event_id = sync_room_map[room_id].event_id - # If this is an invite/knock then there should be an event_id - assert event_id is not None - invite_or_knock_event_ids.append(event_id) - else: - room_id_to_stripped_state_map[room_id] = None - - invite_or_knock_events = await self.store.get_events(invite_or_knock_event_ids) - for invite_or_knock_event in invite_or_knock_events.values(): - room_id = invite_or_knock_event.room_id - membership = invite_or_knock_event.membership - - raw_stripped_state_events = None - if membership == Membership.INVITE: - invite_room_state = invite_or_knock_event.unsigned.get( - "invite_room_state" - ) - raw_stripped_state_events = invite_room_state - elif membership == Membership.KNOCK: - knock_room_state = invite_or_knock_event.unsigned.get( - "knock_room_state" - ) - raw_stripped_state_events = knock_room_state - else: - raise AssertionError( - f"Unexpected membership {membership} (this is a problem with Synapse itself)" - ) - - stripped_state_map: Optional[MutableStateMap[StrippedStateEvent]] = None - # Scrutinize unsigned things. `raw_stripped_state_events` should be a list - # of stripped events - if raw_stripped_state_events is not None: - stripped_state_map = {} - if isinstance(raw_stripped_state_events, list): - for raw_stripped_event in raw_stripped_state_events: - stripped_state_event = parse_stripped_state_event( - raw_stripped_event - ) - if stripped_state_event is not None: - stripped_state_map[ - ( - stripped_state_event.type, - stripped_state_event.state_key, - ) - ] = stripped_state_event - - room_id_to_stripped_state_map[room_id] = stripped_state_map - - return room_id_to_stripped_state_map - - @trace - async def _bulk_get_partial_current_state_content_for_rooms( - self, - content_type: Literal[ - # `content.type` from `EventTypes.Create`` - "room_type", - # `content.algorithm` from `EventTypes.RoomEncryption` - "room_encryption", - ], - room_ids: Set[str], - sync_room_map: Dict[str, _RoomMembershipForUser], - to_token: StreamToken, - room_id_to_stripped_state_map: Dict[ - str, Optional[StateMap[StrippedStateEvent]] - ], - ) -> Mapping[str, Union[Optional[str], StateSentinel]]: - """ - Get the given state event content for a list of rooms. First we check the - current state of the room, then fallback to stripped state if available, then - historical state. - - Args: - content_type: Which content to grab - room_ids: Room IDs to fetch the given content field for. - sync_room_map: Dictionary of room IDs to sort along with membership - information in the room at the time of `to_token`. - to_token: We filter based on the state of the room at this token - room_id_to_stripped_state_map: This does not need to be filled in before - calling this function. Mapping from room_id to mapping of (type, state_key) - to stripped state event. Modified in place when we fetch new rooms so we can - save work next time this function is called. - - Returns: - A mapping from room ID to the state event content if the room has - the given state event (event_type, ""), otherwise `None`. Rooms unknown to - this server will return `ROOM_UNKNOWN_SENTINEL`. - """ - room_id_to_content: Dict[str, Union[Optional[str], StateSentinel]] = {} - - # As a bulk shortcut, use the current state if the server is particpating in the - # room (meaning we have current state). Ideally, for leave/ban rooms, we would - # want the state at the time of the membership instead of current state to not - # leak anything but we consider the create/encryption stripped state events to - # not be a secret given they are often set at the start of the room and they are - # normally handed out on invite/knock. - # - # Be mindful to only use this for non-sensitive details. For example, even - # though the room name/avatar/topic are also stripped state, they seem a lot - # more senstive to leak the current state value of. - # - # Since this function is cached, we need to make a mutable copy via - # `dict(...)`. - event_type = "" - event_content_field = "" - if content_type == "room_type": - event_type = EventTypes.Create - event_content_field = EventContentFields.ROOM_TYPE - room_id_to_content = dict(await self.store.bulk_get_room_type(room_ids)) - elif content_type == "room_encryption": - event_type = EventTypes.RoomEncryption - event_content_field = EventContentFields.ENCRYPTION_ALGORITHM - room_id_to_content = dict( - await self.store.bulk_get_room_encryption(room_ids) - ) - else: - assert_never(content_type) - - room_ids_with_results = [ - room_id - for room_id, content_field in room_id_to_content.items() - if content_field is not ROOM_UNKNOWN_SENTINEL - ] - - # We might not have current room state for remote invite/knocks if we are - # the first person on our server to see the room. The best we can do is look - # in the optional stripped state from the invite/knock event. - room_ids_without_results = room_ids.difference( - chain( - room_ids_with_results, - [ - room_id - for room_id, stripped_state_map in room_id_to_stripped_state_map.items() - if stripped_state_map is not None - ], - ) - ) - room_id_to_stripped_state_map.update( - await self._bulk_get_stripped_state_for_rooms_from_sync_room_map( - room_ids_without_results, sync_room_map - ) - ) - - # Update our `room_id_to_content` map based on the stripped state - # (applies to invite/knock rooms) - rooms_ids_without_stripped_state: Set[str] = set() - for room_id in room_ids_without_results: - stripped_state_map = room_id_to_stripped_state_map.get( - room_id, Sentinel.UNSET_SENTINEL - ) - assert stripped_state_map is not Sentinel.UNSET_SENTINEL, ( - f"Stripped state left unset for room {room_id}. " - + "Make sure you're calling `_bulk_get_stripped_state_for_rooms_from_sync_room_map(...)` " - + "with that room_id. (this is a problem with Synapse itself)" - ) - - # If there is some stripped state, we assume the remote server passed *all* - # of the potential stripped state events for the room. - if stripped_state_map is not None: - create_stripped_event = stripped_state_map.get((EventTypes.Create, "")) - stripped_event = stripped_state_map.get((event_type, "")) - # Sanity check that we at-least have the create event - if create_stripped_event is not None: - if stripped_event is not None: - room_id_to_content[room_id] = stripped_event.content.get( - event_content_field - ) - else: - # Didn't see the state event we're looking for in the stripped - # state so we can assume relevant content field is `None`. - room_id_to_content[room_id] = None - else: - rooms_ids_without_stripped_state.add(room_id) - - # Last resort, we might not have current room state for rooms that the - # server has left (no one local is in the room) but we can look at the - # historical state. - # - # Update our `room_id_to_content` map based on the state at the time of - # the membership event. - for room_id in rooms_ids_without_stripped_state: - # TODO: It would be nice to look this up in a bulk way (N+1 queries) - # - # TODO: `get_state_at(...)` doesn't take into account the "current state". - room_state = await self.storage_controllers.state.get_state_at( - room_id=room_id, - stream_position=to_token.copy_and_replace( - StreamKeyType.ROOM, - sync_room_map[room_id].event_pos.to_room_stream_token(), - ), - state_filter=StateFilter.from_types( - [ - (EventTypes.Create, ""), - (event_type, ""), - ] - ), - # Partially-stated rooms should have all state events except for - # remote membership events so we don't need to wait at all because - # we only want the create event and some non-member event. - await_full_state=False, - ) - # We can use the create event as a canary to tell whether the server has - # seen the room before - create_event = room_state.get((EventTypes.Create, "")) - state_event = room_state.get((event_type, "")) - - if create_event is None: - # Skip for unknown rooms - continue - - if state_event is not None: - room_id_to_content[room_id] = state_event.content.get( - event_content_field - ) - else: - # Didn't see the state event we're looking for in the stripped - # state so we can assume relevant content field is `None`. - room_id_to_content[room_id] = None - - return room_id_to_content - - @trace - async def filter_rooms( - self, - user: UserID, - sync_room_map: Dict[str, _RoomMembershipForUser], - filters: SlidingSyncConfig.SlidingSyncList.Filters, - to_token: StreamToken, - ) -> Dict[str, _RoomMembershipForUser]: - """ - Filter rooms based on the sync request. - - Args: - user: User to filter rooms for - sync_room_map: Dictionary of room IDs to sort along with membership - information in the room at the time of `to_token`. - filters: Filters to apply - to_token: We filter based on the state of the room at this token - - Returns: - A filtered dictionary of room IDs along with membership information in the - room at the time of `to_token`. - """ - room_id_to_stripped_state_map: Dict[ - str, Optional[StateMap[StrippedStateEvent]] - ] = {} - - filtered_room_id_set = set(sync_room_map.keys()) - - # Filter for Direct-Message (DM) rooms - if filters.is_dm is not None: - with start_active_span("filters.is_dm"): - if filters.is_dm: - # Only DM rooms please - filtered_room_id_set = { - room_id - for room_id in filtered_room_id_set - if sync_room_map[room_id].is_dm - } - else: - # Only non-DM rooms please - filtered_room_id_set = { - room_id - for room_id in filtered_room_id_set - if not sync_room_map[room_id].is_dm - } - - if filters.spaces is not None: - with start_active_span("filters.spaces"): - raise NotImplementedError() - - # Filter for encrypted rooms - if filters.is_encrypted is not None: - with start_active_span("filters.is_encrypted"): - room_id_to_encryption = ( - await self._bulk_get_partial_current_state_content_for_rooms( - content_type="room_encryption", - room_ids=filtered_room_id_set, - to_token=to_token, - sync_room_map=sync_room_map, - room_id_to_stripped_state_map=room_id_to_stripped_state_map, - ) - ) - - # Make a copy so we don't run into an error: `Set changed size during - # iteration`, when we filter out and remove items - for room_id in filtered_room_id_set.copy(): - encryption = room_id_to_encryption.get( - room_id, ROOM_UNKNOWN_SENTINEL - ) - - # Just remove rooms if we can't determine their encryption status - if encryption is ROOM_UNKNOWN_SENTINEL: - filtered_room_id_set.remove(room_id) - continue - - # If we're looking for encrypted rooms, filter out rooms that are not - # encrypted and vice versa - is_encrypted = encryption is not None - if (filters.is_encrypted and not is_encrypted) or ( - not filters.is_encrypted and is_encrypted - ): - filtered_room_id_set.remove(room_id) - - # Filter for rooms that the user has been invited to - if filters.is_invite is not None: - with start_active_span("filters.is_invite"): - # Make a copy so we don't run into an error: `Set changed size during - # iteration`, when we filter out and remove items - for room_id in filtered_room_id_set.copy(): - room_for_user = sync_room_map[room_id] - # If we're looking for invite rooms, filter out rooms that the user is - # not invited to and vice versa - if ( - filters.is_invite - and room_for_user.membership != Membership.INVITE - ) or ( - not filters.is_invite - and room_for_user.membership == Membership.INVITE - ): - filtered_room_id_set.remove(room_id) - - # Filter by room type (space vs room, etc). A room must match one of the types - # provided in the list. `None` is a valid type for rooms which do not have a - # room type. - if filters.room_types is not None or filters.not_room_types is not None: - with start_active_span("filters.room_types"): - room_id_to_type = ( - await self._bulk_get_partial_current_state_content_for_rooms( - content_type="room_type", - room_ids=filtered_room_id_set, - to_token=to_token, - sync_room_map=sync_room_map, - room_id_to_stripped_state_map=room_id_to_stripped_state_map, - ) - ) - - # Make a copy so we don't run into an error: `Set changed size during - # iteration`, when we filter out and remove items - for room_id in filtered_room_id_set.copy(): - room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL) - - # Just remove rooms if we can't determine their type - if room_type is ROOM_UNKNOWN_SENTINEL: - filtered_room_id_set.remove(room_id) - continue - - if ( - filters.room_types is not None - and room_type not in filters.room_types - ): - filtered_room_id_set.remove(room_id) - - if ( - filters.not_room_types is not None - and room_type in filters.not_room_types - ): - filtered_room_id_set.remove(room_id) - - if filters.room_name_like is not None: - with start_active_span("filters.room_name_like"): - # TODO: The room name is a bit more sensitive to leak than the - # create/encryption event. Maybe we should consider a better way to fetch - # historical state before implementing this. - # - # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms( - # content_type="room_name", - # room_ids=filtered_room_id_set, - # to_token=to_token, - # sync_room_map=sync_room_map, - # room_id_to_stripped_state_map=room_id_to_stripped_state_map, - # ) - raise NotImplementedError() - - if filters.tags is not None or filters.not_tags is not None: - with start_active_span("filters.tags"): - raise NotImplementedError() - - # Assemble a new sync room map but only with the `filtered_room_id_set` - return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set} - - @trace - async def sort_rooms( - self, - sync_room_map: Dict[str, _RoomMembershipForUser], - to_token: StreamToken, - ) -> List[_RoomMembershipForUser]: - """ - Sort by `stream_ordering` of the last event that the user should see in the - room. `stream_ordering` is unique so we get a stable sort. - - Args: - sync_room_map: Dictionary of room IDs to sort along with membership - information in the room at the time of `to_token`. - to_token: We sort based on the events in the room at this token (<= `to_token`) - - Returns: - A sorted list of room IDs by `stream_ordering` along with membership information. - """ - - # Assemble a map of room ID to the `stream_ordering` of the last activity that the - # user should see in the room (<= `to_token`) - last_activity_in_room_map: Dict[str, int] = {} - - for room_id, room_for_user in sync_room_map.items(): - if room_for_user.membership != Membership.JOIN: - # If the user has left/been invited/knocked/been banned from a - # room, they shouldn't see anything past that point. - # - # FIXME: It's possible that people should see beyond this point - # in invited/knocked cases if for example the room has - # `invite`/`world_readable` history visibility, see - # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 - last_activity_in_room_map[room_id] = room_for_user.event_pos.stream - - # For fully-joined rooms, we find the latest activity at/before the - # `to_token`. - joined_room_positions = ( - await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( - [ - room_id - for room_id, room_for_user in sync_room_map.items() - if room_for_user.membership == Membership.JOIN - ], - to_token.room_key, - ) - ) - - last_activity_in_room_map.update(joined_room_positions) - - return sorted( - sync_room_map.values(), - # Sort by the last activity (stream_ordering) in the room - key=lambda room_info: last_activity_in_room_map[room_info.room_id], - # We want descending order - reverse=True, - ) - - @trace - async def get_current_state_ids_at( - self, - room_id: str, - room_membership_for_user_at_to_token: _RoomMembershipForUser, - state_filter: StateFilter, - to_token: StreamToken, - ) -> StateMap[str]: - """ - Get current state IDs for the user in the room according to their membership. This - will be the current state at the time of their LEAVE/BAN, otherwise will be the - current state <= to_token. - - Args: - room_id: The room ID to fetch data for - room_membership_for_user_at_token: Membership information for the user - in the room at the time of `to_token`. - to_token: The point in the stream to sync up to. - """ - state_ids: StateMap[str] - # People shouldn't see past their leave/ban event - if room_membership_for_user_at_to_token.membership in ( - Membership.LEAVE, - Membership.BAN, - ): - # TODO: `get_state_ids_at(...)` doesn't take into account the "current - # state". Maybe we need to use - # `get_forward_extremities_for_room_at_stream_ordering(...)` to "Fetch the - # current state at the time." - state_ids = await self.storage_controllers.state.get_state_ids_at( - room_id, - stream_position=to_token.copy_and_replace( - StreamKeyType.ROOM, - room_membership_for_user_at_to_token.event_pos.to_room_stream_token(), - ), - state_filter=state_filter, - # Partially-stated rooms should have all state events except for - # remote membership events. Since we've already excluded - # partially-stated rooms unless `required_state` only has - # `["m.room.member", "$LAZY"]` for membership, we should be able to - # retrieve everything requested. When we're lazy-loading, if there - # are some remote senders in the timeline, we should also have their - # membership event because we had to auth that timeline event. Plus - # we don't want to block the whole sync waiting for this one room. - await_full_state=False, - ) - # Otherwise, we can get the latest current state in the room - else: - state_ids = await self.storage_controllers.state.get_current_state_ids( - room_id, - state_filter, - # Partially-stated rooms should have all state events except for - # remote membership events. Since we've already excluded - # partially-stated rooms unless `required_state` only has - # `["m.room.member", "$LAZY"]` for membership, we should be able to - # retrieve everything requested. When we're lazy-loading, if there - # are some remote senders in the timeline, we should also have their - # membership event because we had to auth that timeline event. Plus - # we don't want to block the whole sync waiting for this one room. - await_full_state=False, - ) - # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token` - - return state_ids - - @trace - async def get_current_state_at( - self, - room_id: str, - room_membership_for_user_at_to_token: _RoomMembershipForUser, - state_filter: StateFilter, - to_token: StreamToken, - ) -> StateMap[EventBase]: - """ - Get current state for the user in the room according to their membership. This - will be the current state at the time of their LEAVE/BAN, otherwise will be the - current state <= to_token. - - Args: - room_id: The room ID to fetch data for - room_membership_for_user_at_token: Membership information for the user - in the room at the time of `to_token`. - to_token: The point in the stream to sync up to. - """ - state_ids = await self.get_current_state_ids_at( - room_id=room_id, - room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, - state_filter=state_filter, - to_token=to_token, - ) - - event_map = await self.store.get_events(list(state_ids.values())) - - state_map = {} - for key, event_id in state_ids.items(): - event = event_map.get(event_id) - if event: - state_map[key] = event - - return state_map - - async def get_room_sync_data( - self, - sync_config: SlidingSyncConfig, - room_id: str, - room_sync_config: RoomSyncConfig, - room_membership_for_user_at_to_token: _RoomMembershipForUser, - from_token: Optional[SlidingSyncStreamToken], - to_token: StreamToken, - ) -> SlidingSyncResult.RoomResult: - """ - Fetch room data for the sync response. - - We fetch data according to the token range (> `from_token` and <= `to_token`). - - Args: - user: User to fetch data for - room_id: The room ID to fetch data for - room_sync_config: Config for what data we should fetch for a room in the - sync response. - room_membership_for_user_at_to_token: Membership information for the user - in the room at the time of `to_token`. - from_token: The point in the stream to sync from. - to_token: The point in the stream to sync up to. - """ - user = sync_config.user - - set_tag( - SynapseTags.FUNC_ARG_PREFIX + "membership", - room_membership_for_user_at_to_token.membership, - ) - set_tag( - SynapseTags.FUNC_ARG_PREFIX + "timeline_limit", - room_sync_config.timeline_limit, - ) - - # Determine whether we should limit the timeline to the token range. - # - # We should return historical messages (before token range) in the - # following cases because we want clients to be able to show a basic - # screen of information: - # - # - Initial sync (because no `from_token` to limit us anyway) - # - When users `newly_joined` - # - For an incremental sync where we haven't sent it down this - # connection before - # - # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 - from_bound = None - initial = True - if from_token and not room_membership_for_user_at_to_token.newly_joined: - room_status = await self.connection_store.have_sent_room( - sync_config=sync_config, - connection_token=from_token.connection_position, - room_id=room_id, - ) - if room_status.status == HaveSentRoomFlag.LIVE: - from_bound = from_token.stream_token.room_key - initial = False - elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: - assert room_status.last_token is not None - from_bound = room_status.last_token - initial = False - elif room_status.status == HaveSentRoomFlag.NEVER: - from_bound = None - initial = True - else: - assert_never(room_status.status) - - log_kv({"sliding_sync.room_status": room_status}) - - log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial}) - - # Assemble the list of timeline events - # - # FIXME: It would be nice to make the `rooms` response more uniform regardless of - # membership. Currently, we have to make all of these optional because - # `invite`/`knock` rooms only have `stripped_state`. See - # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 - timeline_events: List[EventBase] = [] - bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None - limited: Optional[bool] = None - prev_batch_token: Optional[StreamToken] = None - num_live: Optional[int] = None - if ( - room_sync_config.timeline_limit > 0 - # No timeline for invite/knock rooms (just `stripped_state`) - and room_membership_for_user_at_to_token.membership - not in (Membership.INVITE, Membership.KNOCK) - ): - limited = False - # We want to start off using the `to_token` (vs `from_token`) because we look - # backwards from the `to_token` up to the `timeline_limit` and we might not - # reach the `from_token` before we hit the limit. We will update the room stream - # position once we've fetched the events to point to the earliest event fetched. - prev_batch_token = to_token - - # We're going to paginate backwards from the `to_token` - to_bound = to_token.room_key - # People shouldn't see past their leave/ban event - if room_membership_for_user_at_to_token.membership in ( - Membership.LEAVE, - Membership.BAN, - ): - to_bound = ( - room_membership_for_user_at_to_token.event_pos.to_room_stream_token() - ) - - # For initial `/sync` (and other historical scenarios mentioned above), we - # want to view a historical section of the timeline; to fetch events by - # `topological_ordering` (best representation of the room DAG as others were - # seeing it at the time). This also aligns with the order that `/messages` - # returns events in. - # - # For incremental `/sync`, we want to get all updates for rooms since - # the last `/sync` (regardless if those updates arrived late or happened - # a while ago in the past); to fetch events by `stream_ordering` (in the - # order they were received by the server). - # - # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 - # - # FIXME: Using workaround for mypy, - # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and - # https://github.com/python/mypy/issues/17479 - paginate_room_events_by_topological_ordering: PaginateFunction = ( - self.store.paginate_room_events_by_topological_ordering - ) - paginate_room_events_by_stream_ordering: PaginateFunction = ( - self.store.paginate_room_events_by_stream_ordering - ) - pagination_method: PaginateFunction = ( - # Use `topographical_ordering` for historical events - paginate_room_events_by_topological_ordering - if from_bound is None - # Use `stream_ordering` for updates - else paginate_room_events_by_stream_ordering - ) - timeline_events, new_room_key = await pagination_method( - room_id=room_id, - # The bounds are reversed so we can paginate backwards - # (from newer to older events) starting at to_bound. - # This ensures we fill the `limit` with the newest events first, - from_key=to_bound, - to_key=from_bound, - direction=Direction.BACKWARDS, - # We add one so we can determine if there are enough events to saturate - # the limit or not (see `limited`) - limit=room_sync_config.timeline_limit + 1, - ) - - # We want to return the events in ascending order (the last event is the - # most recent). - timeline_events.reverse() - - # Determine our `limited` status based on the timeline. We do this before - # filtering the events so we can accurately determine if there is more to - # paginate even if we filter out some/all events. - if len(timeline_events) > room_sync_config.timeline_limit: - limited = True - # Get rid of that extra "+ 1" event because we only used it to determine - # if we hit the limit or not - timeline_events = timeline_events[-room_sync_config.timeline_limit :] - assert timeline_events[0].internal_metadata.stream_ordering - new_room_key = RoomStreamToken( - stream=timeline_events[0].internal_metadata.stream_ordering - 1 - ) - - # Make sure we don't expose any events that the client shouldn't see - timeline_events = await filter_events_for_client( - self.storage_controllers, - user.to_string(), - timeline_events, - is_peeking=room_membership_for_user_at_to_token.membership - != Membership.JOIN, - filter_send_to_client=True, - ) - # TODO: Filter out `EventTypes.CallInvite` in public rooms, - # see https://github.com/element-hq/synapse/issues/17359 - - # TODO: Handle timeline gaps (`get_timeline_gaps()`) - - # Determine how many "live" events we have (events within the given token range). - # - # This is mostly useful to determine whether a given @mention event should - # make a noise or not. Clients cannot rely solely on the absence of - # `initial: true` to determine live events because if a room not in the - # sliding window bumps into the window because of an @mention it will have - # `initial: true` yet contain a single live event (with potentially other - # old events in the timeline) - num_live = 0 - if from_token is not None: - for timeline_event in reversed(timeline_events): - # This fields should be present for all persisted events - assert timeline_event.internal_metadata.stream_ordering is not None - assert timeline_event.internal_metadata.instance_name is not None - - persisted_position = PersistedEventPosition( - instance_name=timeline_event.internal_metadata.instance_name, - stream=timeline_event.internal_metadata.stream_ordering, - ) - if persisted_position.persisted_after( - from_token.stream_token.room_key - ): - num_live += 1 - else: - # Since we're iterating over the timeline events in - # reverse-chronological order, we can break once we hit an event - # that's not live. In the future, we could potentially optimize - # this more with a binary search (bisect). - break - - # If the timeline is `limited=True`, the client does not have all events - # necessary to calculate aggregations themselves. - if limited: - bundled_aggregations = ( - await self.relations_handler.get_bundled_aggregations( - timeline_events, user.to_string() - ) - ) - - # Update the `prev_batch_token` to point to the position that allows us to - # keep paginating backwards from the oldest event we return in the timeline. - prev_batch_token = prev_batch_token.copy_and_replace( - StreamKeyType.ROOM, new_room_key - ) - - # Figure out any stripped state events for invite/knocks. This allows the - # potential joiner to identify the room. - stripped_state: List[JsonDict] = [] - if room_membership_for_user_at_to_token.membership in ( - Membership.INVITE, - Membership.KNOCK, - ): - # This should never happen. If someone is invited/knocked on room, then - # there should be an event for it. - assert room_membership_for_user_at_to_token.event_id is not None - - invite_or_knock_event = await self.store.get_event( - room_membership_for_user_at_to_token.event_id - ) - - stripped_state = [] - if invite_or_knock_event.membership == Membership.INVITE: - stripped_state.extend( - invite_or_knock_event.unsigned.get("invite_room_state", []) - ) - elif invite_or_knock_event.membership == Membership.KNOCK: - stripped_state.extend( - invite_or_knock_event.unsigned.get("knock_room_state", []) - ) - - stripped_state.append(strip_event(invite_or_knock_event)) - - # TODO: Handle state resets. For example, if we see - # `room_membership_for_user_at_to_token.event_id=None and - # room_membership_for_user_at_to_token.membership is not None`, we should - # indicate to the client that a state reset happened. Perhaps we should indicate - # this by setting `initial: True` and empty `required_state`. - - # Check whether the room has a name set - name_state_ids = await self.get_current_state_ids_at( - room_id=room_id, - room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, - state_filter=StateFilter.from_types([(EventTypes.Name, "")]), - to_token=to_token, - ) - name_event_id = name_state_ids.get((EventTypes.Name, "")) - - room_membership_summary: Mapping[str, MemberSummary] - empty_membership_summary = MemberSummary([], 0) - if room_membership_for_user_at_to_token.membership in ( - Membership.LEAVE, - Membership.BAN, - ): - # TODO: Figure out how to get the membership summary for left/banned rooms - room_membership_summary = {} - else: - room_membership_summary = await self.store.get_room_summary(room_id) - # TODO: Reverse/rewind back to the `to_token` - - # `heroes` are required if the room name is not set. - # - # Note: When you're the first one on your server to be invited to a new room - # over federation, we only have access to some stripped state in - # `event.unsigned.invite_room_state` which currently doesn't include `heroes`, - # see https://github.com/matrix-org/matrix-spec/issues/380. This means that - # clients won't be able to calculate the room name when necessary and just a - # pitfall we have to deal with until that spec issue is resolved. - hero_user_ids: List[str] = [] - # TODO: Should we also check for `EventTypes.CanonicalAlias` - # (`m.room.canonical_alias`) as a fallback for the room name? see - # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153 - if name_event_id is None: - hero_user_ids = extract_heroes_from_room_summary( - room_membership_summary, me=user.to_string() - ) - - # Fetch the `required_state` for the room - # - # No `required_state` for invite/knock rooms (just `stripped_state`) - # - # FIXME: It would be nice to make the `rooms` response more uniform regardless - # of membership. Currently, we have to make this optional because - # `invite`/`knock` rooms only have `stripped_state`. See - # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 - # - # Calculate the `StateFilter` based on the `required_state` for the room - required_state_filter = StateFilter.none() - if room_membership_for_user_at_to_token.membership not in ( - Membership.INVITE, - Membership.KNOCK, - ): - # If we have a double wildcard ("*", "*") in the `required_state`, we need - # to fetch all state for the room - # - # Note: MSC3575 describes different behavior to how we're handling things - # here but since it's not wrong to return more state than requested - # (`required_state` is just the minimum requested), it doesn't matter if we - # include more than client wanted. This complexity is also under scrutiny, - # see - # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1185109050 - # - # > One unique exception is when you request all state events via ["*", "*"]. When used, - # > all state events are returned by default, and additional entries FILTER OUT the returned set - # > of state events. These additional entries cannot use '*' themselves. - # > For example, ["*", "*"], ["m.room.member", "@alice:example.com"] will _exclude_ every m.room.member - # > event _except_ for @alice:example.com, and include every other state event. - # > In addition, ["*", "*"], ["m.space.child", "*"] is an error, the m.space.child filter is not - # > required as it would have been returned anyway. - # > - # > -- MSC3575 (https://github.com/matrix-org/matrix-spec-proposals/pull/3575) - if StateValues.WILDCARD in room_sync_config.required_state_map.get( - StateValues.WILDCARD, set() - ): - set_tag( - SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard", - True, - ) - required_state_filter = StateFilter.all() - # TODO: `StateFilter` currently doesn't support wildcard event types. We're - # currently working around this by returning all state to the client but it - # would be nice to fetch less from the database and return just what the - # client wanted. - elif ( - room_sync_config.required_state_map.get(StateValues.WILDCARD) - is not None - ): - set_tag( - SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard_event_type", - True, - ) - required_state_filter = StateFilter.all() - else: - required_state_types: List[Tuple[str, Optional[str]]] = [] - for ( - state_type, - state_key_set, - ) in room_sync_config.required_state_map.items(): - num_wild_state_keys = 0 - lazy_load_room_members = False - num_others = 0 - for state_key in state_key_set: - if state_key == StateValues.WILDCARD: - num_wild_state_keys += 1 - # `None` is a wildcard in the `StateFilter` - required_state_types.append((state_type, None)) - # We need to fetch all relevant people when we're lazy-loading membership - elif ( - state_type == EventTypes.Member - and state_key == StateValues.LAZY - ): - lazy_load_room_members = True - # Everyone in the timeline is relevant - timeline_membership: Set[str] = set() - if timeline_events is not None: - for timeline_event in timeline_events: - timeline_membership.add(timeline_event.sender) - - for user_id in timeline_membership: - required_state_types.append( - (EventTypes.Member, user_id) - ) - - # FIXME: We probably also care about invite, ban, kick, targets, etc - # but the spec only mentions "senders". - elif state_key == StateValues.ME: - num_others += 1 - required_state_types.append((state_type, user.to_string())) - else: - num_others += 1 - required_state_types.append((state_type, state_key)) - - set_tag( - SynapseTags.FUNC_ARG_PREFIX - + "required_state_wildcard_state_key_count", - num_wild_state_keys, - ) - set_tag( - SynapseTags.FUNC_ARG_PREFIX + "required_state_lazy", - lazy_load_room_members, - ) - set_tag( - SynapseTags.FUNC_ARG_PREFIX + "required_state_other_count", - num_others, - ) - - required_state_filter = StateFilter.from_types(required_state_types) - - # We need this base set of info for the response so let's just fetch it along - # with the `required_state` for the room - meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [ - (EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids - ] - state_filter = StateFilter.all() - if required_state_filter != StateFilter.all(): - state_filter = StateFilter( - types=StateFilter.from_types( - chain(meta_room_state, required_state_filter.to_types()) - ).types, - include_others=required_state_filter.include_others, - ) - - # We can return all of the state that was requested if this was the first - # time we've sent the room down this connection. - room_state: StateMap[EventBase] = {} - if initial: - room_state = await self.get_current_state_at( - room_id=room_id, - room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, - state_filter=state_filter, - to_token=to_token, - ) - else: - assert from_bound is not None - - # TODO: Limit the number of state events we're about to send down - # the room, if its too many we should change this to an - # `initial=True`? - deltas = await self.store.get_current_state_deltas_for_room( - room_id=room_id, - from_token=from_bound, - to_token=to_token.room_key, - ) - # TODO: Filter room state before fetching events - # TODO: Handle state resets where event_id is None - events = await self.store.get_events( - [d.event_id for d in deltas if d.event_id] - ) - room_state = {(s.type, s.state_key): s for s in events.values()} - - required_room_state: StateMap[EventBase] = {} - if required_state_filter != StateFilter.none(): - required_room_state = required_state_filter.filter_state(room_state) - - # Find the room name and avatar from the state - room_name: Optional[str] = None - # TODO: Should we also check for `EventTypes.CanonicalAlias` - # (`m.room.canonical_alias`) as a fallback for the room name? see - # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153 - name_event = room_state.get((EventTypes.Name, "")) - if name_event is not None: - room_name = name_event.content.get("name") - - room_avatar: Optional[str] = None - avatar_event = room_state.get((EventTypes.RoomAvatar, "")) - if avatar_event is not None: - room_avatar = avatar_event.content.get("url") - - # Assemble heroes: extract the info from the state we just fetched - heroes: List[SlidingSyncResult.RoomResult.StrippedHero] = [] - for hero_user_id in hero_user_ids: - member_event = room_state.get((EventTypes.Member, hero_user_id)) - if member_event is not None: - heroes.append( - SlidingSyncResult.RoomResult.StrippedHero( - user_id=hero_user_id, - display_name=member_event.content.get("displayname"), - avatar_url=member_event.content.get("avatar_url"), - ) - ) - - # Figure out the last bump event in the room - last_bump_event_result = ( - await self.store.get_last_event_pos_in_room_before_stream_ordering( - room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES - ) - ) - - # By default, just choose the membership event position - bump_stamp = room_membership_for_user_at_to_token.event_pos.stream - # But if we found a bump event, use that instead - if last_bump_event_result is not None: - _, new_bump_event_pos = last_bump_event_result - - # If we've just joined a remote room, then the last bump event may - # have been backfilled (and so have a negative stream ordering). - # These negative stream orderings can't sensibly be compared, so - # instead we use the membership event position. - if new_bump_event_pos.stream > 0: - bump_stamp = new_bump_event_pos.stream - - set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) - - return SlidingSyncResult.RoomResult( - name=room_name, - avatar=room_avatar, - heroes=heroes, - is_dm=room_membership_for_user_at_to_token.is_dm, - initial=initial, - required_state=list(required_room_state.values()), - timeline_events=timeline_events, - bundled_aggregations=bundled_aggregations, - stripped_state=stripped_state, - prev_batch=prev_batch_token, - limited=limited, - num_live=num_live, - bump_stamp=bump_stamp, - joined_count=room_membership_summary.get( - Membership.JOIN, empty_membership_summary - ).count, - invited_count=room_membership_summary.get( - Membership.INVITE, empty_membership_summary - ).count, - # TODO: These are just dummy values. We could potentially just remove these - # since notifications can only really be done correctly on the client anyway - # (encrypted rooms). - notification_count=0, - highlight_count=0, - ) - - @trace - async def get_extensions_response( - self, - sync_config: SlidingSyncConfig, - actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], - actual_room_ids: Set[str], - actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], - to_token: StreamToken, - from_token: Optional[SlidingSyncStreamToken], - ) -> SlidingSyncResult.Extensions: - """Handle extension requests. - - Args: - sync_config: Sync configuration - actual_lists: Sliding window API. A map of list key to list results in the - Sliding Sync response. - actual_room_ids: The actual room IDs in the the Sliding Sync response. - actual_room_response_map: A map of room ID to room results in the the - Sliding Sync response. - to_token: The point in the stream to sync up to. - from_token: The point in the stream to sync from. - """ - - if sync_config.extensions is None: - return SlidingSyncResult.Extensions() - - to_device_response = None - if sync_config.extensions.to_device is not None: - to_device_response = await self.get_to_device_extension_response( - sync_config=sync_config, - to_device_request=sync_config.extensions.to_device, - to_token=to_token, - ) - - e2ee_response = None - if sync_config.extensions.e2ee is not None: - e2ee_response = await self.get_e2ee_extension_response( - sync_config=sync_config, - e2ee_request=sync_config.extensions.e2ee, - to_token=to_token, - from_token=from_token, - ) - - account_data_response = None - if sync_config.extensions.account_data is not None: - account_data_response = await self.get_account_data_extension_response( - sync_config=sync_config, - actual_lists=actual_lists, - actual_room_ids=actual_room_ids, - account_data_request=sync_config.extensions.account_data, - to_token=to_token, - from_token=from_token, - ) - - receipts_response = None - if sync_config.extensions.receipts is not None: - receipts_response = await self.get_receipts_extension_response( - sync_config=sync_config, - actual_lists=actual_lists, - actual_room_ids=actual_room_ids, - actual_room_response_map=actual_room_response_map, - receipts_request=sync_config.extensions.receipts, - to_token=to_token, - from_token=from_token, - ) - - typing_response = None - if sync_config.extensions.typing is not None: - typing_response = await self.get_typing_extension_response( - sync_config=sync_config, - actual_lists=actual_lists, - actual_room_ids=actual_room_ids, - actual_room_response_map=actual_room_response_map, - typing_request=sync_config.extensions.typing, - to_token=to_token, - from_token=from_token, - ) - - return SlidingSyncResult.Extensions( - to_device=to_device_response, - e2ee=e2ee_response, - account_data=account_data_response, - receipts=receipts_response, - typing=typing_response, - ) - - def find_relevant_room_ids_for_extension( - self, - requested_lists: Optional[List[str]], - requested_room_ids: Optional[List[str]], - actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], - actual_room_ids: Set[str], - ) -> Set[str]: - """ - Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only - return results for rooms in the Sliding Sync response. This matches up the - requested rooms/lists with the actual lists/rooms in the Sliding Sync response. - - {"lists": []} // Do not process any lists. - {"lists": ["rooms", "dms"]} // Process only a subset of lists. - {"lists": ["*"]} // Process all lists defined in the Sliding Window API. (This is the default.) - - {"rooms": []} // Do not process any specific rooms. - {"rooms": ["!a:b", "!c:d"]} // Process only a subset of room subscriptions. - {"rooms": ["*"]} // Process all room subscriptions defined in the Room Subscription API. (This is the default.) - - Args: - requested_lists: The `lists` from the extension request. - requested_room_ids: The `rooms` from the extension request. - actual_lists: The actual lists from the Sliding Sync response. - actual_room_ids: The actual room subscriptions from the Sliding Sync request. - """ - - # We only want to include account data for rooms that are already in the sliding - # sync response AND that were requested in the account data request. - relevant_room_ids: Set[str] = set() - - # See what rooms from the room subscriptions we should get account data for - if requested_room_ids is not None: - for room_id in requested_room_ids: - # A wildcard means we process all rooms from the room subscriptions - if room_id == "*": - relevant_room_ids.update(actual_room_ids) - break - - if room_id in actual_room_ids: - relevant_room_ids.add(room_id) - - # See what rooms from the sliding window lists we should get account data for - if requested_lists is not None: - for list_key in requested_lists: - # Just some typing because we share the variable name in multiple places - actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None - - # A wildcard means we process rooms from all lists - if list_key == "*": - for actual_list in actual_lists.values(): - # We only expect a single SYNC operation for any list - assert len(actual_list.ops) == 1 - sync_op = actual_list.ops[0] - assert sync_op.op == OperationType.SYNC - - relevant_room_ids.update(sync_op.room_ids) - - break - - actual_list = actual_lists.get(list_key) - if actual_list is not None: - # We only expect a single SYNC operation for any list - assert len(actual_list.ops) == 1 - sync_op = actual_list.ops[0] - assert sync_op.op == OperationType.SYNC - - relevant_room_ids.update(sync_op.room_ids) - - return relevant_room_ids - - @trace - async def get_to_device_extension_response( - self, - sync_config: SlidingSyncConfig, - to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension, - to_token: StreamToken, - ) -> Optional[SlidingSyncResult.Extensions.ToDeviceExtension]: - """Handle to-device extension (MSC3885) - - Args: - sync_config: Sync configuration - to_device_request: The to-device extension from the request - to_token: The point in the stream to sync up to. - """ - user_id = sync_config.user.to_string() - device_id = sync_config.requester.device_id - - # Skip if the extension is not enabled - if not to_device_request.enabled: - return None - - # Check that this request has a valid device ID (not all requests have - # to belong to a device, and so device_id is None) - if device_id is None: - return SlidingSyncResult.Extensions.ToDeviceExtension( - next_batch=f"{to_token.to_device_key}", - events=[], - ) - - since_stream_id = 0 - if to_device_request.since is not None: - # We've already validated this is an int. - since_stream_id = int(to_device_request.since) - - if to_token.to_device_key < since_stream_id: - # The since token is ahead of our current token, so we return an - # empty response. - logger.warning( - "Got to-device.since from the future. since token: %r is ahead of our current to_device stream position: %r", - since_stream_id, - to_token.to_device_key, - ) - return SlidingSyncResult.Extensions.ToDeviceExtension( - next_batch=to_device_request.since, - events=[], - ) - - # Delete everything before the given since token, as we know the - # device must have received them. - deleted = await self.store.delete_messages_for_device( - user_id=user_id, - device_id=device_id, - up_to_stream_id=since_stream_id, - ) - - logger.debug( - "Deleted %d to-device messages up to %d for %s", - deleted, - since_stream_id, - user_id, - ) - - messages, stream_id = await self.store.get_messages_for_device( - user_id=user_id, - device_id=device_id, - from_stream_id=since_stream_id, - to_stream_id=to_token.to_device_key, - limit=min(to_device_request.limit, 100), # Limit to at most 100 events - ) - - return SlidingSyncResult.Extensions.ToDeviceExtension( - next_batch=f"{stream_id}", - events=messages, - ) - - @trace - async def get_e2ee_extension_response( - self, - sync_config: SlidingSyncConfig, - e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension, - to_token: StreamToken, - from_token: Optional[SlidingSyncStreamToken], - ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]: - """Handle E2EE device extension (MSC3884) - - Args: - sync_config: Sync configuration - e2ee_request: The e2ee extension from the request - to_token: The point in the stream to sync up to. - from_token: The point in the stream to sync from. - """ - user_id = sync_config.user.to_string() - device_id = sync_config.requester.device_id - - # Skip if the extension is not enabled - if not e2ee_request.enabled: - return None - - device_list_updates: Optional[DeviceListUpdates] = None - if from_token is not None: - # TODO: This should take into account the `from_token` and `to_token` - device_list_updates = await self.device_handler.get_user_ids_changed( - user_id=user_id, - from_token=from_token.stream_token, - ) - - device_one_time_keys_count: Mapping[str, int] = {} - device_unused_fallback_key_types: Sequence[str] = [] - if device_id: - # TODO: We should have a way to let clients differentiate between the states of: - # * no change in OTK count since the provided since token - # * the server has zero OTKs left for this device - # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298 - device_one_time_keys_count = await self.store.count_e2e_one_time_keys( - user_id, device_id - ) - device_unused_fallback_key_types = ( - await self.store.get_e2e_unused_fallback_key_types(user_id, device_id) - ) - - return SlidingSyncResult.Extensions.E2eeExtension( - device_list_updates=device_list_updates, - device_one_time_keys_count=device_one_time_keys_count, - device_unused_fallback_key_types=device_unused_fallback_key_types, - ) - - @trace - async def get_account_data_extension_response( - self, - sync_config: SlidingSyncConfig, - actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], - actual_room_ids: Set[str], - account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension, - to_token: StreamToken, - from_token: Optional[SlidingSyncStreamToken], - ) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]: - """Handle Account Data extension (MSC3959) - - Args: - sync_config: Sync configuration - actual_lists: Sliding window API. A map of list key to list results in the - Sliding Sync response. - actual_room_ids: The actual room IDs in the the Sliding Sync response. - account_data_request: The account_data extension from the request - to_token: The point in the stream to sync up to. - from_token: The point in the stream to sync from. - """ - user_id = sync_config.user.to_string() - - # Skip if the extension is not enabled - if not account_data_request.enabled: - return None - - global_account_data_map: Mapping[str, JsonMapping] = {} - if from_token is not None: - # TODO: This should take into account the `from_token` and `to_token` - global_account_data_map = ( - await self.store.get_updated_global_account_data_for_user( - user_id, from_token.stream_token.account_data_key - ) - ) - - have_push_rules_changed = await self.store.have_push_rules_changed_for_user( - user_id, from_token.stream_token.push_rules_key - ) - if have_push_rules_changed: - global_account_data_map = dict(global_account_data_map) - # TODO: This should take into account the `from_token` and `to_token` - global_account_data_map[AccountDataTypes.PUSH_RULES] = ( - await self.push_rules_handler.push_rules_for_user(sync_config.user) - ) - else: - # TODO: This should take into account the `to_token` - all_global_account_data = await self.store.get_global_account_data_for_user( - user_id - ) - - global_account_data_map = dict(all_global_account_data) - # TODO: This should take into account the `to_token` - global_account_data_map[AccountDataTypes.PUSH_RULES] = ( - await self.push_rules_handler.push_rules_for_user(sync_config.user) - ) - - # Fetch room account data - account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {} - relevant_room_ids = self.find_relevant_room_ids_for_extension( - requested_lists=account_data_request.lists, - requested_room_ids=account_data_request.rooms, - actual_lists=actual_lists, - actual_room_ids=actual_room_ids, - ) - if len(relevant_room_ids) > 0: - if from_token is not None: - # TODO: This should take into account the `from_token` and `to_token` - account_data_by_room_map = ( - await self.store.get_updated_room_account_data_for_user( - user_id, from_token.stream_token.account_data_key - ) - ) - else: - # TODO: This should take into account the `to_token` - account_data_by_room_map = ( - await self.store.get_room_account_data_for_user(user_id) - ) - - # Filter down to the relevant rooms - account_data_by_room_map = { - room_id: account_data_map - for room_id, account_data_map in account_data_by_room_map.items() - if room_id in relevant_room_ids - } - - return SlidingSyncResult.Extensions.AccountDataExtension( - global_account_data_map=global_account_data_map, - account_data_by_room_map=account_data_by_room_map, - ) - - async def get_receipts_extension_response( - self, - sync_config: SlidingSyncConfig, - actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], - actual_room_ids: Set[str], - actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], - receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension, - to_token: StreamToken, - from_token: Optional[SlidingSyncStreamToken], - ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]: - """Handle Receipts extension (MSC3960) - - Args: - sync_config: Sync configuration - actual_lists: Sliding window API. A map of list key to list results in the - Sliding Sync response. - actual_room_ids: The actual room IDs in the the Sliding Sync response. - actual_room_response_map: A map of room ID to room results in the the - Sliding Sync response. - account_data_request: The account_data extension from the request - to_token: The point in the stream to sync up to. - from_token: The point in the stream to sync from. - """ - # Skip if the extension is not enabled - if not receipts_request.enabled: - return None - - relevant_room_ids = self.find_relevant_room_ids_for_extension( - requested_lists=receipts_request.lists, - requested_room_ids=receipts_request.rooms, - actual_lists=actual_lists, - actual_room_ids=actual_room_ids, - ) - - room_id_to_receipt_map: Dict[str, JsonMapping] = {} - if len(relevant_room_ids) > 0: - # TODO: Take connection tracking into account so that when a room comes back - # into range we can send the receipts that were missed. - receipt_source = self.event_sources.sources.receipt - receipts, _ = await receipt_source.get_new_events( - user=sync_config.user, - from_key=( - from_token.stream_token.receipt_key - if from_token - else MultiWriterStreamToken(stream=0) - ), - to_key=to_token.receipt_key, - # This is a dummy value and isn't used in the function - limit=0, - room_ids=relevant_room_ids, - is_guest=False, - ) - - for receipt in receipts: - # These fields should exist for every receipt - room_id = receipt["room_id"] - type = receipt["type"] - content = receipt["content"] - - # For `inital: True` rooms, we only want to include receipts for events - # in the timeline. - room_result = actual_room_response_map.get(room_id) - if room_result is not None: - if room_result.initial: - # TODO: In the future, it would be good to fetch less receipts - # out of the database in the first place but we would need to - # add a new `event_id` index to `receipts_linearized`. - relevant_event_ids = [ - event.event_id for event in room_result.timeline_events - ] - - assert isinstance(content, dict) - content = { - event_id: content_value - for event_id, content_value in content.items() - if event_id in relevant_event_ids - } - - room_id_to_receipt_map[room_id] = {"type": type, "content": content} - - return SlidingSyncResult.Extensions.ReceiptsExtension( - room_id_to_receipt_map=room_id_to_receipt_map, - ) - - async def get_typing_extension_response( - self, - sync_config: SlidingSyncConfig, - actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList], - actual_room_ids: Set[str], - actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult], - typing_request: SlidingSyncConfig.Extensions.TypingExtension, - to_token: StreamToken, - from_token: Optional[SlidingSyncStreamToken], - ) -> Optional[SlidingSyncResult.Extensions.TypingExtension]: - """Handle Typing Notification extension (MSC3961) - - Args: - sync_config: Sync configuration - actual_lists: Sliding window API. A map of list key to list results in the - Sliding Sync response. - actual_room_ids: The actual room IDs in the the Sliding Sync response. - actual_room_response_map: A map of room ID to room results in the the - Sliding Sync response. - account_data_request: The account_data extension from the request - to_token: The point in the stream to sync up to. - from_token: The point in the stream to sync from. - """ - # Skip if the extension is not enabled - if not typing_request.enabled: - return None - - relevant_room_ids = self.find_relevant_room_ids_for_extension( - requested_lists=typing_request.lists, - requested_room_ids=typing_request.rooms, - actual_lists=actual_lists, - actual_room_ids=actual_room_ids, - ) - - room_id_to_typing_map: Dict[str, JsonMapping] = {} - if len(relevant_room_ids) > 0: - # Note: We don't need to take connection tracking into account for typing - # notifications because they'll get anything still relevant and hasn't timed - # out when the room comes into range. We consider the gap where the room - # fell out of range, as long enough for any typing notifications to have - # timed out (it's not worth the 30 seconds of data we may have missed). - typing_source = self.event_sources.sources.typing - typing_notifications, _ = await typing_source.get_new_events( - user=sync_config.user, - from_key=(from_token.stream_token.typing_key if from_token else 0), - to_key=to_token.typing_key, - # This is a dummy value and isn't used in the function - limit=0, - room_ids=relevant_room_ids, - is_guest=False, - ) - - for typing_notification in typing_notifications: - # These fields should exist for every typing notification - room_id = typing_notification["room_id"] - type = typing_notification["type"] - content = typing_notification["content"] - - room_id_to_typing_map[room_id] = {"type": type, "content": content} - - return SlidingSyncResult.Extensions.TypingExtension( - room_id_to_typing_map=room_id_to_typing_map, - ) - - -class HaveSentRoomFlag(Enum): - """Flag for whether we have sent the room down a sliding sync connection. - - The valid state changes here are: - NEVER -> LIVE - LIVE -> PREVIOUSLY - PREVIOUSLY -> LIVE - """ - - # The room has never been sent down (or we have forgotten we have sent it - # down). - NEVER = 1 - - # We have previously sent the room down, but there are updates that we - # haven't sent down. - PREVIOUSLY = 2 - - # We have sent the room down and the client has received all updates. - LIVE = 3 - - -@attr.s(auto_attribs=True, slots=True, frozen=True) -class HaveSentRoom: - """Whether we have sent the room down a sliding sync connection. - - Attributes: - status: Flag of if we have or haven't sent down the room - last_token: If the flag is `PREVIOUSLY` then this is non-null and - contains the last stream token of the last updates we sent down - the room, i.e. we still need to send everything since then to the - client. - """ - - status: HaveSentRoomFlag - last_token: Optional[RoomStreamToken] - - @staticmethod - def previously(last_token: RoomStreamToken) -> "HaveSentRoom": - """Constructor for `PREVIOUSLY` flag.""" - return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token) - - -HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None) -HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None) - - -@attr.s(auto_attribs=True) -class SlidingSyncConnectionStore: - """In-memory store of per-connection state, including what rooms we have - previously sent down a sliding sync connection. - - Note: This is NOT safe to run in a worker setup because connection positions will - point to different sets of rooms on different workers. e.g. for the same connection, - a connection position of 5 might have totally different states on worker A and - worker B. - - One complication that we need to deal with here is needing to handle requests being - resent, i.e. if we sent down a room in a response that the client received, we must - consider the room *not* sent when we get the request again. - - This is handled by using an integer "token", which is returned to the client - as part of the sync token. For each connection we store a mapping from - tokens to the room states, and create a new entry when we send down new - rooms. - - Note that for any given sliding sync connection we will only store a maximum - of two different tokens: the previous token from the request and a new token - sent in the response. When we receive a request with a given token, we then - clear out all other entries with a different token. - - Attributes: - _connections: Mapping from `(user_id, conn_id)` to mapping of `token` - to mapping of room ID to `HaveSentRoom`. - """ - - # `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom` - _connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = ( - attr.Factory(dict) - ) - - async def is_valid_token( - self, sync_config: SlidingSyncConfig, connection_token: int - ) -> bool: - """Return whether the connection token is valid/recognized""" - if connection_token == 0: - return True - - conn_key = self._get_connection_key(sync_config) - return connection_token in self._connections.get(conn_key, {}) - - async def have_sent_room( - self, sync_config: SlidingSyncConfig, connection_token: int, room_id: str - ) -> HaveSentRoom: - """For the given user_id/conn_id/token, return whether we have - previously sent the room down - """ - - conn_key = self._get_connection_key(sync_config) - sync_statuses = self._connections.setdefault(conn_key, {}) - room_status = sync_statuses.get(connection_token, {}).get( - room_id, HAVE_SENT_ROOM_NEVER - ) - - return room_status - - @trace - async def record_rooms( - self, - sync_config: SlidingSyncConfig, - from_token: Optional[SlidingSyncStreamToken], - *, - sent_room_ids: StrCollection, - unsent_room_ids: StrCollection, - ) -> int: - """Record which rooms we have/haven't sent down in a new response - - Attributes: - sync_config - from_token: The since token from the request, if any - sent_room_ids: The set of room IDs that we have sent down as - part of this request (only needs to be ones we didn't - previously sent down). - unsent_room_ids: The set of room IDs that have had updates - since the `from_token`, but which were not included in - this request - """ - prev_connection_token = 0 - if from_token is not None: - prev_connection_token = from_token.connection_position - - # If there are no changes then this is a noop. - if not sent_room_ids and not unsent_room_ids: - return prev_connection_token - - conn_key = self._get_connection_key(sync_config) - sync_statuses = self._connections.setdefault(conn_key, {}) - - # Generate a new token, removing any existing entries in that token - # (which can happen if requests get resent). - new_store_token = prev_connection_token + 1 - sync_statuses.pop(new_store_token, None) - - # Copy over and update the room mappings. - new_room_statuses = dict(sync_statuses.get(prev_connection_token, {})) - - # Whether we have updated the `new_room_statuses`, if we don't by the - # end we can treat this as a noop. - have_updated = False - for room_id in sent_room_ids: - new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE - have_updated = True - - # Whether we add/update the entries for unsent rooms depends on the - # existing entry: - # - LIVE: We have previously sent down everything up to - # `last_room_token, so we update the entry to be `PREVIOUSLY` with - # `last_room_token`. - # - PREVIOUSLY: We have previously sent down everything up to *a* - # given token, so we don't need to update the entry. - # - NEVER: We have never previously sent down the room, and we haven't - # sent anything down this time either so we leave it as NEVER. - - # Work out the new state for unsent rooms that were `LIVE`. - if from_token: - new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key) - else: - new_unsent_state = HAVE_SENT_ROOM_NEVER - - for room_id in unsent_room_ids: - prev_state = new_room_statuses.get(room_id) - if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE: - new_room_statuses[room_id] = new_unsent_state - have_updated = True - - if not have_updated: - return prev_connection_token - - sync_statuses[new_store_token] = new_room_statuses - - return new_store_token - - @trace - async def mark_token_seen( - self, - sync_config: SlidingSyncConfig, - from_token: Optional[SlidingSyncStreamToken], - ) -> None: - """We have received a request with the given token, so we can clear out - any other tokens associated with the connection. - - If there is no from token then we have started afresh, and so we delete - all tokens associated with the device. - """ - # Clear out any tokens for the connection that doesn't match the one - # from the request. - - conn_key = self._get_connection_key(sync_config) - sync_statuses = self._connections.pop(conn_key, {}) - if from_token is None: - return - - sync_statuses = { - connection_token: room_statuses - for connection_token, room_statuses in sync_statuses.items() - if connection_token == from_token.connection_position - } - if sync_statuses: - self._connections[conn_key] = sync_statuses - - @staticmethod - def _get_connection_key(sync_config: SlidingSyncConfig) -> Tuple[str, str]: - """Return a unique identifier for this connection. - - The first part is simply the user ID. - - The second part is generally a combination of device ID and conn_id. - However, both these two are optional (e.g. puppet access tokens don't - have device IDs), so this handles those edge cases. - - We use this over the raw `conn_id` to avoid clashes between different - clients that use the same `conn_id`. Imagine a user uses a web client - that uses `conn_id: main_sync_loop` and an Android client that also has - a `conn_id: main_sync_loop`. - """ - - user_id = sync_config.user.to_string() - - # Only one sliding sync connection is allowed per given conn_id (empty - # or not). - conn_id = sync_config.conn_id or "" - - if sync_config.requester.device_id: - return (user_id, f"D/{sync_config.requester.device_id}/{conn_id}") - - if sync_config.requester.access_token_id: - # If we don't have a device, then the access token ID should be a - # stable ID. - return (user_id, f"A/{sync_config.requester.access_token_id}/{conn_id}") - - # If we have neither then its likely an AS or some weird token. Either - # way we can just fail here. - raise Exception("Cannot use sliding sync with access token type") diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py new file mode 100644
index 0000000000..cb56eb53fc --- /dev/null +++ b/synapse/handlers/sliding_sync/__init__.py
@@ -0,0 +1,1691 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2023 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# + +import itertools +import logging +from itertools import chain +from typing import TYPE_CHECKING, AbstractSet, Dict, List, Mapping, Optional, Set, Tuple + +from prometheus_client import Histogram +from typing_extensions import assert_never + +from synapse.api.constants import Direction, EventTypes, Membership +from synapse.events import EventBase +from synapse.events.utils import strip_event +from synapse.handlers.relations import BundledAggregations +from synapse.handlers.sliding_sync.extensions import SlidingSyncExtensionHandler +from synapse.handlers.sliding_sync.room_lists import ( + RoomsForUserType, + SlidingSyncRoomLists, +) +from synapse.handlers.sliding_sync.store import SlidingSyncConnectionStore +from synapse.logging.opentracing import ( + SynapseTags, + log_kv, + set_tag, + start_active_span, + tag_args, + trace, +) +from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary +from synapse.storage.databases.main.state_deltas import StateDelta +from synapse.storage.databases.main.stream import PaginateFunction +from synapse.storage.roommember import ( + MemberSummary, +) +from synapse.types import ( + JsonDict, + MutableStateMap, + PersistedEventPosition, + Requester, + RoomStreamToken, + SlidingSyncStreamToken, + StateMap, + StrCollection, + StreamKeyType, + StreamToken, +) +from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES +from synapse.types.handlers.sliding_sync import ( + HaveSentRoomFlag, + MutablePerConnectionState, + PerConnectionState, + RoomSyncConfig, + SlidingSyncConfig, + SlidingSyncResult, + StateValues, +) +from synapse.types.state import StateFilter +from synapse.util.async_helpers import concurrently_execute +from synapse.visibility import filter_events_for_client + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +sync_processing_time = Histogram( + "synapse_sliding_sync_processing_time", + "Time taken to generate a sliding sync response, ignoring wait times.", + ["initial"], +) + +# Limit the number of state_keys we should remember sending down the connection for each +# (room_id, user_id). We don't want to store and pull out too much data in the database. +# +# 100 is an arbitrary but small-ish number. The idea is that we probably won't send down +# too many redundant member state events (that the client already knows about) for a +# given ongoing conversation if we keep 100 around. Most rooms don't have 100 members +# anyway and it takes a while to cycle through 100 members. +MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER = 100 + + +class SlidingSyncHandler: + def __init__(self, hs: "HomeServer"): + self.clock = hs.get_clock() + self.store = hs.get_datastores().main + self.storage_controllers = hs.get_storage_controllers() + self.auth_blocking = hs.get_auth_blocking() + self.notifier = hs.get_notifier() + self.event_sources = hs.get_event_sources() + self.relations_handler = hs.get_relations_handler() + self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync + self.is_mine_id = hs.is_mine_id + + self.connection_store = SlidingSyncConnectionStore(self.store) + self.extensions = SlidingSyncExtensionHandler(hs) + self.room_lists = SlidingSyncRoomLists(hs) + + async def wait_for_sync_for_user( + self, + requester: Requester, + sync_config: SlidingSyncConfig, + from_token: Optional[SlidingSyncStreamToken] = None, + timeout_ms: int = 0, + ) -> SlidingSyncResult: + """ + Get the sync for a client if we have new data for it now. Otherwise + wait for new data to arrive on the server. If the timeout expires, then + return an empty sync result. + + Args: + requester: The user making the request + sync_config: Sync configuration + from_token: The point in the stream to sync from. Token of the end of the + previous batch. May be `None` if this is the initial sync request. + timeout_ms: The time in milliseconds to wait for new data to arrive. If 0, + we will immediately but there might not be any new data so we just return an + empty response. + """ + # If the user is not part of the mau group, then check that limits have + # not been exceeded (if not part of the group by this point, almost certain + # auth_blocking will occur) + await self.auth_blocking.check_auth_blocking(requester=requester) + + # If we're working with a user-provided token, we need to make sure to wait for + # this worker to catch up with the token so we don't skip past any incoming + # events or future events if the user is nefariously, manually modifying the + # token. + if from_token is not None: + # We need to make sure this worker has caught up with the token. If + # this returns false, it means we timed out waiting, and we should + # just return an empty response. + before_wait_ts = self.clock.time_msec() + if not await self.notifier.wait_for_stream_token(from_token.stream_token): + logger.warning( + "Timed out waiting for worker to catch up. Returning empty response" + ) + return SlidingSyncResult.empty(from_token) + + # If we've spent significant time waiting to catch up, take it off + # the timeout. + after_wait_ts = self.clock.time_msec() + if after_wait_ts - before_wait_ts > 1_000: + timeout_ms -= after_wait_ts - before_wait_ts + timeout_ms = max(timeout_ms, 0) + + # We're going to respond immediately if the timeout is 0 or if this is an + # initial sync (without a `from_token`) so we can avoid calling + # `notifier.wait_for_events()`. + if timeout_ms == 0 or from_token is None: + now_token = self.event_sources.get_current_token() + result = await self.current_sync_for_user( + sync_config, + from_token=from_token, + to_token=now_token, + ) + else: + # Otherwise, we wait for something to happen and report it to the user. + async def current_sync_callback( + before_token: StreamToken, after_token: StreamToken + ) -> SlidingSyncResult: + return await self.current_sync_for_user( + sync_config, + from_token=from_token, + to_token=after_token, + ) + + result = await self.notifier.wait_for_events( + sync_config.user.to_string(), + timeout_ms, + current_sync_callback, + from_token=from_token.stream_token, + ) + + return result + + @trace + async def current_sync_for_user( + self, + sync_config: SlidingSyncConfig, + to_token: StreamToken, + from_token: Optional[SlidingSyncStreamToken] = None, + ) -> SlidingSyncResult: + """ + Generates the response body of a Sliding Sync result, represented as a + `SlidingSyncResult`. + + We fetch data according to the token range (> `from_token` and <= `to_token`). + + Args: + sync_config: Sync configuration + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. Token of the end of the + previous batch. May be `None` if this is the initial sync request. + """ + start_time_s = self.clock.time() + + user_id = sync_config.user.to_string() + app_service = self.store.get_app_service_by_user_id(user_id) + if app_service: + # We no longer support AS users using /sync directly. + # See https://github.com/matrix-org/matrix-doc/issues/1144 + raise NotImplementedError() + + # Get the per-connection state (if any). + # + # Raises an exception if there is a `connection_position` that we don't + # recognize. If we don't do this and the client asks for the full range + # of rooms, we end up sending down all rooms and their state from + # scratch (which can be very slow). By expiring the connection we allow + # the client a chance to do an initial request with a smaller range of + # rooms to get them some results sooner but will end up taking the same + # amount of time (more with round-trips and re-processing) in the end to + # get everything again. + previous_connection_state = ( + await self.connection_store.get_and_clear_connection_positions( + sync_config, from_token + ) + ) + + # Get all of the room IDs that the user should be able to see in the sync + # response + has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 + has_room_subscriptions = ( + sync_config.room_subscriptions is not None + and len(sync_config.room_subscriptions) > 0 + ) + + interested_rooms = await self.room_lists.compute_interested_rooms( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + from_token=from_token.stream_token if from_token else None, + to_token=to_token, + ) + + lists = interested_rooms.lists + relevant_room_map = interested_rooms.relevant_room_map + all_rooms = interested_rooms.all_rooms + room_membership_for_user_map = interested_rooms.room_membership_for_user_map + relevant_rooms_to_send_map = interested_rooms.relevant_rooms_to_send_map + + # Fetch room data + rooms: Dict[str, SlidingSyncResult.RoomResult] = {} + + new_connection_state = previous_connection_state.get_mutable() + + @trace + @tag_args + async def handle_room(room_id: str) -> None: + room_sync_result = await self.get_room_sync_data( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + new_connection_state=new_connection_state, + room_id=room_id, + room_sync_config=relevant_rooms_to_send_map[room_id], + room_membership_for_user_at_to_token=room_membership_for_user_map[ + room_id + ], + from_token=from_token, + to_token=to_token, + newly_joined=room_id in interested_rooms.newly_joined_rooms, + newly_left=room_id in interested_rooms.newly_left_rooms, + is_dm=room_id in interested_rooms.dm_room_ids, + ) + + # Filter out empty room results during incremental sync + if room_sync_result or not from_token: + rooms[room_id] = room_sync_result + + if relevant_rooms_to_send_map: + with start_active_span("sliding_sync.generate_room_entries"): + await concurrently_execute(handle_room, relevant_rooms_to_send_map, 20) + + extensions = await self.extensions.get_extensions_response( + sync_config=sync_config, + actual_lists=lists, + previous_connection_state=previous_connection_state, + new_connection_state=new_connection_state, + # We're purposely using `relevant_room_map` instead of + # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could + # send regardless of whether they have an event update or not. The + # extensions care about more than just normal events in the rooms (like + # account data, read receipts, typing indicators, to-device messages, etc). + actual_room_ids=set(relevant_room_map.keys()), + actual_room_response_map=rooms, + from_token=from_token, + to_token=to_token, + ) + + if has_lists or has_room_subscriptions: + # We now calculate if any rooms outside the range have had updates, + # which we are not sending down. + # + # We *must* record rooms that have had updates, but it is also fine + # to record rooms as having updates even if there might not actually + # be anything new for the user (e.g. due to event filters, events + # having happened after the user left, etc). + if from_token: + # The set of rooms that the client (may) care about, but aren't + # in any list range (or subscribed to). + missing_rooms = all_rooms - relevant_room_map.keys() + + # We now just go and try fetching any events in the above rooms + # to see if anything has happened since the `from_token`. + # + # TODO: Replace this with something faster. When we land the + # sliding sync tables that record the most recent event + # positions we can use that. + unsent_room_ids: StrCollection + if await self.store.have_finished_sliding_sync_background_jobs(): + unsent_room_ids = await ( + self.store.get_rooms_that_have_updates_since_sliding_sync_table( + room_ids=missing_rooms, + from_key=from_token.stream_token.room_key, + ) + ) + else: + missing_event_map_by_room = ( + await self.store.get_room_events_stream_for_rooms( + room_ids=missing_rooms, + from_key=to_token.room_key, + to_key=from_token.stream_token.room_key, + limit=1, + ) + ) + unsent_room_ids = list(missing_event_map_by_room) + + new_connection_state.rooms.record_unsent_rooms( + unsent_room_ids, from_token.stream_token.room_key + ) + + new_connection_state.rooms.record_sent_rooms( + relevant_rooms_to_send_map.keys() + ) + + connection_position = await self.connection_store.record_new_state( + sync_config=sync_config, + from_token=from_token, + new_connection_state=new_connection_state, + ) + elif from_token: + connection_position = from_token.connection_position + else: + # Initial sync without a `from_token` starts at `0` + connection_position = 0 + + sliding_sync_result = SlidingSyncResult( + next_pos=SlidingSyncStreamToken(to_token, connection_position), + lists=lists, + rooms=rooms, + extensions=extensions, + ) + + # Make it easy to find traces for syncs that aren't empty + set_tag(SynapseTags.RESULT_PREFIX + "result", bool(sliding_sync_result)) + set_tag(SynapseTags.FUNC_ARG_PREFIX + "sync_config.user", user_id) + + end_time_s = self.clock.time() + sync_processing_time.labels(from_token is not None).observe( + end_time_s - start_time_s + ) + + return sliding_sync_result + + @trace + async def get_current_state_ids_at( + self, + room_id: str, + room_membership_for_user_at_to_token: RoomsForUserType, + state_filter: StateFilter, + to_token: StreamToken, + ) -> StateMap[str]: + """ + Get current state IDs for the user in the room according to their membership. This + will be the current state at the time of their LEAVE/BAN, otherwise will be the + current state <= to_token. + + Args: + room_id: The room ID to fetch data for + room_membership_for_user_at_token: Membership information for the user + in the room at the time of `to_token`. + to_token: The point in the stream to sync up to. + """ + state_ids: StateMap[str] + # People shouldn't see past their leave/ban event + if room_membership_for_user_at_to_token.membership in ( + Membership.LEAVE, + Membership.BAN, + ): + # TODO: `get_state_ids_at(...)` doesn't take into account the "current + # state". Maybe we need to use + # `get_forward_extremities_for_room_at_stream_ordering(...)` to "Fetch the + # current state at the time." + state_ids = await self.storage_controllers.state.get_state_ids_at( + room_id, + stream_position=to_token.copy_and_replace( + StreamKeyType.ROOM, + room_membership_for_user_at_to_token.event_pos.to_room_stream_token(), + ), + state_filter=state_filter, + # Partially-stated rooms should have all state events except for + # remote membership events. Since we've already excluded + # partially-stated rooms unless `required_state` only has + # `["m.room.member", "$LAZY"]` for membership, we should be able to + # retrieve everything requested. When we're lazy-loading, if there + # are some remote senders in the timeline, we should also have their + # membership event because we had to auth that timeline event. Plus + # we don't want to block the whole sync waiting for this one room. + await_full_state=False, + ) + # Otherwise, we can get the latest current state in the room + else: + state_ids = await self.storage_controllers.state.get_current_state_ids( + room_id, + state_filter, + # Partially-stated rooms should have all state events except for + # remote membership events. Since we've already excluded + # partially-stated rooms unless `required_state` only has + # `["m.room.member", "$LAZY"]` for membership, we should be able to + # retrieve everything requested. When we're lazy-loading, if there + # are some remote senders in the timeline, we should also have their + # membership event because we had to auth that timeline event. Plus + # we don't want to block the whole sync waiting for this one room. + await_full_state=False, + ) + # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token` + + return state_ids + + @trace + async def get_current_state_at( + self, + room_id: str, + room_membership_for_user_at_to_token: RoomsForUserType, + state_filter: StateFilter, + to_token: StreamToken, + ) -> StateMap[EventBase]: + """ + Get current state for the user in the room according to their membership. This + will be the current state at the time of their LEAVE/BAN, otherwise will be the + current state <= to_token. + + Args: + room_id: The room ID to fetch data for + room_membership_for_user_at_token: Membership information for the user + in the room at the time of `to_token`. + to_token: The point in the stream to sync up to. + """ + state_ids = await self.get_current_state_ids_at( + room_id=room_id, + room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, + state_filter=state_filter, + to_token=to_token, + ) + + events = await self.store.get_events_as_list(list(state_ids.values())) + + state_map = {} + for event in events: + state_map[(event.type, event.state_key)] = event + + return state_map + + @trace + async def get_current_state_deltas_for_room( + self, + room_id: str, + room_membership_for_user_at_to_token: RoomsForUserType, + from_token: RoomStreamToken, + to_token: RoomStreamToken, + ) -> List[StateDelta]: + """ + Get the state deltas between two tokens taking into account the user's + membership. If the user is LEAVE/BAN, we will only get the state deltas up to + their LEAVE/BAN event (inclusive). + + (> `from_token` and <= `to_token`) + """ + membership = room_membership_for_user_at_to_token.membership + # We don't know how to handle `membership` values other than these. The + # code below would need to be updated. + assert membership in ( + Membership.JOIN, + Membership.INVITE, + Membership.KNOCK, + Membership.LEAVE, + Membership.BAN, + ) + + # People shouldn't see past their leave/ban event + if membership in ( + Membership.LEAVE, + Membership.BAN, + ): + to_bound = ( + room_membership_for_user_at_to_token.event_pos.to_room_stream_token() + ) + # If we are participating in the room, we can get the latest current state in + # the room + elif membership == Membership.JOIN: + to_bound = to_token + # We can only rely on the stripped state included in the invite/knock event + # itself so there will never be any state deltas to send down. + elif membership in (Membership.INVITE, Membership.KNOCK): + return [] + else: + # We don't know how to handle this type of membership yet + # + # FIXME: We should use `assert_never` here but for some reason + # the exhaustive matching doesn't recognize the `Never` here. + # assert_never(membership) + raise AssertionError( + f"Unexpected membership {membership} that we don't know how to handle yet" + ) + + return await self.store.get_current_state_deltas_for_room( + room_id=room_id, + from_token=from_token, + to_token=to_bound, + ) + + @trace + async def get_room_sync_data( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + new_connection_state: "MutablePerConnectionState", + room_id: str, + room_sync_config: RoomSyncConfig, + room_membership_for_user_at_to_token: RoomsForUserType, + from_token: Optional[SlidingSyncStreamToken], + to_token: StreamToken, + newly_joined: bool, + newly_left: bool, + is_dm: bool, + ) -> SlidingSyncResult.RoomResult: + """ + Fetch room data for the sync response. + + We fetch data according to the token range (> `from_token` and <= `to_token`). + + Args: + user: User to fetch data for + room_id: The room ID to fetch data for + room_sync_config: Config for what data we should fetch for a room in the + sync response. + room_membership_for_user_at_to_token: Membership information for the user + in the room at the time of `to_token`. + from_token: The point in the stream to sync from. + to_token: The point in the stream to sync up to. + newly_joined: If the user has newly joined the room + newly_left: If the user has newly left the room + is_dm: Whether the room is a DM room + """ + user = sync_config.user + + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "membership", + room_membership_for_user_at_to_token.membership, + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "timeline_limit", + room_sync_config.timeline_limit, + ) + + # Handle state resets. For example, if we see + # `room_membership_for_user_at_to_token.event_id=None and + # room_membership_for_user_at_to_token.membership is not None`, we should + # indicate to the client that a state reset happened. Perhaps we should indicate + # this by setting `initial: True` and empty `required_state: []`. + state_reset_out_of_room = False + if ( + room_membership_for_user_at_to_token.event_id is None + and room_membership_for_user_at_to_token.membership is not None + ): + # We only expect the `event_id` to be `None` if you've been state reset out + # of the room (meaning you're no longer in the room). We could put this as + # part of the if-statement above but we want to handle every case where + # `event_id` is `None`. + assert room_membership_for_user_at_to_token.membership is Membership.LEAVE + + state_reset_out_of_room = True + + prev_room_sync_config = previous_connection_state.room_configs.get(room_id) + + # Determine whether we should limit the timeline to the token range. + # + # We should return historical messages (before token range) in the + # following cases because we want clients to be able to show a basic + # screen of information: + # + # - Initial sync (because no `from_token` to limit us anyway) + # - When users `newly_joined` + # - For an incremental sync where we haven't sent it down this + # connection before + # + # Relevant spec issue: + # https://github.com/matrix-org/matrix-spec/issues/1917 + # + # XXX: Odd behavior - We also check if the `timeline_limit` has increased, if so + # we ignore the from bound for the timeline to send down a larger chunk of + # history and set `unstable_expanded_timeline` to true. This is only being added + # to match the behavior of the Sliding Sync proxy as we expect the ElementX + # client to feel a certain way and be able to trickle in a full page of timeline + # messages to fill up the screen. This is a bit different to the behavior of the + # Sliding Sync proxy (which sets initial=true, but then doesn't send down the + # full state again), but existing apps, e.g. ElementX, just need `limited` set. + # We don't explicitly set `limited` but this will be the case for any room that + # has more history than we're trying to pull out. Using + # `unstable_expanded_timeline` allows us to avoid contaminating what `initial` + # or `limited` mean for clients that interpret them correctly. In future this + # behavior is almost certainly going to change. + # + from_bound = None + initial = True + ignore_timeline_bound = False + if from_token and not newly_joined and not state_reset_out_of_room: + room_status = previous_connection_state.rooms.have_sent_room(room_id) + if room_status.status == HaveSentRoomFlag.LIVE: + from_bound = from_token.stream_token.room_key + initial = False + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + from_bound = room_status.last_token + initial = False + elif room_status.status == HaveSentRoomFlag.NEVER: + from_bound = None + initial = True + else: + assert_never(room_status.status) + + log_kv({"sliding_sync.room_status": room_status}) + + if prev_room_sync_config is not None: + # Check if the timeline limit has increased, if so ignore the + # timeline bound and record the change (see "XXX: Odd behavior" + # above). + if ( + prev_room_sync_config.timeline_limit + < room_sync_config.timeline_limit + ): + ignore_timeline_bound = True + + log_kv( + { + "sliding_sync.from_bound": from_bound, + "sliding_sync.initial": initial, + "sliding_sync.ignore_timeline_bound": ignore_timeline_bound, + } + ) + + # Assemble the list of timeline events + # + # FIXME: It would be nice to make the `rooms` response more uniform regardless of + # membership. Currently, we have to make all of these optional because + # `invite`/`knock` rooms only have `stripped_state`. See + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 + timeline_events: List[EventBase] = [] + bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None + limited: Optional[bool] = None + prev_batch_token: Optional[StreamToken] = None + num_live: Optional[int] = None + if ( + room_sync_config.timeline_limit > 0 + # No timeline for invite/knock rooms (just `stripped_state`) + and room_membership_for_user_at_to_token.membership + not in (Membership.INVITE, Membership.KNOCK) + ): + limited = False + # We want to start off using the `to_token` (vs `from_token`) because we look + # backwards from the `to_token` up to the `timeline_limit` and we might not + # reach the `from_token` before we hit the limit. We will update the room stream + # position once we've fetched the events to point to the earliest event fetched. + prev_batch_token = to_token + + # We're going to paginate backwards from the `to_token` + to_bound = to_token.room_key + # People shouldn't see past their leave/ban event + if room_membership_for_user_at_to_token.membership in ( + Membership.LEAVE, + Membership.BAN, + ): + to_bound = room_membership_for_user_at_to_token.event_pos.to_room_stream_token() + + timeline_from_bound = from_bound + if ignore_timeline_bound: + timeline_from_bound = None + + # For initial `/sync` (and other historical scenarios mentioned above), we + # want to view a historical section of the timeline; to fetch events by + # `topological_ordering` (best representation of the room DAG as others were + # seeing it at the time). This also aligns with the order that `/messages` + # returns events in. + # + # For incremental `/sync`, we want to get all updates for rooms since + # the last `/sync` (regardless if those updates arrived late or happened + # a while ago in the past); to fetch events by `stream_ordering` (in the + # order they were received by the server). + # + # Relevant spec issue: https://github.com/matrix-org/matrix-spec/issues/1917 + # + # FIXME: Using workaround for mypy, + # https://github.com/python/mypy/issues/10740#issuecomment-1997047277 and + # https://github.com/python/mypy/issues/17479 + paginate_room_events_by_topological_ordering: PaginateFunction = ( + self.store.paginate_room_events_by_topological_ordering + ) + paginate_room_events_by_stream_ordering: PaginateFunction = ( + self.store.paginate_room_events_by_stream_ordering + ) + pagination_method: PaginateFunction = ( + # Use `topographical_ordering` for historical events + paginate_room_events_by_topological_ordering + if timeline_from_bound is None + # Use `stream_ordering` for updates + else paginate_room_events_by_stream_ordering + ) + timeline_events, new_room_key, limited = await pagination_method( + room_id=room_id, + # The bounds are reversed so we can paginate backwards + # (from newer to older events) starting at to_bound. + # This ensures we fill the `limit` with the newest events first, + from_key=to_bound, + to_key=timeline_from_bound, + direction=Direction.BACKWARDS, + limit=room_sync_config.timeline_limit, + ) + + # We want to return the events in ascending order (the last event is the + # most recent). + timeline_events.reverse() + + # Make sure we don't expose any events that the client shouldn't see + timeline_events = await filter_events_for_client( + self.storage_controllers, + user.to_string(), + timeline_events, + is_peeking=room_membership_for_user_at_to_token.membership + != Membership.JOIN, + filter_send_to_client=True, + ) + # TODO: Filter out `EventTypes.CallInvite` in public rooms, + # see https://github.com/element-hq/synapse/issues/17359 + + # TODO: Handle timeline gaps (`get_timeline_gaps()`) + + # Determine how many "live" events we have (events within the given token range). + # + # This is mostly useful to determine whether a given @mention event should + # make a noise or not. Clients cannot rely solely on the absence of + # `initial: true` to determine live events because if a room not in the + # sliding window bumps into the window because of an @mention it will have + # `initial: true` yet contain a single live event (with potentially other + # old events in the timeline) + num_live = 0 + if from_token is not None: + for timeline_event in reversed(timeline_events): + # This fields should be present for all persisted events + assert timeline_event.internal_metadata.stream_ordering is not None + assert timeline_event.internal_metadata.instance_name is not None + + persisted_position = PersistedEventPosition( + instance_name=timeline_event.internal_metadata.instance_name, + stream=timeline_event.internal_metadata.stream_ordering, + ) + if persisted_position.persisted_after( + from_token.stream_token.room_key + ): + num_live += 1 + else: + # Since we're iterating over the timeline events in + # reverse-chronological order, we can break once we hit an event + # that's not live. In the future, we could potentially optimize + # this more with a binary search (bisect). + break + + # If the timeline is `limited=True`, the client does not have all events + # necessary to calculate aggregations themselves. + if limited: + bundled_aggregations = ( + await self.relations_handler.get_bundled_aggregations( + timeline_events, user.to_string() + ) + ) + + # Update the `prev_batch_token` to point to the position that allows us to + # keep paginating backwards from the oldest event we return in the timeline. + prev_batch_token = prev_batch_token.copy_and_replace( + StreamKeyType.ROOM, new_room_key + ) + + # Figure out any stripped state events for invite/knocks. This allows the + # potential joiner to identify the room. + stripped_state: List[JsonDict] = [] + if room_membership_for_user_at_to_token.membership in ( + Membership.INVITE, + Membership.KNOCK, + ): + # This should never happen. If someone is invited/knocked on room, then + # there should be an event for it. + assert room_membership_for_user_at_to_token.event_id is not None + + invite_or_knock_event = await self.store.get_event( + room_membership_for_user_at_to_token.event_id + ) + + stripped_state = [] + if invite_or_knock_event.membership == Membership.INVITE: + invite_state = invite_or_knock_event.unsigned.get( + "invite_room_state", [] + ) + if not isinstance(invite_state, list): + invite_state = [] + + stripped_state.extend(invite_state) + elif invite_or_knock_event.membership == Membership.KNOCK: + knock_state = invite_or_knock_event.unsigned.get("knock_room_state", []) + if not isinstance(knock_state, list): + knock_state = [] + + stripped_state.extend(knock_state) + + stripped_state.append(strip_event(invite_or_knock_event)) + + # Get the changes to current state in the token range from the + # `current_state_delta_stream` table. + # + # For incremental syncs, we can do this first to determine if something relevant + # has changed and strategically avoid fetching other costly things. + room_state_delta_id_map: MutableStateMap[str] = {} + name_event_id: Optional[str] = None + membership_changed = False + name_changed = False + avatar_changed = False + if initial: + # Check whether the room has a name set + name_state_ids = await self.get_current_state_ids_at( + room_id=room_id, + room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, + state_filter=StateFilter.from_types([(EventTypes.Name, "")]), + to_token=to_token, + ) + name_event_id = name_state_ids.get((EventTypes.Name, "")) + else: + assert from_bound is not None + + # TODO: Limit the number of state events we're about to send down + # the room, if its too many we should change this to an + # `initial=True`? + + # For the case of rejecting remote invites, the leave event won't be + # returned by `get_current_state_deltas_for_room`. This is due to the current + # state only being filled out for rooms the server is in, and so doesn't pick + # up out-of-band leaves (including locally rejected invites) as these events + # are outliers and not added to the `current_state_delta_stream`. + # + # We rely on being explicitly told that the room has been `newly_left` to + # ensure we extract the out-of-band leave. + if newly_left and room_membership_for_user_at_to_token.event_id is not None: + membership_changed = True + leave_event = await self.store.get_event( + room_membership_for_user_at_to_token.event_id + ) + state_key = leave_event.get_state_key() + if state_key is not None: + room_state_delta_id_map[(leave_event.type, state_key)] = ( + room_membership_for_user_at_to_token.event_id + ) + + deltas = await self.get_current_state_deltas_for_room( + room_id=room_id, + room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, + from_token=from_bound, + to_token=to_token.room_key, + ) + for delta in deltas: + # TODO: Handle state resets where event_id is None + if delta.event_id is not None: + room_state_delta_id_map[(delta.event_type, delta.state_key)] = ( + delta.event_id + ) + + if delta.event_type == EventTypes.Member: + membership_changed = True + elif delta.event_type == EventTypes.Name and delta.state_key == "": + name_changed = True + elif ( + delta.event_type == EventTypes.RoomAvatar and delta.state_key == "" + ): + avatar_changed = True + + # We only need the room summary for calculating heroes, however if we do + # fetch it then we can use it to calculate `joined_count` and + # `invited_count`. + room_membership_summary: Optional[Mapping[str, MemberSummary]] = None + + # `heroes` are required if the room name is not set. + # + # Note: When you're the first one on your server to be invited to a new room + # over federation, we only have access to some stripped state in + # `event.unsigned.invite_room_state` which currently doesn't include `heroes`, + # see https://github.com/matrix-org/matrix-spec/issues/380. This means that + # clients won't be able to calculate the room name when necessary and just a + # pitfall we have to deal with until that spec issue is resolved. + hero_user_ids: List[str] = [] + # TODO: Should we also check for `EventTypes.CanonicalAlias` + # (`m.room.canonical_alias`) as a fallback for the room name? see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153 + # + # We need to fetch the `heroes` if the room name is not set. But we only need to + # get them on initial syncs (or the first time we send down the room) or if the + # membership has changed which may change the heroes. + if name_event_id is None and (initial or (not initial and membership_changed)): + # We need the room summary to extract the heroes from + if room_membership_for_user_at_to_token.membership != Membership.JOIN: + # TODO: Figure out how to get the membership summary for left/banned rooms + # For invite/knock rooms we don't include the information. + room_membership_summary = {} + else: + room_membership_summary = await self.store.get_room_summary(room_id) + # TODO: Reverse/rewind back to the `to_token` + + hero_user_ids = extract_heroes_from_room_summary( + room_membership_summary, me=user.to_string() + ) + + # Fetch the membership counts for rooms we're joined to. + # + # Similarly to other metadata, we only need to calculate the member + # counts if this is an initial sync or the memberships have changed. + joined_count: Optional[int] = None + invited_count: Optional[int] = None + if ( + initial or membership_changed + ) and room_membership_for_user_at_to_token.membership == Membership.JOIN: + # If we have the room summary (because we calculated heroes above) + # then we can simply pull the counts from there. + if room_membership_summary is not None: + empty_membership_summary = MemberSummary([], 0) + + joined_count = room_membership_summary.get( + Membership.JOIN, empty_membership_summary + ).count + + invited_count = room_membership_summary.get( + Membership.INVITE, empty_membership_summary + ).count + else: + member_counts = await self.store.get_member_counts(room_id) + joined_count = member_counts.get(Membership.JOIN, 0) + invited_count = member_counts.get(Membership.INVITE, 0) + + # Fetch the `required_state` for the room + # + # No `required_state` for invite/knock rooms (just `stripped_state`) + # + # FIXME: It would be nice to make the `rooms` response more uniform regardless + # of membership. Currently, we have to make this optional because + # `invite`/`knock` rooms only have `stripped_state`. See + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 + # + # Calculate the `StateFilter` based on the `required_state` for the room + required_state_filter = StateFilter.none() + # The requested `required_state_map` with the lazy membership expanded and + # `$ME` replaced with the user's ID. This allows us to see what membership we've + # sent down to the client in the next request. + # + # Make a copy so we can modify it. Still need to be careful to make a copy of + # the state key sets if we want to add/remove from them. We could make a deep + # copy but this saves us some work. + expanded_required_state_map = dict(room_sync_config.required_state_map) + if room_membership_for_user_at_to_token.membership not in ( + Membership.INVITE, + Membership.KNOCK, + ): + # If we have a double wildcard ("*", "*") in the `required_state`, we need + # to fetch all state for the room + # + # Note: MSC3575 describes different behavior to how we're handling things + # here but since it's not wrong to return more state than requested + # (`required_state` is just the minimum requested), it doesn't matter if we + # include more than client wanted. This complexity is also under scrutiny, + # see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1185109050 + # + # > One unique exception is when you request all state events via ["*", "*"]. When used, + # > all state events are returned by default, and additional entries FILTER OUT the returned set + # > of state events. These additional entries cannot use '*' themselves. + # > For example, ["*", "*"], ["m.room.member", "@alice:example.com"] will _exclude_ every m.room.member + # > event _except_ for @alice:example.com, and include every other state event. + # > In addition, ["*", "*"], ["m.space.child", "*"] is an error, the m.space.child filter is not + # > required as it would have been returned anyway. + # > + # > -- MSC3575 (https://github.com/matrix-org/matrix-spec-proposals/pull/3575) + if StateValues.WILDCARD in room_sync_config.required_state_map.get( + StateValues.WILDCARD, set() + ): + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard", + True, + ) + required_state_filter = StateFilter.all() + # TODO: `StateFilter` currently doesn't support wildcard event types. We're + # currently working around this by returning all state to the client but it + # would be nice to fetch less from the database and return just what the + # client wanted. + elif ( + room_sync_config.required_state_map.get(StateValues.WILDCARD) + is not None + ): + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "required_state_wildcard_event_type", + True, + ) + required_state_filter = StateFilter.all() + else: + required_state_types: List[Tuple[str, Optional[str]]] = [] + num_wild_state_keys = 0 + lazy_load_room_members = False + num_others = 0 + for ( + state_type, + state_key_set, + ) in room_sync_config.required_state_map.items(): + for state_key in state_key_set: + if state_key == StateValues.WILDCARD: + num_wild_state_keys += 1 + # `None` is a wildcard in the `StateFilter` + required_state_types.append((state_type, None)) + # We need to fetch all relevant people when we're lazy-loading membership + elif ( + state_type == EventTypes.Member + and state_key == StateValues.LAZY + ): + lazy_load_room_members = True + + # Everyone in the timeline is relevant + timeline_membership: Set[str] = set() + if timeline_events is not None: + for timeline_event in timeline_events: + # Anyone who sent a message is relevant + timeline_membership.add(timeline_event.sender) + + # We also care about invite, ban, kick, targets, + # etc. + if timeline_event.type == EventTypes.Member: + timeline_membership.add( + timeline_event.state_key + ) + + # Update the required state filter so we pick up the new + # membership + for user_id in timeline_membership: + required_state_types.append( + (EventTypes.Member, user_id) + ) + + # Add an explicit entry for each user in the timeline + # + # Make a new set or copy of the state key set so we can + # modify it without affecting the original + # `required_state_map` + expanded_required_state_map[EventTypes.Member] = ( + expanded_required_state_map.get( + EventTypes.Member, set() + ) + | timeline_membership + ) + elif state_key == StateValues.ME: + num_others += 1 + required_state_types.append((state_type, user.to_string())) + # Replace `$ME` with the user's ID so we can deduplicate + # when someone requests the same state with `$ME` or with + # their user ID. + # + # Make a new set or copy of the state key set so we can + # modify it without affecting the original + # `required_state_map` + expanded_required_state_map[EventTypes.Member] = ( + expanded_required_state_map.get( + EventTypes.Member, set() + ) + | {user.to_string()} + ) + else: + num_others += 1 + required_state_types.append((state_type, state_key)) + + set_tag( + SynapseTags.FUNC_ARG_PREFIX + + "required_state_wildcard_state_key_count", + num_wild_state_keys, + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "required_state_lazy", + lazy_load_room_members, + ) + set_tag( + SynapseTags.FUNC_ARG_PREFIX + "required_state_other_count", + num_others, + ) + + required_state_filter = StateFilter.from_types(required_state_types) + + # We need this base set of info for the response so let's just fetch it along + # with the `required_state` for the room + hero_room_state = [ + (EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids + ] + meta_room_state = list(hero_room_state) + if initial or name_changed: + meta_room_state.append((EventTypes.Name, "")) + if initial or avatar_changed: + meta_room_state.append((EventTypes.RoomAvatar, "")) + + state_filter = StateFilter.all() + if required_state_filter != StateFilter.all(): + state_filter = StateFilter( + types=StateFilter.from_types( + chain(meta_room_state, required_state_filter.to_types()) + ).types, + include_others=required_state_filter.include_others, + ) + + # The required state map to store in the room sync config, if it has + # changed. + changed_required_state_map: Optional[Mapping[str, AbstractSet[str]]] = None + + # We can return all of the state that was requested if this was the first + # time we've sent the room down this connection. + room_state: StateMap[EventBase] = {} + if initial: + room_state = await self.get_current_state_at( + room_id=room_id, + room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, + state_filter=state_filter, + to_token=to_token, + ) + else: + assert from_bound is not None + + if prev_room_sync_config is not None: + # Check if there are any changes to the required state config + # that we need to handle. + changed_required_state_map, added_state_filter = ( + _required_state_changes( + user.to_string(), + prev_required_state_map=prev_room_sync_config.required_state_map, + request_required_state_map=expanded_required_state_map, + state_deltas=room_state_delta_id_map, + ) + ) + + if added_state_filter: + # Some state entries got added, so we pull out the current + # state for them. If we don't do this we'd only send down new deltas. + state_ids = await self.get_current_state_ids_at( + room_id=room_id, + room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, + state_filter=added_state_filter, + to_token=to_token, + ) + room_state_delta_id_map.update(state_ids) + + events = await self.store.get_events( + state_filter.filter_state(room_state_delta_id_map).values() + ) + room_state = {(s.type, s.state_key): s for s in events.values()} + + # If the membership changed and we have to get heroes, get the remaining + # heroes from the state + if hero_user_ids: + hero_membership_state = await self.get_current_state_at( + room_id=room_id, + room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, + state_filter=StateFilter.from_types(hero_room_state), + to_token=to_token, + ) + room_state.update(hero_membership_state) + + required_room_state: StateMap[EventBase] = {} + if required_state_filter != StateFilter.none(): + required_room_state = required_state_filter.filter_state(room_state) + + # Find the room name and avatar from the state + room_name: Optional[str] = None + # TODO: Should we also check for `EventTypes.CanonicalAlias` + # (`m.room.canonical_alias`) as a fallback for the room name? see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153 + name_event = room_state.get((EventTypes.Name, "")) + if name_event is not None: + room_name = name_event.content.get("name") + + room_avatar: Optional[str] = None + avatar_event = room_state.get((EventTypes.RoomAvatar, "")) + if avatar_event is not None: + room_avatar = avatar_event.content.get("url") + + # Assemble heroes: extract the info from the state we just fetched + heroes: List[SlidingSyncResult.RoomResult.StrippedHero] = [] + for hero_user_id in hero_user_ids: + member_event = room_state.get((EventTypes.Member, hero_user_id)) + if member_event is not None: + heroes.append( + SlidingSyncResult.RoomResult.StrippedHero( + user_id=hero_user_id, + display_name=member_event.content.get("displayname"), + avatar_url=member_event.content.get("avatar_url"), + ) + ) + + # Figure out the last bump event in the room. If the bump stamp hasn't + # changed we omit it from the response. + bump_stamp = None + + always_return_bump_stamp = ( + # We use the membership event position for any non-join + room_membership_for_user_at_to_token.membership != Membership.JOIN + # We didn't fetch any timeline events but we should still check for + # a bump_stamp that might be somewhere + or limited is None + # There might be a bump event somewhere before the timeline events + # that we fetched, that we didn't previously send down + or limited is True + # Always give the client some frame of reference if this is the + # first time they are seeing the room down the connection + or initial + ) + + # If we're joined to the room, we need to find the last bump event before the + # `to_token` + if room_membership_for_user_at_to_token.membership == Membership.JOIN: + # Try and get a bump stamp + new_bump_stamp = await self._get_bump_stamp( + room_id, + to_token, + timeline_events, + check_outside_timeline=always_return_bump_stamp, + ) + if new_bump_stamp is not None: + bump_stamp = new_bump_stamp + + if bump_stamp is None and always_return_bump_stamp: + # By default, just choose the membership event position for any non-join membership + bump_stamp = room_membership_for_user_at_to_token.event_pos.stream + + if bump_stamp is not None and bump_stamp < 0: + # We never want to send down negative stream orderings, as you can't + # sensibly compare positive and negative stream orderings (they have + # different meanings). + # + # A negative bump stamp here can only happen if the stream ordering + # of the membership event is negative (and there are no further bump + # stamps), which can happen if the server leaves and deletes a room, + # and then rejoins it. + # + # To deal with this, we just set the bump stamp to zero, which will + # shove this room to the bottom of the list. This is OK as the + # moment a new message happens in the room it will get put into a + # sensible order again. + bump_stamp = 0 + + room_sync_required_state_map_to_persist: Mapping[str, AbstractSet[str]] = ( + expanded_required_state_map + ) + if changed_required_state_map: + room_sync_required_state_map_to_persist = changed_required_state_map + + # Record the `room_sync_config` if we're `ignore_timeline_bound` (which means + # that the `timeline_limit` has increased) + unstable_expanded_timeline = False + if ignore_timeline_bound: + # FIXME: We signal the fact that we're sending down more events to + # the client by setting `unstable_expanded_timeline` to true (see + # "XXX: Odd behavior" above). + unstable_expanded_timeline = True + + new_connection_state.room_configs[room_id] = RoomSyncConfig( + timeline_limit=room_sync_config.timeline_limit, + required_state_map=room_sync_required_state_map_to_persist, + ) + elif prev_room_sync_config is not None: + # If the result is `limited` then we need to record that the + # `timeline_limit` has been reduced, as when/if the client later requests + # more timeline then we have more data to send. + # + # Otherwise (when not `limited`) we don't need to record that the + # `timeline_limit` has been reduced, as the *effective* `timeline_limit` + # (i.e. the amount of timeline we have previously sent to the client) is at + # least the previous `timeline_limit`. + # + # This is to handle the case where the `timeline_limit` e.g. goes from 10 to + # 5 to 10 again (without any timeline gaps), where there's no point sending + # down the initial historical chunk events when the `timeline_limit` is + # increased as the client already has the 10 previous events. However, if + # client has a gap in the timeline (i.e. `limited` is True), then we *do* + # need to record the reduced timeline. + # + # TODO: Handle timeline gaps (`get_timeline_gaps()`) - This is separate from + # the gaps we might see on the client because a response was `limited` we're + # talking about above. + if ( + limited + and prev_room_sync_config.timeline_limit + > room_sync_config.timeline_limit + ): + new_connection_state.room_configs[room_id] = RoomSyncConfig( + timeline_limit=room_sync_config.timeline_limit, + required_state_map=room_sync_required_state_map_to_persist, + ) + + elif changed_required_state_map is not None: + new_connection_state.room_configs[room_id] = RoomSyncConfig( + timeline_limit=room_sync_config.timeline_limit, + required_state_map=room_sync_required_state_map_to_persist, + ) + + else: + new_connection_state.room_configs[room_id] = RoomSyncConfig( + timeline_limit=room_sync_config.timeline_limit, + required_state_map=room_sync_required_state_map_to_persist, + ) + + set_tag(SynapseTags.RESULT_PREFIX + "initial", initial) + + return SlidingSyncResult.RoomResult( + name=room_name, + avatar=room_avatar, + heroes=heroes, + is_dm=is_dm, + initial=initial, + required_state=list(required_room_state.values()), + timeline_events=timeline_events, + bundled_aggregations=bundled_aggregations, + stripped_state=stripped_state, + prev_batch=prev_batch_token, + limited=limited, + unstable_expanded_timeline=unstable_expanded_timeline, + num_live=num_live, + bump_stamp=bump_stamp, + joined_count=joined_count, + invited_count=invited_count, + # TODO: These are just dummy values. We could potentially just remove these + # since notifications can only really be done correctly on the client anyway + # (encrypted rooms). + notification_count=0, + highlight_count=0, + ) + + @trace + async def _get_bump_stamp( + self, + room_id: str, + to_token: StreamToken, + timeline: List[EventBase], + check_outside_timeline: bool, + ) -> Optional[int]: + """Get a bump stamp for the room, if we have a bump event and it has + changed. + + Args: + room_id + to_token: The upper bound of token to return + timeline: The list of events we have fetched. + limited: If the timeline was limited. + check_outside_timeline: Whether we need to check for bump stamp for + events before the timeline if we didn't find a bump stamp in + the timeline events. + """ + + # First check the timeline events we're returning to see if one of + # those matches. We iterate backwards and take the stream ordering + # of the first event that matches the bump event types. + for timeline_event in reversed(timeline): + if timeline_event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES: + new_bump_stamp = timeline_event.internal_metadata.stream_ordering + + # All persisted events have a stream ordering + assert new_bump_stamp is not None + + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead we use the membership event position. + if new_bump_stamp > 0: + return new_bump_stamp + + if not check_outside_timeline: + # If we are not a limited sync, then we know the bump stamp can't + # have changed. + return None + + # We can quickly query for the latest bump event in the room using the + # sliding sync tables. + latest_room_bump_stamp = await self.store.get_latest_bump_stamp_for_room( + room_id + ) + + min_to_token_position = to_token.room_key.stream + + # If we can rely on the new sliding sync tables and the `bump_stamp` is + # `None`, just fallback to the membership event position. This can happen + # when we've just joined a remote room and all the events are backfilled. + if ( + # FIXME: The background job check can be removed once we bump + # `SCHEMA_COMPAT_VERSION` and run the foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` + # (tracked by https://github.com/element-hq/synapse/issues/17623) + latest_room_bump_stamp is None + and await self.store.have_finished_sliding_sync_background_jobs() + ): + return None + + # The `bump_stamp` stored in the database might be ahead of our token. Since + # `bump_stamp` is only a `stream_ordering` position, we can't be 100% sure + # that's before the `to_token` in all scenarios. The only scenario we can be + # sure of is if the `bump_stamp` is totally before the minimum position from + # the token. + # + # We don't need to check if the background update has finished, as if the + # returned bump stamp is not None then it must be up to date. + elif ( + latest_room_bump_stamp is not None + and latest_room_bump_stamp < min_to_token_position + ): + if latest_room_bump_stamp > 0: + return latest_room_bump_stamp + else: + return None + + # Otherwise, if it's within or after the `to_token`, we need to find the + # last bump event before the `to_token`. + else: + last_bump_event_result = ( + await self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id, + to_token.room_key, + event_types=SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES, + ) + ) + if last_bump_event_result is not None: + _, new_bump_event_pos = last_bump_event_result + + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead we use the membership event position. + if new_bump_event_pos.stream > 0: + return new_bump_event_pos.stream + + return None + + +def _required_state_changes( + user_id: str, + *, + prev_required_state_map: Mapping[str, AbstractSet[str]], + request_required_state_map: Mapping[str, AbstractSet[str]], + state_deltas: StateMap[str], +) -> Tuple[Optional[Mapping[str, AbstractSet[str]]], StateFilter]: + """Calculates the changes between the required state room config from the + previous requests compared with the current request. + + This does two things. First, it calculates if we need to update the room + config due to changes to required state. Secondly, it works out which state + entries we need to pull from current state and return due to the state entry + now appearing in the required state when it previously wasn't (on top of the + state deltas). + + This function tries to ensure to handle the case where a state entry is + added, removed and then added again to the required state. In that case we + only want to re-send that entry down sync if it has changed. + + Returns: + A 2-tuple of updated required state config (or None if there is no update) + and the state filter to use to fetch extra current state that we need to + return. + """ + if prev_required_state_map == request_required_state_map: + # There has been no change. Return immediately. + return None, StateFilter.none() + + prev_wildcard = prev_required_state_map.get(StateValues.WILDCARD, set()) + request_wildcard = request_required_state_map.get(StateValues.WILDCARD, set()) + + # If we were previously fetching everything ("*", "*"), always update the effective + # room required state config to match the request. And since we we're previously + # already fetching everything, we don't have to fetch anything now that they've + # narrowed. + if StateValues.WILDCARD in prev_wildcard: + return request_required_state_map, StateFilter.none() + + # If a event type wildcard has been added or removed we don't try and do + # anything fancy, and instead always update the effective room required + # state config to match the request. + if request_wildcard - prev_wildcard: + # Some keys were added, so we need to fetch everything + return request_required_state_map, StateFilter.all() + if prev_wildcard - request_wildcard: + # Keys were only removed, so we don't have to fetch everything. + return request_required_state_map, StateFilter.none() + + # Contains updates to the required state map compared with the previous room + # config. This has the same format as `RoomSyncConfig.required_state` + changes: Dict[str, AbstractSet[str]] = {} + + # The set of types/state keys that we need to fetch and return to the + # client. Passed to `StateFilter.from_types(...)` + added: List[Tuple[str, Optional[str]]] = [] + + # Convert the list of state deltas to map from type to state_keys that have + # changed. + changed_types_to_state_keys: Dict[str, Set[str]] = {} + for event_type, state_key in state_deltas: + changed_types_to_state_keys.setdefault(event_type, set()).add(state_key) + + # First we calculate what, if anything, has been *added*. + for event_type in ( + prev_required_state_map.keys() | request_required_state_map.keys() + ): + old_state_keys = prev_required_state_map.get(event_type, set()) + request_state_keys = request_required_state_map.get(event_type, set()) + changed_state_keys = changed_types_to_state_keys.get(event_type, set()) + + if old_state_keys == request_state_keys: + # No change to this type + continue + + if not request_state_keys - old_state_keys: + # Nothing *added*, so we skip. Removals happen below. + continue + + # We only remove state keys from the effective state if they've been + # removed from the request *and* the state has changed. This ensures + # that if a client removes and then re-adds a state key, we only send + # down the associated current state event if its changed (rather than + # sending down the same event twice). + invalidated_state_keys = ( + old_state_keys - request_state_keys + ) & changed_state_keys + + # Figure out which state keys we should remember sending down the connection + inheritable_previous_state_keys = ( + # Retain the previous state_keys that we've sent down before. + # Wildcard and lazy state keys are not sticky from previous requests. + (old_state_keys - {StateValues.WILDCARD, StateValues.LAZY}) + - invalidated_state_keys + ) + + # Always update changes to include the newly added keys (we've expanded the set + # of state keys), use the new requested set with whatever hasn't been + # invalidated from the previous set. + changes[event_type] = request_state_keys | inheritable_previous_state_keys + # Limit the number of state_keys we should remember sending down the connection + # for each (room_id, user_id). We don't want to store and pull out too much data + # in the database. This is a happy-medium between remembering nothing and + # everything. We can avoid sending redundant state down the connection most of + # the time given that most rooms don't have 100 members anyway and it takes a + # while to cycle through 100 members. + # + # Only remember up to (MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER) + if len(changes[event_type]) > MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER: + # Reset back to only the requested state keys + changes[event_type] = request_state_keys + + # Skip if there isn't any room to fill in the rest with previous state keys + if len(request_state_keys) < MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER: + # Fill the rest with previous state_keys. Ideally, we could sort + # these by recency but it's just a set so just pick an arbitrary + # subset (good enough). + changes[event_type] = changes[event_type] | set( + itertools.islice( + inheritable_previous_state_keys, + # Just taking the difference isn't perfect as there could be + # overlap in the keys between the requested and previous but we + # will decide to just take the easy route for now and avoid + # additional set operations to figure it out. + MAX_NUMBER_PREVIOUS_STATE_KEYS_TO_REMEMBER + - len(request_state_keys), + ) + ) + + if StateValues.WILDCARD in old_state_keys: + # We were previously fetching everything for this type, so we don't need to + # fetch anything new. + continue + + # Record the new state keys to fetch for this type. + if StateValues.WILDCARD in request_state_keys: + # If we have added a wildcard then we always just fetch everything. + added.append((event_type, None)) + else: + for state_key in request_state_keys - old_state_keys: + if state_key == StateValues.ME: + added.append((event_type, user_id)) + elif state_key == StateValues.LAZY: + # We handle lazy loading separately (outside this function), + # so don't need to explicitly add anything here. + # + # LAZY values should also be ignore for event types that are + # not membership. + pass + else: + added.append((event_type, state_key)) + + added_state_filter = StateFilter.from_types(added) + + # Figure out what changes we need to apply to the effective required state + # config. + for event_type, changed_state_keys in changed_types_to_state_keys.items(): + old_state_keys = prev_required_state_map.get(event_type, set()) + request_state_keys = request_required_state_map.get(event_type, set()) + + if old_state_keys == request_state_keys: + # No change. + continue + + # If we see the `user_id` as a state_key, also add "$ME" to the list of state + # that has changed to account for people requesting `required_state` with `$ME` + # or their user ID. + if user_id in changed_state_keys: + changed_state_keys.add(StateValues.ME) + + # We only remove state keys from the effective state if they've been + # removed from the request *and* the state has changed. This ensures + # that if a client removes and then re-adds a state key, we only send + # down the associated current state event if its changed (rather than + # sending down the same event twice). + invalidated_state_keys = ( + old_state_keys - request_state_keys + ) & changed_state_keys + + # We've expanded the set of state keys, ... (already handled above) + if request_state_keys - old_state_keys: + continue + + old_state_key_wildcard = StateValues.WILDCARD in old_state_keys + request_state_key_wildcard = StateValues.WILDCARD in request_state_keys + + if old_state_key_wildcard != request_state_key_wildcard: + # If a state_key wildcard has been added or removed, we always update the + # effective room required state config to match the request. + changes[event_type] = request_state_keys + continue + + if event_type == EventTypes.Member: + old_state_key_lazy = StateValues.LAZY in old_state_keys + request_state_key_lazy = StateValues.LAZY in request_state_keys + + if old_state_key_lazy != request_state_key_lazy: + # If a "$LAZY" has been added or removed we always update the effective room + # required state config to match the request. + changes[event_type] = request_state_keys + continue + + # At this point there are no wildcards and no additions to the set of + # state keys requested, only deletions. + # + # We only remove state keys from the effective state if they've been + # removed from the request *and* the state has changed. This ensures + # that if a client removes and then re-adds a state key, we only send + # down the associated current state event if its changed (rather than + # sending down the same event twice). + if invalidated_state_keys: + changes[event_type] = old_state_keys - invalidated_state_keys + + if changes: + # Update the required state config based on the changes. + new_required_state_map = dict(prev_required_state_map) + for event_type, state_keys in changes.items(): + if state_keys: + new_required_state_map[event_type] = state_keys + else: + # Remove entries with empty state keys. + new_required_state_map.pop(event_type, None) + + return new_required_state_map, added_state_filter + else: + return None, added_state_filter diff --git a/synapse/handlers/sliding_sync/extensions.py b/synapse/handlers/sliding_sync/extensions.py new file mode 100644
index 0000000000..077887ec32 --- /dev/null +++ b/synapse/handlers/sliding_sync/extensions.py
@@ -0,0 +1,879 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2023 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# + +import itertools +import logging +from typing import ( + TYPE_CHECKING, + AbstractSet, + ChainMap, + Dict, + Mapping, + MutableMapping, + Optional, + Sequence, + Set, + cast, +) + +from typing_extensions import assert_never + +from synapse.api.constants import AccountDataTypes, EduTypes +from synapse.handlers.receipts import ReceiptEventSource +from synapse.logging.opentracing import trace +from synapse.storage.databases.main.receipts import ReceiptInRoom +from synapse.types import ( + DeviceListUpdates, + JsonMapping, + MultiWriterStreamToken, + SlidingSyncStreamToken, + StrCollection, + StreamToken, +) +from synapse.types.handlers.sliding_sync import ( + HaveSentRoomFlag, + MutablePerConnectionState, + OperationType, + PerConnectionState, + SlidingSyncConfig, + SlidingSyncResult, +) +from synapse.util.async_helpers import ( + concurrently_execute, + gather_optional_coroutines, +) + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class SlidingSyncExtensionHandler: + """Handles the extensions to sliding sync.""" + + def __init__(self, hs: "HomeServer"): + self.store = hs.get_datastores().main + self.event_sources = hs.get_event_sources() + self.device_handler = hs.get_device_handler() + self.push_rules_handler = hs.get_push_rules_handler() + + @trace + async def get_extensions_response( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + new_connection_state: "MutablePerConnectionState", + actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], + actual_room_response_map: Mapping[str, SlidingSyncResult.RoomResult], + to_token: StreamToken, + from_token: Optional[SlidingSyncStreamToken], + ) -> SlidingSyncResult.Extensions: + """Handle extension requests. + + Args: + sync_config: Sync configuration + new_connection_state: Snapshot of the current per-connection state + new_per_connection_state: A mutable copy of the per-connection + state, used to record updates to the state during this request. + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. + actual_room_response_map: A map of room ID to room results in the the + Sliding Sync response. + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. + """ + + if sync_config.extensions is None: + return SlidingSyncResult.Extensions() + + to_device_coro = None + if sync_config.extensions.to_device is not None: + to_device_coro = self.get_to_device_extension_response( + sync_config=sync_config, + to_device_request=sync_config.extensions.to_device, + to_token=to_token, + ) + + e2ee_coro = None + if sync_config.extensions.e2ee is not None: + e2ee_coro = self.get_e2ee_extension_response( + sync_config=sync_config, + e2ee_request=sync_config.extensions.e2ee, + to_token=to_token, + from_token=from_token, + ) + + account_data_coro = None + if sync_config.extensions.account_data is not None: + account_data_coro = self.get_account_data_extension_response( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + new_connection_state=new_connection_state, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + account_data_request=sync_config.extensions.account_data, + to_token=to_token, + from_token=from_token, + ) + + receipts_coro = None + if sync_config.extensions.receipts is not None: + receipts_coro = self.get_receipts_extension_response( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + new_connection_state=new_connection_state, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + actual_room_response_map=actual_room_response_map, + receipts_request=sync_config.extensions.receipts, + to_token=to_token, + from_token=from_token, + ) + + typing_coro = None + if sync_config.extensions.typing is not None: + typing_coro = self.get_typing_extension_response( + sync_config=sync_config, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + actual_room_response_map=actual_room_response_map, + typing_request=sync_config.extensions.typing, + to_token=to_token, + from_token=from_token, + ) + + ( + to_device_response, + e2ee_response, + account_data_response, + receipts_response, + typing_response, + ) = await gather_optional_coroutines( + to_device_coro, + e2ee_coro, + account_data_coro, + receipts_coro, + typing_coro, + ) + + return SlidingSyncResult.Extensions( + to_device=to_device_response, + e2ee=e2ee_response, + account_data=account_data_response, + receipts=receipts_response, + typing=typing_response, + ) + + def find_relevant_room_ids_for_extension( + self, + requested_lists: Optional[StrCollection], + requested_room_ids: Optional[StrCollection], + actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: AbstractSet[str], + ) -> Set[str]: + """ + Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only + return results for rooms in the Sliding Sync response. This matches up the + requested rooms/lists with the actual lists/rooms in the Sliding Sync response. + + {"lists": []} // Do not process any lists. + {"lists": ["rooms", "dms"]} // Process only a subset of lists. + {"lists": ["*"]} // Process all lists defined in the Sliding Window API. (This is the default.) + + {"rooms": []} // Do not process any specific rooms. + {"rooms": ["!a:b", "!c:d"]} // Process only a subset of room subscriptions. + {"rooms": ["*"]} // Process all room subscriptions defined in the Room Subscription API. (This is the default.) + + Args: + requested_lists: The `lists` from the extension request. + requested_room_ids: The `rooms` from the extension request. + actual_lists: The actual lists from the Sliding Sync response. + actual_room_ids: The actual room subscriptions from the Sliding Sync request. + """ + + # We only want to include account data for rooms that are already in the sliding + # sync response AND that were requested in the account data request. + relevant_room_ids: Set[str] = set() + + # See what rooms from the room subscriptions we should get account data for + if requested_room_ids is not None: + for room_id in requested_room_ids: + # A wildcard means we process all rooms from the room subscriptions + if room_id == "*": + relevant_room_ids.update(actual_room_ids) + break + + if room_id in actual_room_ids: + relevant_room_ids.add(room_id) + + # See what rooms from the sliding window lists we should get account data for + if requested_lists is not None: + for list_key in requested_lists: + # Just some typing because we share the variable name in multiple places + actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None + + # A wildcard means we process rooms from all lists + if list_key == "*": + for actual_list in actual_lists.values(): + # We only expect a single SYNC operation for any list + assert len(actual_list.ops) == 1 + sync_op = actual_list.ops[0] + assert sync_op.op == OperationType.SYNC + + relevant_room_ids.update(sync_op.room_ids) + + break + + actual_list = actual_lists.get(list_key) + if actual_list is not None: + # We only expect a single SYNC operation for any list + assert len(actual_list.ops) == 1 + sync_op = actual_list.ops[0] + assert sync_op.op == OperationType.SYNC + + relevant_room_ids.update(sync_op.room_ids) + + return relevant_room_ids + + @trace + async def get_to_device_extension_response( + self, + sync_config: SlidingSyncConfig, + to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension, + to_token: StreamToken, + ) -> Optional[SlidingSyncResult.Extensions.ToDeviceExtension]: + """Handle to-device extension (MSC3885) + + Args: + sync_config: Sync configuration + to_device_request: The to-device extension from the request + to_token: The point in the stream to sync up to. + """ + user_id = sync_config.user.to_string() + device_id = sync_config.requester.device_id + + # Skip if the extension is not enabled + if not to_device_request.enabled: + return None + + # Check that this request has a valid device ID (not all requests have + # to belong to a device, and so device_id is None) + if device_id is None: + return SlidingSyncResult.Extensions.ToDeviceExtension( + next_batch=f"{to_token.to_device_key}", + events=[], + ) + + since_stream_id = 0 + if to_device_request.since is not None: + # We've already validated this is an int. + since_stream_id = int(to_device_request.since) + + if to_token.to_device_key < since_stream_id: + # The since token is ahead of our current token, so we return an + # empty response. + logger.warning( + "Got to-device.since from the future. since token: %r is ahead of our current to_device stream position: %r", + since_stream_id, + to_token.to_device_key, + ) + return SlidingSyncResult.Extensions.ToDeviceExtension( + next_batch=to_device_request.since, + events=[], + ) + + # Delete everything before the given since token, as we know the + # device must have received them. + deleted = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=since_stream_id, + ) + + logger.debug( + "Deleted %d to-device messages up to %d for %s", + deleted, + since_stream_id, + user_id, + ) + + messages, stream_id = await self.store.get_messages_for_device( + user_id=user_id, + device_id=device_id, + from_stream_id=since_stream_id, + to_stream_id=to_token.to_device_key, + limit=min(to_device_request.limit, 100), # Limit to at most 100 events + ) + + return SlidingSyncResult.Extensions.ToDeviceExtension( + next_batch=f"{stream_id}", + events=messages, + ) + + @trace + async def get_e2ee_extension_response( + self, + sync_config: SlidingSyncConfig, + e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension, + to_token: StreamToken, + from_token: Optional[SlidingSyncStreamToken], + ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]: + """Handle E2EE device extension (MSC3884) + + Args: + sync_config: Sync configuration + e2ee_request: The e2ee extension from the request + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. + """ + user_id = sync_config.user.to_string() + device_id = sync_config.requester.device_id + + # Skip if the extension is not enabled + if not e2ee_request.enabled: + return None + + device_list_updates: Optional[DeviceListUpdates] = None + if from_token is not None: + # TODO: This should take into account the `from_token` and `to_token` + device_list_updates = await self.device_handler.get_user_ids_changed( + user_id=user_id, + from_token=from_token.stream_token, + ) + + device_one_time_keys_count: Mapping[str, int] = {} + device_unused_fallback_key_types: Sequence[str] = [] + if device_id: + # TODO: We should have a way to let clients differentiate between the states of: + # * no change in OTK count since the provided since token + # * the server has zero OTKs left for this device + # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298 + device_one_time_keys_count = await self.store.count_e2e_one_time_keys( + user_id, device_id + ) + device_unused_fallback_key_types = ( + await self.store.get_e2e_unused_fallback_key_types(user_id, device_id) + ) + + return SlidingSyncResult.Extensions.E2eeExtension( + device_list_updates=device_list_updates, + device_one_time_keys_count=device_one_time_keys_count, + device_unused_fallback_key_types=device_unused_fallback_key_types, + ) + + @trace + async def get_account_data_extension_response( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + new_connection_state: "MutablePerConnectionState", + actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], + account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension, + to_token: StreamToken, + from_token: Optional[SlidingSyncStreamToken], + ) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]: + """Handle Account Data extension (MSC3959) + + Args: + sync_config: Sync configuration + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. + account_data_request: The account_data extension from the request + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. + """ + user_id = sync_config.user.to_string() + + # Skip if the extension is not enabled + if not account_data_request.enabled: + return None + + global_account_data_map: Mapping[str, JsonMapping] = {} + if from_token is not None: + # TODO: This should take into account the `from_token` and `to_token` + global_account_data_map = ( + await self.store.get_updated_global_account_data_for_user( + user_id, from_token.stream_token.account_data_key + ) + ) + + # TODO: This should take into account the `from_token` and `to_token` + have_push_rules_changed = await self.store.have_push_rules_changed_for_user( + user_id, from_token.stream_token.push_rules_key + ) + if have_push_rules_changed: + # TODO: This should take into account the `from_token` and `to_token` + global_account_data_map[ + AccountDataTypes.PUSH_RULES + ] = await self.push_rules_handler.push_rules_for_user(sync_config.user) + else: + # TODO: This should take into account the `to_token` + immutable_global_account_data_map = ( + await self.store.get_global_account_data_for_user(user_id) + ) + + # Use a `ChainMap` to avoid copying the immutable data from the cache + global_account_data_map = ChainMap( + { + # TODO: This should take into account the `to_token` + AccountDataTypes.PUSH_RULES: await self.push_rules_handler.push_rules_for_user( + sync_config.user + ) + }, + # Cast is safe because `ChainMap` only mutates the top-most map, + # see https://github.com/python/typeshed/issues/8430 + cast( + MutableMapping[str, JsonMapping], immutable_global_account_data_map + ), + ) + + # Fetch room account data + # + account_data_by_room_map: MutableMapping[str, Mapping[str, JsonMapping]] = {} + relevant_room_ids = self.find_relevant_room_ids_for_extension( + requested_lists=account_data_request.lists, + requested_room_ids=account_data_request.rooms, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + ) + if len(relevant_room_ids) > 0: + # We need to handle the different cases depending on if we have sent + # down account data previously or not, so we split the relevant + # rooms up into different collections based on status. + live_rooms = set() + previously_rooms: Dict[str, int] = {} + initial_rooms = set() + + for room_id in relevant_room_ids: + if not from_token: + initial_rooms.add(room_id) + continue + + room_status = previous_connection_state.account_data.have_sent_room( + room_id + ) + if room_status.status == HaveSentRoomFlag.LIVE: + live_rooms.add(room_id) + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + previously_rooms[room_id] = room_status.last_token + elif room_status.status == HaveSentRoomFlag.NEVER: + initial_rooms.add(room_id) + else: + assert_never(room_status.status) + + # We fetch all room account data since the from_token. This is so + # that we can record which rooms have updates that haven't been sent + # down. + # + # Mapping from room_id to mapping of `type` to `content` of room account + # data events. + all_updates_since_the_from_token: Mapping[ + str, Mapping[str, JsonMapping] + ] = {} + if from_token is not None: + # TODO: This should take into account the `from_token` and `to_token` + all_updates_since_the_from_token = ( + await self.store.get_updated_room_account_data_for_user( + user_id, from_token.stream_token.account_data_key + ) + ) + + # Add room tags + # + # TODO: This should take into account the `from_token` and `to_token` + tags_by_room = await self.store.get_updated_tags( + user_id, from_token.stream_token.account_data_key + ) + for room_id, tags in tags_by_room.items(): + all_updates_since_the_from_token.setdefault(room_id, {})[ + AccountDataTypes.TAG + ] = {"tags": tags} + + # For live rooms we just get the updates from `all_updates_since_the_from_token` + if live_rooms: + for room_id in all_updates_since_the_from_token.keys() & live_rooms: + account_data_by_room_map[room_id] = ( + all_updates_since_the_from_token[room_id] + ) + + # For previously and initial rooms we query each room individually. + if previously_rooms or initial_rooms: + + async def handle_previously(room_id: str) -> None: + # Either get updates or all account data in the room + # depending on if the room state is PREVIOUSLY or NEVER. + previous_token = previously_rooms.get(room_id) + if previous_token is not None: + room_account_data = await ( + self.store.get_updated_room_account_data_for_user_for_room( + user_id=user_id, + room_id=room_id, + from_stream_id=previous_token, + to_stream_id=to_token.account_data_key, + ) + ) + + # Add room tags + changed = await self.store.has_tags_changed_for_room( + user_id=user_id, + room_id=room_id, + from_stream_id=previous_token, + to_stream_id=to_token.account_data_key, + ) + if changed: + # XXX: Ideally, this should take into account the `to_token` + # and return the set of tags at that time but we don't track + # changes to tags so we just have to return all tags for the + # room. + immutable_tag_map = await self.store.get_tags_for_room( + user_id, room_id + ) + room_account_data[AccountDataTypes.TAG] = { + "tags": immutable_tag_map + } + + # Only add an entry if there were any updates. + if room_account_data: + account_data_by_room_map[room_id] = room_account_data + else: + # TODO: This should take into account the `to_token` + immutable_room_account_data = ( + await self.store.get_account_data_for_room(user_id, room_id) + ) + + # Add room tags + # + # XXX: Ideally, this should take into account the `to_token` + # and return the set of tags at that time but we don't track + # changes to tags so we just have to return all tags for the + # room. + immutable_tag_map = await self.store.get_tags_for_room( + user_id, room_id + ) + + account_data_by_room_map[room_id] = ChainMap( + {AccountDataTypes.TAG: {"tags": immutable_tag_map}} + if immutable_tag_map + else {}, + # Cast is safe because `ChainMap` only mutates the top-most map, + # see https://github.com/python/typeshed/issues/8430 + cast( + MutableMapping[str, JsonMapping], + immutable_room_account_data, + ), + ) + + # We handle these rooms concurrently to speed it up. + await concurrently_execute( + handle_previously, + previously_rooms.keys() | initial_rooms, + limit=20, + ) + + # Now record which rooms are now up to data, and which rooms have + # pending updates to send. + new_connection_state.account_data.record_sent_rooms(previously_rooms.keys()) + new_connection_state.account_data.record_sent_rooms(initial_rooms) + missing_updates = ( + all_updates_since_the_from_token.keys() - relevant_room_ids + ) + if missing_updates: + # If we have missing updates then we must have had a from_token. + assert from_token is not None + + new_connection_state.account_data.record_unsent_rooms( + missing_updates, from_token.stream_token.account_data_key + ) + + return SlidingSyncResult.Extensions.AccountDataExtension( + global_account_data_map=global_account_data_map, + account_data_by_room_map=account_data_by_room_map, + ) + + @trace + async def get_receipts_extension_response( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + new_connection_state: "MutablePerConnectionState", + actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], + actual_room_response_map: Mapping[str, SlidingSyncResult.RoomResult], + receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension, + to_token: StreamToken, + from_token: Optional[SlidingSyncStreamToken], + ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]: + """Handle Receipts extension (MSC3960) + + Args: + sync_config: Sync configuration + previous_connection_state: The current per-connection state + new_connection_state: A mutable copy of the per-connection + state, used to record updates to the state. + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. + actual_room_response_map: A map of room ID to room results in the the + Sliding Sync response. + account_data_request: The account_data extension from the request + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. + """ + # Skip if the extension is not enabled + if not receipts_request.enabled: + return None + + relevant_room_ids = self.find_relevant_room_ids_for_extension( + requested_lists=receipts_request.lists, + requested_room_ids=receipts_request.rooms, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + ) + + room_id_to_receipt_map: Dict[str, JsonMapping] = {} + if len(relevant_room_ids) > 0: + # We need to handle the different cases depending on if we have sent + # down receipts previously or not, so we split the relevant rooms + # up into different collections based on status. + live_rooms = set() + previously_rooms: Dict[str, MultiWriterStreamToken] = {} + initial_rooms = set() + + for room_id in relevant_room_ids: + if not from_token: + initial_rooms.add(room_id) + continue + + # If we're sending down the room from scratch again for some + # reason, we should always resend the receipts as well + # (regardless of if we've sent them down before). This is to + # mimic the behaviour of what happens on initial sync, where you + # get a chunk of timeline with all of the corresponding receipts + # for the events in the timeline. + # + # We also resend down receipts when we "expand" the timeline, + # (see the "XXX: Odd behavior" in + # `synapse.handlers.sliding_sync`). + room_result = actual_room_response_map.get(room_id) + if room_result is not None: + if room_result.initial or room_result.unstable_expanded_timeline: + initial_rooms.add(room_id) + continue + + room_status = previous_connection_state.receipts.have_sent_room(room_id) + if room_status.status == HaveSentRoomFlag.LIVE: + live_rooms.add(room_id) + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + previously_rooms[room_id] = room_status.last_token + elif room_status.status == HaveSentRoomFlag.NEVER: + initial_rooms.add(room_id) + else: + assert_never(room_status.status) + + # The set of receipts that we fetched. Private receipts need to be + # filtered out before returning. + fetched_receipts = [] + + # For live rooms we just fetch all receipts in those rooms since the + # `since` token. + if live_rooms: + assert from_token is not None + receipts = await self.store.get_linearized_receipts_for_rooms( + room_ids=live_rooms, + from_key=from_token.stream_token.receipt_key, + to_key=to_token.receipt_key, + ) + fetched_receipts.extend(receipts) + + # For rooms we've previously sent down, but aren't up to date, we + # need to use the from token from the room status. + if previously_rooms: + # Fetch any missing rooms concurrently. + + async def handle_previously_room(room_id: str) -> None: + receipt_token = previously_rooms[room_id] + # TODO: Limit the number of receipts we're about to send down + # for the room, if its too many we should TODO + previously_receipts = ( + await self.store.get_linearized_receipts_for_room( + room_id=room_id, + from_key=receipt_token, + to_key=to_token.receipt_key, + ) + ) + fetched_receipts.extend(previously_receipts) + + await concurrently_execute( + handle_previously_room, previously_rooms.keys(), 20 + ) + + if initial_rooms: + # We also always send down receipts for the current user. + user_receipts = ( + await self.store.get_linearized_receipts_for_user_in_rooms( + user_id=sync_config.user.to_string(), + room_ids=initial_rooms, + to_key=to_token.receipt_key, + ) + ) + + # For rooms we haven't previously sent down, we could send all receipts + # from that room but we only want to include receipts for events + # in the timeline to avoid bloating and blowing up the sync response + # as the number of users in the room increases. (this behavior is part of the spec) + initial_rooms_and_event_ids = [ + (room_id, event.event_id) + for room_id in initial_rooms + if room_id in actual_room_response_map + for event in actual_room_response_map[room_id].timeline_events + ] + initial_receipts = await self.store.get_linearized_receipts_for_events( + room_and_event_ids=initial_rooms_and_event_ids, + ) + + # Combine the receipts for a room and add them to + # `fetched_receipts` + for room_id in initial_receipts.keys() | user_receipts.keys(): + receipt_content = ReceiptInRoom.merge_to_content( + list( + itertools.chain( + initial_receipts.get(room_id, []), + user_receipts.get(room_id, []), + ) + ) + ) + + fetched_receipts.append( + { + "room_id": room_id, + "type": EduTypes.RECEIPT, + "content": receipt_content, + } + ) + + fetched_receipts = ReceiptEventSource.filter_out_private_receipts( + fetched_receipts, sync_config.user.to_string() + ) + + for receipt in fetched_receipts: + # These fields should exist for every receipt + room_id = receipt["room_id"] + type = receipt["type"] + content = receipt["content"] + + room_id_to_receipt_map[room_id] = {"type": type, "content": content} + + # Update the per-connection state to track which rooms we have sent + # all the receipts for. + new_connection_state.receipts.record_sent_rooms(previously_rooms.keys()) + new_connection_state.receipts.record_sent_rooms(initial_rooms) + + if from_token: + # Now find the set of rooms that may have receipts that we're not sending + # down. We only need to check rooms that we have previously returned + # receipts for (in `previous_connection_state`) because we only care about + # updating `LIVE` rooms to `PREVIOUSLY`. The `PREVIOUSLY` rooms will just + # stay pointing at their previous position so we don't need to waste time + # checking those and since we default to `NEVER`, rooms that were `NEVER` + # sent before don't need to be recorded as we'll handle them correctly when + # they come into range for the first time. + rooms_no_receipts = [ + room_id + for room_id, room_status in previous_connection_state.receipts._statuses.items() + if room_status.status == HaveSentRoomFlag.LIVE + and room_id not in relevant_room_ids + ] + changed_rooms = await self.store.get_rooms_with_receipts_between( + rooms_no_receipts, + from_key=from_token.stream_token.receipt_key, + to_key=to_token.receipt_key, + ) + new_connection_state.receipts.record_unsent_rooms( + changed_rooms, from_token.stream_token.receipt_key + ) + + return SlidingSyncResult.Extensions.ReceiptsExtension( + room_id_to_receipt_map=room_id_to_receipt_map, + ) + + async def get_typing_extension_response( + self, + sync_config: SlidingSyncConfig, + actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList], + actual_room_ids: Set[str], + actual_room_response_map: Mapping[str, SlidingSyncResult.RoomResult], + typing_request: SlidingSyncConfig.Extensions.TypingExtension, + to_token: StreamToken, + from_token: Optional[SlidingSyncStreamToken], + ) -> Optional[SlidingSyncResult.Extensions.TypingExtension]: + """Handle Typing Notification extension (MSC3961) + + Args: + sync_config: Sync configuration + actual_lists: Sliding window API. A map of list key to list results in the + Sliding Sync response. + actual_room_ids: The actual room IDs in the the Sliding Sync response. + actual_room_response_map: A map of room ID to room results in the the + Sliding Sync response. + account_data_request: The account_data extension from the request + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. + """ + # Skip if the extension is not enabled + if not typing_request.enabled: + return None + + relevant_room_ids = self.find_relevant_room_ids_for_extension( + requested_lists=typing_request.lists, + requested_room_ids=typing_request.rooms, + actual_lists=actual_lists, + actual_room_ids=actual_room_ids, + ) + + room_id_to_typing_map: Dict[str, JsonMapping] = {} + if len(relevant_room_ids) > 0: + # Note: We don't need to take connection tracking into account for typing + # notifications because they'll get anything still relevant and hasn't timed + # out when the room comes into range. We consider the gap where the room + # fell out of range, as long enough for any typing notifications to have + # timed out (it's not worth the 30 seconds of data we may have missed). + typing_source = self.event_sources.sources.typing + typing_notifications, _ = await typing_source.get_new_events( + user=sync_config.user, + from_key=(from_token.stream_token.typing_key if from_token else 0), + to_key=to_token.typing_key, + # This is a dummy value and isn't used in the function + limit=0, + room_ids=relevant_room_ids, + is_guest=False, + ) + + for typing_notification in typing_notifications: + # These fields should exist for every typing notification + room_id = typing_notification["room_id"] + type = typing_notification["type"] + content = typing_notification["content"] + + room_id_to_typing_map[room_id] = {"type": type, "content": content} + + return SlidingSyncResult.Extensions.TypingExtension( + room_id_to_typing_map=room_id_to_typing_map, + ) diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py new file mode 100644
index 0000000000..13e69f18a0 --- /dev/null +++ b/synapse/handlers/sliding_sync/room_lists.py
@@ -0,0 +1,2304 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2023 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# + + +import enum +import logging +from itertools import chain +from typing import ( + TYPE_CHECKING, + AbstractSet, + Dict, + List, + Literal, + Mapping, + Optional, + Set, + Tuple, + Union, + cast, +) + +import attr +from immutabledict import immutabledict +from typing_extensions import assert_never + +from synapse.api.constants import ( + AccountDataTypes, + EventContentFields, + EventTypes, + Membership, +) +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.events import StrippedStateEvent +from synapse.events.utils import parse_stripped_state_event +from synapse.logging.opentracing import start_active_span, trace +from synapse.storage.databases.main.state import ( + ROOM_UNKNOWN_SENTINEL, + Sentinel as StateSentinel, +) +from synapse.storage.databases.main.stream import CurrentStateDeltaMembership +from synapse.storage.invite_rule import InviteRule +from synapse.storage.roommember import ( + RoomsForUser, + RoomsForUserSlidingSync, + RoomsForUserStateReset, +) +from synapse.types import ( + MutableStateMap, + RoomStreamToken, + StateMap, + StrCollection, + StreamKeyType, + StreamToken, + UserID, +) +from synapse.types.handlers.sliding_sync import ( + HaveSentRoomFlag, + OperationType, + PerConnectionState, + RoomSyncConfig, + SlidingSyncConfig, + SlidingSyncResult, +) +from synapse.types.state import StateFilter + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +logger = logging.getLogger(__name__) + + +class Sentinel(enum.Enum): + # defining a sentinel in this way allows mypy to correctly handle the + # type of a dictionary lookup and subsequent type narrowing. + UNSET_SENTINEL = object() + + +# Helper definition for the types that we might return. We do this to avoid +# copying data between types (which can be expensive for many rooms). +RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync] + + +@attr.s(auto_attribs=True, slots=True, frozen=True) +class SlidingSyncInterestedRooms: + """The set of rooms and metadata a client is interested in based on their + sliding sync request. + + Returned by `compute_interested_rooms`. + + Attributes: + lists: A mapping from list name to the list result for the response + relevant_room_map: A map from rooms that match the sync request to + their room sync config. + relevant_rooms_to_send_map: Subset of `relevant_room_map` that + includes the rooms that *may* have relevant updates. Rooms not + in this map will definitely not have room updates (though + extensions may have updates in these rooms). + newly_joined_rooms: The set of rooms that were joined in the token range + and the user is still joined to at the end of this range. + newly_left_rooms: The set of rooms that we left in the token range + and are still "leave" at the end of this range. + dm_room_ids: The set of rooms the user consider as direct-message (DM) rooms + """ + + lists: Mapping[str, SlidingSyncResult.SlidingWindowList] + relevant_room_map: Mapping[str, RoomSyncConfig] + relevant_rooms_to_send_map: Mapping[str, RoomSyncConfig] + all_rooms: Set[str] + room_membership_for_user_map: Mapping[str, RoomsForUserType] + + newly_joined_rooms: AbstractSet[str] + newly_left_rooms: AbstractSet[str] + dm_room_ids: AbstractSet[str] + + @staticmethod + def empty() -> "SlidingSyncInterestedRooms": + return SlidingSyncInterestedRooms( + lists={}, + relevant_room_map={}, + relevant_rooms_to_send_map={}, + all_rooms=set(), + room_membership_for_user_map={}, + newly_joined_rooms=set(), + newly_left_rooms=set(), + dm_room_ids=set(), + ) + + +def filter_membership_for_sync( + *, + user_id: str, + room_membership_for_user: RoomsForUserType, + newly_left: bool, +) -> bool: + """ + Returns True if the membership event should be included in the sync response, + otherwise False. + + Attributes: + user_id: The user ID that the membership applies to + room_membership_for_user: Membership information for the user in the room + """ + + membership = room_membership_for_user.membership + sender = room_membership_for_user.sender + + # We want to allow everything except rooms the user has left unless `newly_left` + # because we want everything that's *still* relevant to the user. We include + # `newly_left` rooms because the last event that the user should see is their own + # leave event. + # + # A leave != kick. This logic includes kicks (leave events where the sender is not + # the same user). + # + # When `sender=None`, it means that a state reset happened that removed the user + # from the room without a corresponding leave event. We can just remove the rooms + # since they are no longer relevant to the user but will still appear if they are + # `newly_left`. + return ( + # Anything except leave events + membership != Membership.LEAVE + # Unless... + or newly_left + # Allow kicks + or (membership == Membership.LEAVE and sender not in (user_id, None)) + ) + + +class SlidingSyncRoomLists: + """Handles calculating the room lists from sliding sync requests""" + + def __init__(self, hs: "HomeServer"): + self.store = hs.get_datastores().main + self.storage_controllers = hs.get_storage_controllers() + self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync + self.is_mine_id = hs.is_mine_id + + async def compute_interested_rooms( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> SlidingSyncInterestedRooms: + """Fetch the set of rooms that match the request""" + has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 + has_room_subscriptions = ( + sync_config.room_subscriptions is not None + and len(sync_config.room_subscriptions) > 0 + ) + + if not has_lists and not has_room_subscriptions: + return SlidingSyncInterestedRooms.empty() + + if await self.store.have_finished_sliding_sync_background_jobs(): + return await self._compute_interested_rooms_new_tables( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + to_token=to_token, + from_token=from_token, + ) + else: + # FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the + # foreground update for + # `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by + # https://github.com/element-hq/synapse/issues/17623) + return await self._compute_interested_rooms_fallback( + sync_config=sync_config, + previous_connection_state=previous_connection_state, + to_token=to_token, + from_token=from_token, + ) + + @trace + async def _compute_interested_rooms_new_tables( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> SlidingSyncInterestedRooms: + """Implementation of `compute_interested_rooms` using new sliding sync db tables.""" + user_id = sync_config.user.to_string() + + # Assemble sliding window lists + lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + # Keep track of the rooms that we can display and need to fetch more info about + relevant_room_map: Dict[str, RoomSyncConfig] = {} + # The set of room IDs of all rooms that could appear in any list. These + # include rooms that are outside the list ranges. + all_rooms: Set[str] = set() + + # Note: this won't include rooms the user has left themselves. We add back + # `newly_left` rooms below. This is more efficient than fetching all rooms and + # then filtering out the old left rooms. + room_membership_for_user_map = ( + await self.store.get_sliding_sync_rooms_for_user_from_membership_snapshots( + user_id + ) + ) + # To play nice with the rewind logic below, we need to go fetch the rooms the + # user has left themselves but only if it changed after the `to_token`. + # + # If a leave happens *after* the token range, we may have still been joined (or + # any non-self-leave which is relevant to sync) to the room before so we need to + # include it in the list of potentially relevant rooms and apply our rewind + # logic (outside of this function) to see if it's actually relevant. + # + # We do this separately from + # `get_sliding_sync_rooms_for_user_from_membership_snapshots` as those results + # are cached and the `to_token` isn't very cache friendly (people are constantly + # requesting with new tokens) so we separate it out here. + self_leave_room_membership_for_user_map = ( + await self.store.get_sliding_sync_self_leave_rooms_after_to_token( + user_id, to_token + ) + ) + if self_leave_room_membership_for_user_map: + # FIXME: It would be nice to avoid this copy but since + # `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it + # can't return a mutable value like a `dict`. We make the copy to get a + # mutable dict that we can change. We try to only make a copy when necessary + # (if we actually need to change something) as in most cases, the logic + # doesn't need to run. + room_membership_for_user_map = dict(room_membership_for_user_map) + room_membership_for_user_map.update(self_leave_room_membership_for_user_map) + + # Remove invites from ignored users + ignored_users = await self.store.ignored_users(user_id) + invite_config = await self.store.get_invite_config_for_user(user_id) + if ignored_users: + # FIXME: It would be nice to avoid this copy but since + # `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it + # can't return a mutable value like a `dict`. We make the copy to get a + # mutable dict that we can change. We try to only make a copy when necessary + # (if we actually need to change something) as in most cases, the logic + # doesn't need to run. + room_membership_for_user_map = dict(room_membership_for_user_map) + # Make a copy so we don't run into an error: `dictionary changed size during + # iteration`, when we remove items + for room_id in list(room_membership_for_user_map.keys()): + room_for_user_sliding_sync = room_membership_for_user_map[room_id] + if ( + room_for_user_sliding_sync.membership == Membership.INVITE + and room_for_user_sliding_sync.sender + and ( + room_for_user_sliding_sync.sender in ignored_users + or invite_config.get_invite_rule( + room_for_user_sliding_sync.sender + ) + == InviteRule.IGNORE + ) + ): + room_membership_for_user_map.pop(room_id, None) + + ( + newly_joined_room_ids, + newly_left_room_map, + ) = await self._get_newly_joined_and_left_rooms( + user_id, from_token=from_token, to_token=to_token + ) + + changes = await self._get_rewind_changes_to_current_membership_to_token( + sync_config.user, room_membership_for_user_map, to_token=to_token + ) + if changes: + # FIXME: It would be nice to avoid this copy but since + # `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it + # can't return a mutable value like a `dict`. We make the copy to get a + # mutable dict that we can change. We try to only make a copy when necessary + # (if we actually need to change something) as in most cases, the logic + # doesn't need to run. + room_membership_for_user_map = dict(room_membership_for_user_map) + for room_id, change in changes.items(): + if change is None: + # Remove rooms that the user joined after the `to_token` + room_membership_for_user_map.pop(room_id, None) + continue + + existing_room = room_membership_for_user_map.get(room_id) + if existing_room is not None: + # Update room membership events to the point in time of the `to_token` + room_for_user = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + # We keep the state of the room though + has_known_state=existing_room.has_known_state, + room_type=existing_room.room_type, + is_encrypted=existing_room.is_encrypted, + ) + if filter_membership_for_sync( + user_id=user_id, + room_membership_for_user=room_for_user, + newly_left=room_id in newly_left_room_map, + ): + room_membership_for_user_map[room_id] = room_for_user + else: + room_membership_for_user_map.pop(room_id, None) + + # Add back `newly_left` rooms (rooms left in the from -> to token range). + # + # We do this because `get_sliding_sync_rooms_for_user_from_membership_snapshots(...)` doesn't include + # rooms that the user left themselves as it's more efficient to add them back + # here than to fetch all rooms and then filter out the old left rooms. The user + # only leaves a room once in a blue moon so this barely needs to run. + # + missing_newly_left_rooms = ( + newly_left_room_map.keys() - room_membership_for_user_map.keys() + ) + if missing_newly_left_rooms: + # FIXME: It would be nice to avoid this copy but since + # `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it + # can't return a mutable value like a `dict`. We make the copy to get a + # mutable dict that we can change. We try to only make a copy when necessary + # (if we actually need to change something) as in most cases, the logic + # doesn't need to run. + room_membership_for_user_map = dict(room_membership_for_user_map) + for room_id in missing_newly_left_rooms: + newly_left_room_for_user = newly_left_room_map[room_id] + # This should be a given + assert newly_left_room_for_user.membership == Membership.LEAVE + + # Add back `newly_left` rooms + # + # Check for membership and state in the Sliding Sync tables as it's just + # another membership + newly_left_room_for_user_sliding_sync = ( + await self.store.get_sliding_sync_room_for_user(user_id, room_id) + ) + # If the membership exists, it's just a normal user left the room on + # their own + if newly_left_room_for_user_sliding_sync is not None: + if filter_membership_for_sync( + user_id=user_id, + room_membership_for_user=newly_left_room_for_user_sliding_sync, + newly_left=room_id in newly_left_room_map, + ): + room_membership_for_user_map[room_id] = ( + newly_left_room_for_user_sliding_sync + ) + else: + room_membership_for_user_map.pop(room_id, None) + + change = changes.get(room_id) + if change is not None: + # Update room membership events to the point in time of the `to_token` + room_for_user = RoomsForUserSlidingSync( + room_id=room_id, + sender=change.sender, + membership=change.membership, + event_id=change.event_id, + event_pos=change.event_pos, + room_version_id=change.room_version_id, + # We keep the state of the room though + has_known_state=newly_left_room_for_user_sliding_sync.has_known_state, + room_type=newly_left_room_for_user_sliding_sync.room_type, + is_encrypted=newly_left_room_for_user_sliding_sync.is_encrypted, + ) + if filter_membership_for_sync( + user_id=user_id, + room_membership_for_user=room_for_user, + newly_left=room_id in newly_left_room_map, + ): + room_membership_for_user_map[room_id] = room_for_user + else: + room_membership_for_user_map.pop(room_id, None) + + # If we are `newly_left` from the room but can't find any membership, + # then we have been "state reset" out of the room + else: + # Get the state at the time. We can't read from the Sliding Sync + # tables because the user has no membership in the room according to + # the state (thanks to the state reset). + # + # Note: `room_type` never changes, so we can just get current room + # type + room_type = await self.store.get_room_type(room_id) + has_known_state = room_type is not ROOM_UNKNOWN_SENTINEL + if isinstance(room_type, StateSentinel): + room_type = None + + # Get the encryption status at the time of the token + is_encrypted = await self.get_is_encrypted_for_room_at_token( + room_id, + newly_left_room_for_user.event_pos.to_room_stream_token(), + ) + + room_for_user = RoomsForUserSlidingSync( + room_id=room_id, + sender=newly_left_room_for_user.sender, + membership=newly_left_room_for_user.membership, + event_id=newly_left_room_for_user.event_id, + event_pos=newly_left_room_for_user.event_pos, + room_version_id=newly_left_room_for_user.room_version_id, + has_known_state=has_known_state, + room_type=room_type, + is_encrypted=is_encrypted, + ) + if filter_membership_for_sync( + user_id=user_id, + room_membership_for_user=room_for_user, + newly_left=room_id in newly_left_room_map, + ): + room_membership_for_user_map[room_id] = room_for_user + else: + room_membership_for_user_map.pop(room_id, None) + + dm_room_ids = await self._get_dm_rooms_for_user(user_id) + + if sync_config.lists: + sync_room_map = room_membership_for_user_map + with start_active_span("assemble_sliding_window_lists"): + for list_key, list_config in sync_config.lists.items(): + # Apply filters + filtered_sync_room_map = sync_room_map + if list_config.filters is not None: + filtered_sync_room_map = await self.filter_rooms_using_tables( + user_id, + sync_room_map, + previous_connection_state, + list_config.filters, + to_token, + dm_room_ids, + ) + + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_rooms = await self.store.get_partial_rooms() + + # Since creating the `RoomSyncConfig` takes some work, let's just do it + # once. + room_sync_config = RoomSyncConfig.from_room_config(list_config) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + filtered_sync_room_map = { + room_id: room + for room_id, room in filtered_sync_room_map.items() + if room_id not in partial_state_rooms + } + + all_rooms.update(filtered_sync_room_map) + + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] + + if list_config.ranges: + # Optimization: If we are asking for the full range, we don't + # need to sort the list. + if ( + # We're looking for a single range that covers the entire list + len(list_config.ranges) == 1 + # Range starts at 0 + and list_config.ranges[0][0] == 0 + # And the range extends to the end of the list or more. Each + # side is inclusive. + and list_config.ranges[0][1] + >= len(filtered_sync_room_map) - 1 + ): + sorted_room_info: List[RoomsForUserType] = list( + filtered_sync_room_map.values() + ) + else: + # Sort the list + sorted_room_info = await self.sort_rooms( + # Cast is safe because RoomsForUserSlidingSync is part + # of the `RoomsForUserType` union. Why can't it detect this? + cast( + Dict[str, RoomsForUserType], filtered_sync_room_map + ), + to_token, + # We only need to sort the rooms up to the end + # of the largest range. Both sides of range are + # inclusive so we `+ 1`. + limit=max(range[1] + 1 for range in list_config.ranges), + ) + + for range in list_config.ranges: + room_ids_in_list: List[str] = [] + + # We're going to loop through the sorted list of rooms starting + # at the range start index and keep adding rooms until we fill + # up the range or run out of rooms. + # + # Both sides of range are inclusive so we `+ 1` + max_num_rooms = range[1] - range[0] + 1 + for room_membership in sorted_room_info[range[0] :]: + room_id = room_membership.room_id + + if len(room_ids_in_list) >= max_num_rooms: + break + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going + # to display and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get( + room_id + ) + if existing_room_sync_config is not None: + room_sync_config = existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + + relevant_room_map[room_id] = room_sync_config + + room_ids_in_list.append(room_id) + + ops.append( + SlidingSyncResult.SlidingWindowList.Operation( + op=OperationType.SYNC, + range=range, + room_ids=room_ids_in_list, + ) + ) + + lists[list_key] = SlidingSyncResult.SlidingWindowList( + count=len(filtered_sync_room_map), + ops=ops, + ) + + if sync_config.room_subscriptions: + with start_active_span("assemble_room_subscriptions"): + # FIXME: It would be nice to avoid this copy but since + # `get_sliding_sync_rooms_for_user_from_membership_snapshots` is cached, it + # can't return a mutable value like a `dict`. We make the copy to get a + # mutable dict that we can change. We try to only make a copy when necessary + # (if we actually need to change something) as in most cases, the logic + # doesn't need to run. + room_membership_for_user_map = dict(room_membership_for_user_map) + + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_rooms = await self.store.get_partial_rooms() + + # Fetch any rooms that we have not already fetched from the database. + subscription_sliding_sync_rooms = ( + await self.store.get_sliding_sync_room_for_user_batch( + user_id, + sync_config.room_subscriptions.keys() + - room_membership_for_user_map.keys(), + ) + ) + room_membership_for_user_map.update(subscription_sliding_sync_rooms) + + for ( + room_id, + room_subscription, + ) in sync_config.room_subscriptions.items(): + # Check if we have a membership for the room, but didn't pull it out + # above. This could be e.g. a leave that we don't pull out by + # default. + current_room_entry = room_membership_for_user_map.get(room_id) + if not current_room_entry: + # TODO: Handle rooms the user isn't in. + continue + + all_rooms.add(room_id) + + # Take the superset of the `RoomSyncConfig` for each room. + room_sync_config = RoomSyncConfig.from_room_config( + room_subscription + ) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + if room_id in partial_state_rooms: + continue + + # Update our `relevant_room_map` with the room we're going to display + # and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: + room_sync_config = ( + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + ) + + relevant_room_map[room_id] = room_sync_config + + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send( + previous_connection_state, from_token, relevant_room_map + ) + + return SlidingSyncInterestedRooms( + lists=lists, + relevant_room_map=relevant_room_map, + relevant_rooms_to_send_map=relevant_rooms_to_send_map, + all_rooms=all_rooms, + room_membership_for_user_map=room_membership_for_user_map, + newly_joined_rooms=newly_joined_room_ids, + newly_left_rooms=set(newly_left_room_map), + dm_room_ids=dm_room_ids, + ) + + async def _compute_interested_rooms_fallback( + self, + sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> SlidingSyncInterestedRooms: + """Fallback code when the database background updates haven't completed yet.""" + + ( + room_membership_for_user_map, + newly_joined_room_ids, + newly_left_room_ids, + ) = await self.get_room_membership_for_user_at_to_token( + sync_config.user, to_token, from_token + ) + + dm_room_ids = await self._get_dm_rooms_for_user(sync_config.user.to_string()) + + # Assemble sliding window lists + lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + # Keep track of the rooms that we can display and need to fetch more info about + relevant_room_map: Dict[str, RoomSyncConfig] = {} + # The set of room IDs of all rooms that could appear in any list. These + # include rooms that are outside the list ranges. + all_rooms: Set[str] = set() + + if sync_config.lists: + with start_active_span("assemble_sliding_window_lists"): + sync_room_map = await self.filter_rooms_relevant_for_sync( + user=sync_config.user, + room_membership_for_user_map=room_membership_for_user_map, + newly_left_room_ids=newly_left_room_ids, + ) + + for list_key, list_config in sync_config.lists.items(): + # Apply filters + filtered_sync_room_map = sync_room_map + if list_config.filters is not None: + filtered_sync_room_map = await self.filter_rooms( + sync_config.user, + sync_room_map, + previous_connection_state, + list_config.filters, + to_token, + dm_room_ids, + ) + + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_rooms = await self.store.get_partial_rooms() + + # Since creating the `RoomSyncConfig` takes some work, let's just do it + # once. + room_sync_config = RoomSyncConfig.from_room_config(list_config) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + filtered_sync_room_map = { + room_id: room + for room_id, room in filtered_sync_room_map.items() + if room_id not in partial_state_rooms + } + + all_rooms.update(filtered_sync_room_map) + + # Sort the list + sorted_room_info = await self.sort_rooms( + filtered_sync_room_map, to_token + ) + + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] + if list_config.ranges: + for range in list_config.ranges: + room_ids_in_list: List[str] = [] + + # We're going to loop through the sorted list of rooms starting + # at the range start index and keep adding rooms until we fill + # up the range or run out of rooms. + # + # Both sides of range are inclusive so we `+ 1` + max_num_rooms = range[1] - range[0] + 1 + for room_membership in sorted_room_info[range[0] :]: + room_id = room_membership.room_id + + if len(room_ids_in_list) >= max_num_rooms: + break + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going + # to display and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get( + room_id + ) + if existing_room_sync_config is not None: + room_sync_config = existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + + relevant_room_map[room_id] = room_sync_config + + room_ids_in_list.append(room_id) + + ops.append( + SlidingSyncResult.SlidingWindowList.Operation( + op=OperationType.SYNC, + range=range, + room_ids=room_ids_in_list, + ) + ) + + lists[list_key] = SlidingSyncResult.SlidingWindowList( + count=len(sorted_room_info), + ops=ops, + ) + + if sync_config.room_subscriptions: + with start_active_span("assemble_room_subscriptions"): + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_rooms = await self.store.get_partial_rooms() + + for ( + room_id, + room_subscription, + ) in sync_config.room_subscriptions.items(): + room_membership_for_user_at_to_token = ( + await self.check_room_subscription_allowed_for_user( + room_id=room_id, + room_membership_for_user_map=room_membership_for_user_map, + to_token=to_token, + ) + ) + + # Skip this room if the user isn't allowed to see it + if not room_membership_for_user_at_to_token: + continue + + all_rooms.add(room_id) + + room_membership_for_user_map[room_id] = ( + room_membership_for_user_at_to_token + ) + + # Take the superset of the `RoomSyncConfig` for each room. + room_sync_config = RoomSyncConfig.from_room_config( + room_subscription + ) + + # Exclude partially-stated rooms if we must wait for the room to be + # fully-stated + if room_sync_config.must_await_full_state(self.is_mine_id): + if room_id in partial_state_rooms: + continue + + all_rooms.add(room_id) + + # Update our `relevant_room_map` with the room we're going to display + # and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: + room_sync_config = ( + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + ) + + relevant_room_map[room_id] = room_sync_config + + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send( + previous_connection_state, from_token, relevant_room_map + ) + + return SlidingSyncInterestedRooms( + lists=lists, + relevant_room_map=relevant_room_map, + relevant_rooms_to_send_map=relevant_rooms_to_send_map, + all_rooms=all_rooms, + room_membership_for_user_map=room_membership_for_user_map, + newly_joined_rooms=newly_joined_room_ids, + newly_left_rooms=newly_left_room_ids, + dm_room_ids=dm_room_ids, + ) + + async def _filter_relevant_rooms_to_send( + self, + previous_connection_state: PerConnectionState, + from_token: Optional[StreamToken], + relevant_room_map: Dict[str, RoomSyncConfig], + ) -> Dict[str, RoomSyncConfig]: + """Filters the `relevant_room_map` down to those rooms that may have + updates we need to fetch and return.""" + + # Filtered subset of `relevant_room_map` for rooms that may have updates + # (in the event stream) + relevant_rooms_to_send_map: Dict[str, RoomSyncConfig] = relevant_room_map + if relevant_room_map: + with start_active_span("filter_relevant_rooms_to_send"): + if from_token: + rooms_should_send = set() + + # First we check if there are rooms that match a list/room + # subscription and have updates we need to send (i.e. either because + # we haven't sent the room down, or we have but there are missing + # updates). + for room_id, room_config in relevant_room_map.items(): + prev_room_sync_config = ( + previous_connection_state.room_configs.get(room_id) + ) + if prev_room_sync_config is not None: + # Always include rooms whose timeline limit has increased. + # (see the "XXX: Odd behavior" described below) + if ( + prev_room_sync_config.timeline_limit + < room_config.timeline_limit + ): + rooms_should_send.add(room_id) + continue + + status = previous_connection_state.rooms.have_sent_room(room_id) + if ( + # The room was never sent down before so the client needs to know + # about it regardless of any updates. + status.status == HaveSentRoomFlag.NEVER + # `PREVIOUSLY` literally means the "room was sent down before *AND* + # there are updates we haven't sent down" so we already know this + # room has updates. + or status.status == HaveSentRoomFlag.PREVIOUSLY + ): + rooms_should_send.add(room_id) + elif status.status == HaveSentRoomFlag.LIVE: + # We know that we've sent all updates up until `from_token`, + # so we just need to check if there have been updates since + # then. + pass + else: + assert_never(status.status) + + # We only need to check for new events since any state changes + # will also come down as new events. + rooms_that_have_updates = ( + self.store.get_rooms_that_might_have_updates( + relevant_room_map.keys(), from_token.room_key + ) + ) + rooms_should_send.update(rooms_that_have_updates) + relevant_rooms_to_send_map = { + room_id: room_sync_config + for room_id, room_sync_config in relevant_room_map.items() + if room_id in rooms_should_send + } + + return relevant_rooms_to_send_map + + @trace + async def _get_rewind_changes_to_current_membership_to_token( + self, + user: UserID, + rooms_for_user: Mapping[str, RoomsForUserType], + to_token: StreamToken, + ) -> Mapping[str, Optional[RoomsForUser]]: + """ + Takes the current set of rooms for a user (retrieved after the given + token), and returns the changes needed to "rewind" it to match the set of + memberships *at that token* (<= `to_token`). + + Args: + user: User to fetch rooms for + rooms_for_user: The set of rooms for the user after the `to_token`. + to_token: The token to rewind to + + Returns: + The changes to apply to rewind the the current memberships. + """ + # If the user has never joined any rooms before, we can just return an empty list + if not rooms_for_user: + return {} + + user_id = user.to_string() + + # Get the `RoomStreamToken` that represents the spot we queried up to when we got + # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`. + # + # First, we need to get the max stream_ordering of each event persister instance + # that we queried events from. + instance_to_max_stream_ordering_map: Dict[str, int] = {} + for room_for_user in rooms_for_user.values(): + instance_name = room_for_user.event_pos.instance_name + stream_ordering = room_for_user.event_pos.stream + + current_instance_max_stream_ordering = ( + instance_to_max_stream_ordering_map.get(instance_name) + ) + if ( + current_instance_max_stream_ordering is None + or stream_ordering > current_instance_max_stream_ordering + ): + instance_to_max_stream_ordering_map[instance_name] = stream_ordering + + # Then assemble the `RoomStreamToken` + min_stream_pos = min(instance_to_max_stream_ordering_map.values()) + membership_snapshot_token = RoomStreamToken( + # Minimum position in the `instance_map` + stream=min_stream_pos, + instance_map=immutabledict( + { + instance_name: stream_pos + for instance_name, stream_pos in instance_to_max_stream_ordering_map.items() + if stream_pos > min_stream_pos + } + ), + ) + + # Since we fetched the users room list at some point in time after the + # tokens, we need to revert/rewind some membership changes to match the point in + # time of the `to_token`. In particular, we need to make these fixups: + # + # - a) Remove rooms that the user joined after the `to_token` + # - b) Update room membership events to the point in time of the `to_token` + + # Fetch membership changes that fall in the range from `to_token` up to + # `membership_snapshot_token` + # + # If our `to_token` is already the same or ahead of the latest room membership + # for the user, we don't need to do any "2)" fix-ups and can just straight-up + # use the room list from the snapshot as a base (nothing has changed) + current_state_delta_membership_changes_after_to_token = [] + if not membership_snapshot_token.is_before_or_eq(to_token.room_key): + current_state_delta_membership_changes_after_to_token = ( + await self.store.get_current_state_delta_membership_changes_for_user( + user_id, + from_key=to_token.room_key, + to_key=membership_snapshot_token, + excluded_room_ids=self.rooms_to_exclude_globally, + ) + ) + + if not current_state_delta_membership_changes_after_to_token: + # There have been no membership changes, so we can early return. + return {} + + # Otherwise we're about to make changes to `rooms_for_user`, so we turn + # it into a mutable dict. + changes: Dict[str, Optional[RoomsForUser]] = {} + + # Assemble a list of the first membership event after the `to_token` so we can + # step backward to the previous membership that would apply to the from/to + # range. + first_membership_change_by_room_id_after_to_token: Dict[ + str, CurrentStateDeltaMembership + ] = {} + for membership_change in current_state_delta_membership_changes_after_to_token: + # Only set if we haven't already set it + first_membership_change_by_room_id_after_to_token.setdefault( + membership_change.room_id, membership_change + ) + + # Since we fetched a snapshot of the users room list at some point in time after + # the from/to tokens, we need to revert/rewind some membership changes to match + # the point in time of the `to_token`. + for ( + room_id, + first_membership_change_after_to_token, + ) in first_membership_change_by_room_id_after_to_token.items(): + # 1a) Remove rooms that the user joined after the `to_token` + if first_membership_change_after_to_token.prev_event_id is None: + changes[room_id] = None + # 1b) 1c) From the first membership event after the `to_token`, step backward to the + # previous membership that would apply to the from/to range. + else: + # We don't expect these fields to be `None` if we have a `prev_event_id` + # but we're being defensive since it's possible that the prev event was + # culled from the database. + if ( + first_membership_change_after_to_token.prev_event_pos is not None + and first_membership_change_after_to_token.prev_membership + is not None + and first_membership_change_after_to_token.prev_sender is not None + ): + # We need to know the room version ID, which we normally we + # can get from the current membership, but if we don't have + # that then we need to query the DB. + current_membership = rooms_for_user.get(room_id) + if current_membership is not None: + room_version_id = current_membership.room_version_id + else: + room_version_id = await self.store.get_room_version_id(room_id) + + changes[room_id] = RoomsForUser( + room_id=room_id, + event_id=first_membership_change_after_to_token.prev_event_id, + event_pos=first_membership_change_after_to_token.prev_event_pos, + membership=first_membership_change_after_to_token.prev_membership, + sender=first_membership_change_after_to_token.prev_sender, + room_version_id=room_version_id, + ) + else: + # If we can't find the previous membership event, we shouldn't + # include the room in the sync response since we can't determine the + # exact membership state and shouldn't rely on the current snapshot. + changes[room_id] = None + + return changes + + @trace + async def get_room_membership_for_user_at_to_token( + self, + user: UserID, + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> Tuple[Dict[str, RoomsForUserType], AbstractSet[str], AbstractSet[str]]: + """ + Fetch room IDs that the user has had membership in (the full room list including + long-lost left rooms that will be filtered, sorted, and sliced). + + We're looking for rooms where the user has had any sort of membership in the + token range (> `from_token` and <= `to_token`) + + In order for bans/kicks to not show up, you need to `/forget` those rooms. This + doesn't modify the event itself though and only adds the `forgotten` flag to the + `room_memberships` table in Synapse. There isn't a way to tell when a room was + forgotten at the moment so we can't factor it into the token range. + + Args: + user: User to fetch rooms for + to_token: The token to fetch rooms up to. + from_token: The point in the stream to sync from. + + Returns: + A 3-tuple of: + - A dictionary of room IDs that the user has had membership in along with + membership information in that room at the time of `to_token`. + - Set of newly joined rooms + - Set of newly left rooms + """ + user_id = user.to_string() + + # First grab a current snapshot rooms for the user + # (also handles forgotten rooms) + room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is( + user_id=user_id, + # We want to fetch any kind of membership (joined and left rooms) in order + # to get the `event_pos` of the latest room membership event for the + # user. + membership_list=Membership.LIST, + excluded_rooms=self.rooms_to_exclude_globally, + ) + + # We filter out unknown room versions before we try and load any + # metadata about the room. They shouldn't go down sync anyway, and their + # metadata may be in a broken state. + room_for_user_list = [ + room_for_user + for room_for_user in room_for_user_list + if room_for_user.room_version_id in KNOWN_ROOM_VERSIONS + ] + + # Remove invites from ignored users + ignored_users = await self.store.ignored_users(user_id) + if ignored_users: + room_for_user_list = [ + room_for_user + for room_for_user in room_for_user_list + if not ( + room_for_user.membership == Membership.INVITE + and room_for_user.sender in ignored_users + ) + ] + + ( + newly_joined_room_ids, + newly_left_room_map, + ) = await self._get_newly_joined_and_left_rooms_fallback( + user_id, to_token=to_token, from_token=from_token + ) + + # If the user has never joined any rooms before, we can just return an empty + # list. We also have to check the `newly_left_room_map` in case someone was + # state reset out of all of the rooms they were in. + if not room_for_user_list and not newly_left_room_map: + return {}, set(), set() + + # Since we fetched the users room list at some point in time after the + # tokens, we need to revert/rewind some membership changes to match the point in + # time of the `to_token`. + rooms_for_user: Dict[str, RoomsForUserType] = { + room.room_id: room for room in room_for_user_list + } + changes = await self._get_rewind_changes_to_current_membership_to_token( + user, rooms_for_user, to_token + ) + for room_id, change_room_for_user in changes.items(): + if change_room_for_user is None: + rooms_for_user.pop(room_id, None) + else: + rooms_for_user[room_id] = change_room_for_user + + # Ensure we have entries for rooms that the user has been "state reset" + # out of. These are rooms appear in the `newly_left_rooms` map but + # aren't in the `rooms_for_user` map. + for room_id, newly_left_room_for_user in newly_left_room_map.items(): + # If we already know about the room, it's not a state reset + if room_id in rooms_for_user: + continue + + # This should be true if it's a state reset + assert newly_left_room_for_user.membership is Membership.LEAVE + assert newly_left_room_for_user.event_id is None + assert newly_left_room_for_user.sender is None + + rooms_for_user[room_id] = newly_left_room_for_user + + return rooms_for_user, newly_joined_room_ids, set(newly_left_room_map) + + @trace + async def _get_newly_joined_and_left_rooms( + self, + user_id: str, + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]: + """Fetch the sets of rooms that the user newly joined or left in the + given token range. + + Note: there may be rooms in the newly left rooms where the user was + "state reset" out of the room, and so that room would not be part of the + "current memberships" of the user. + + Returns: + A 2-tuple of newly joined room IDs and a map of newly_left room + IDs to the `RoomsForUserStateReset` entry. + + We're using `RoomsForUserStateReset` but that doesn't necessarily mean the + user was state reset of the rooms. It's just that the `event_id`/`sender` + are optional and we can't tell the difference between the server leaving the + room when the user was the last person participating in the room and left or + was state reset out of the room. To actually check for a state reset, you + need to check if a membership still exists in the room. + """ + + newly_joined_room_ids: Set[str] = set() + newly_left_room_map: Dict[str, RoomsForUserStateReset] = {} + + if not from_token: + return newly_joined_room_ids, newly_left_room_map + + changes = await self.store.get_sliding_sync_membership_changes( + user_id, + from_key=from_token.room_key, + to_key=to_token.room_key, + excluded_room_ids=set(self.rooms_to_exclude_globally), + ) + + for room_id, entry in changes.items(): + if entry.membership == Membership.JOIN: + newly_joined_room_ids.add(room_id) + elif entry.membership == Membership.LEAVE: + newly_left_room_map[room_id] = entry + + return newly_joined_room_ids, newly_left_room_map + + @trace + async def _get_newly_joined_and_left_rooms_fallback( + self, + user_id: str, + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]: + """Fetch the sets of rooms that the user newly joined or left in the + given token range. + + Note: there may be rooms in the newly left rooms where the user was + "state reset" out of the room, and so that room would not be part of the + "current memberships" of the user. + + Returns: + A 2-tuple of newly joined room IDs and a map of newly_left room + IDs to the `RoomsForUserStateReset` entry. + + We're using `RoomsForUserStateReset` but that doesn't necessarily mean the + user was state reset of the rooms. It's just that the `event_id`/`sender` + are optional and we can't tell the difference between the server leaving the + room when the user was the last person participating in the room and left or + was state reset out of the room. To actually check for a state reset, you + need to check if a membership still exists in the room. + """ + newly_joined_room_ids: Set[str] = set() + newly_left_room_map: Dict[str, RoomsForUserStateReset] = {} + + # We need to figure out the + # + # - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) + # - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) + + # 1) Fetch membership changes that fall in the range from `from_token` up to `to_token` + current_state_delta_membership_changes_in_from_to_range = [] + if from_token: + current_state_delta_membership_changes_in_from_to_range = ( + await self.store.get_current_state_delta_membership_changes_for_user( + user_id, + from_key=from_token.room_key, + to_key=to_token.room_key, + excluded_room_ids=self.rooms_to_exclude_globally, + ) + ) + + # 1) Assemble a list of the last membership events in some given ranges. Someone + # could have left and joined multiple times during the given range but we only + # care about end-result so we grab the last one. + last_membership_change_by_room_id_in_from_to_range: Dict[ + str, CurrentStateDeltaMembership + ] = {} + # We also want to assemble a list of the first membership events during the token + # range so we can step backward to the previous membership that would apply to + # before the token range to see if we have `newly_joined` the room. + first_membership_change_by_room_id_in_from_to_range: Dict[ + str, CurrentStateDeltaMembership + ] = {} + # Keep track if the room has a non-join event in the token range so we can later + # tell if it was a `newly_joined` room. If the last membership event in the + # token range is a join and there is also some non-join in the range, we know + # they `newly_joined`. + has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {} + for ( + membership_change + ) in current_state_delta_membership_changes_in_from_to_range: + room_id = membership_change.room_id + + last_membership_change_by_room_id_in_from_to_range[room_id] = ( + membership_change + ) + # Only set if we haven't already set it + first_membership_change_by_room_id_in_from_to_range.setdefault( + room_id, membership_change + ) + + if membership_change.membership != Membership.JOIN: + has_non_join_event_by_room_id_in_from_to_range[room_id] = True + + # 1) Fixup + # + # 2) We also want to assemble a list of possibly newly joined rooms. Someone + # could have left and joined multiple times during the given range but we only + # care about whether they are joined at the end of the token range so we are + # working with the last membership even in the token range. + possibly_newly_joined_room_ids = set() + for ( + last_membership_change_in_from_to_range + ) in last_membership_change_by_room_id_in_from_to_range.values(): + room_id = last_membership_change_in_from_to_range.room_id + + # 2) + if last_membership_change_in_from_to_range.membership == Membership.JOIN: + possibly_newly_joined_room_ids.add(room_id) + + # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`). + if last_membership_change_in_from_to_range.membership == Membership.LEAVE: + # 1) Mark this room as `newly_left` + newly_left_room_map[room_id] = RoomsForUserStateReset( + room_id=room_id, + sender=last_membership_change_in_from_to_range.sender, + membership=Membership.LEAVE, + event_id=last_membership_change_in_from_to_range.event_id, + event_pos=last_membership_change_in_from_to_range.event_pos, + room_version_id=await self.store.get_room_version_id(room_id), + ) + + # 2) Figure out `newly_joined` + for room_id in possibly_newly_joined_room_ids: + has_non_join_in_from_to_range = ( + has_non_join_event_by_room_id_in_from_to_range.get(room_id, False) + ) + # If the last membership event in the token range is a join and there is + # also some non-join in the range, we know they `newly_joined`. + if has_non_join_in_from_to_range: + # We found a `newly_joined` room (we left and joined within the token range) + newly_joined_room_ids.add(room_id) + else: + prev_event_id = first_membership_change_by_room_id_in_from_to_range[ + room_id + ].prev_event_id + prev_membership = first_membership_change_by_room_id_in_from_to_range[ + room_id + ].prev_membership + + if prev_event_id is None: + # We found a `newly_joined` room (we are joining the room for the + # first time within the token range) + newly_joined_room_ids.add(room_id) + # Last resort, we need to step back to the previous membership event + # just before the token range to see if we're joined then or not. + elif prev_membership != Membership.JOIN: + # We found a `newly_joined` room (we left before the token range + # and joined within the token range) + newly_joined_room_ids.add(room_id) + + return newly_joined_room_ids, newly_left_room_map + + @trace + async def _get_dm_rooms_for_user( + self, + user_id: str, + ) -> AbstractSet[str]: + """Get the set of DM rooms for the user.""" + + # We're using global account data (`m.direct`) instead of checking for + # `is_direct` on membership events because that property only appears for + # the invitee membership event (doesn't show up for the inviter). + # + # We're unable to take `to_token` into account for global account data since + # we only keep track of the latest account data for the user. + dm_map = await self.store.get_global_account_data_by_type_for_user( + user_id, AccountDataTypes.DIRECT + ) + + # Flatten out the map. Account data is set by the client so it needs to be + # scrutinized. + dm_room_id_set = set() + if isinstance(dm_map, dict): + for room_ids in dm_map.values(): + # Account data should be a list of room IDs. Ignore anything else + if isinstance(room_ids, list): + for room_id in room_ids: + if isinstance(room_id, str): + dm_room_id_set.add(room_id) + + return dm_room_id_set + + @trace + async def filter_rooms_relevant_for_sync( + self, + user: UserID, + room_membership_for_user_map: Dict[str, RoomsForUserType], + newly_left_room_ids: AbstractSet[str], + ) -> Dict[str, RoomsForUserType]: + """ + Filter room IDs that should/can be listed for this user in the sync response (the + full room list that will be further filtered, sorted, and sliced). + + We're looking for rooms where the user has the following state in the token + range (> `from_token` and <= `to_token`): + + - `invite`, `join`, `knock`, `ban` membership events + - Kicks (`leave` membership events where `sender` is different from the + `user_id`/`state_key`) + - `newly_left` (rooms that were left during the given token range) + - In order for bans/kicks to not show up in sync, you need to `/forget` those + rooms. This doesn't modify the event itself though and only adds the + `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way + to tell when a room was forgotten at the moment so we can't factor it into the + from/to range. + + Args: + user: User that is syncing + room_membership_for_user_map: Room membership for the user + newly_left_room_ids: The set of room IDs we have newly left + + Returns: + A dictionary of room IDs that should be listed in the sync response along + with membership information in that room at the time of `to_token`. + """ + user_id = user.to_string() + + # Filter rooms to only what we're interested to sync with + filtered_sync_room_map = { + room_id: room_membership_for_user + for room_id, room_membership_for_user in room_membership_for_user_map.items() + if filter_membership_for_sync( + user_id=user_id, + room_membership_for_user=room_membership_for_user, + newly_left=room_id in newly_left_room_ids, + ) + } + + return filtered_sync_room_map + + async def check_room_subscription_allowed_for_user( + self, + room_id: str, + room_membership_for_user_map: Dict[str, RoomsForUserType], + to_token: StreamToken, + ) -> Optional[RoomsForUserType]: + """ + Check whether the user is allowed to see the room based on whether they have + ever had membership in the room or if the room is `world_readable`. + + Similar to `check_user_in_room_or_world_readable(...)` + + Args: + room_id: Room to check + room_membership_for_user_map: Room membership for the user at the time of + the `to_token` (<= `to_token`). + to_token: The token to fetch rooms up to. + + Returns: + The room membership for the user if they are allowed to subscribe to the + room else `None`. + """ + + # We can first check if they are already allowed to see the room based + # on our previous work to assemble the `room_membership_for_user_map`. + # + # If they have had any membership in the room over time (up to the `to_token`), + # let them subscribe and see what they can. + existing_membership_for_user = room_membership_for_user_map.get(room_id) + if existing_membership_for_user is not None: + return existing_membership_for_user + + # TODO: Handle `world_readable` rooms + return None + + # If the room is `world_readable`, it doesn't matter whether they can join, + # everyone can see the room. + # not_in_room_membership_for_user = _RoomMembershipForUser( + # room_id=room_id, + # event_id=None, + # event_pos=None, + # membership=None, + # sender=None, + # newly_joined=False, + # newly_left=False, + # is_dm=False, + # ) + # room_state = await self.get_current_state_at( + # room_id=room_id, + # room_membership_for_user_at_to_token=not_in_room_membership_for_user, + # state_filter=StateFilter.from_types( + # [(EventTypes.RoomHistoryVisibility, "")] + # ), + # to_token=to_token, + # ) + + # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, "")) + # if ( + # visibility_event is not None + # and visibility_event.content.get("history_visibility") + # == HistoryVisibility.WORLD_READABLE + # ): + # return not_in_room_membership_for_user + + # return None + + @trace + async def _bulk_get_stripped_state_for_rooms_from_sync_room_map( + self, + room_ids: StrCollection, + sync_room_map: Dict[str, RoomsForUserType], + ) -> Dict[str, Optional[StateMap[StrippedStateEvent]]]: + """ + Fetch stripped state for a list of room IDs. Stripped state is only + applicable to invite/knock rooms. Other rooms will have `None` as their + stripped state. + + For invite rooms, we pull from `unsigned.invite_room_state`. + For knock rooms, we pull from `unsigned.knock_room_state`. + + Args: + room_ids: Room IDs to fetch stripped state for + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + + Returns: + Mapping from room_id to mapping of (type, state_key) to stripped state + event. + """ + room_id_to_stripped_state_map: Dict[ + str, Optional[StateMap[StrippedStateEvent]] + ] = {} + + # Fetch what we haven't before + room_ids_to_fetch = [ + room_id + for room_id in room_ids + if room_id not in room_id_to_stripped_state_map + ] + + # Gather a list of event IDs we can grab stripped state from + invite_or_knock_event_ids: List[str] = [] + for room_id in room_ids_to_fetch: + if sync_room_map[room_id].membership in ( + Membership.INVITE, + Membership.KNOCK, + ): + event_id = sync_room_map[room_id].event_id + # If this is an invite/knock then there should be an event_id + assert event_id is not None + invite_or_knock_event_ids.append(event_id) + else: + room_id_to_stripped_state_map[room_id] = None + + invite_or_knock_events = await self.store.get_events(invite_or_knock_event_ids) + for invite_or_knock_event in invite_or_knock_events.values(): + room_id = invite_or_knock_event.room_id + membership = invite_or_knock_event.membership + + raw_stripped_state_events = None + if membership == Membership.INVITE: + invite_room_state = invite_or_knock_event.unsigned.get( + "invite_room_state" + ) + raw_stripped_state_events = invite_room_state + elif membership == Membership.KNOCK: + knock_room_state = invite_or_knock_event.unsigned.get( + "knock_room_state" + ) + raw_stripped_state_events = knock_room_state + else: + raise AssertionError( + f"Unexpected membership {membership} (this is a problem with Synapse itself)" + ) + + stripped_state_map: Optional[MutableStateMap[StrippedStateEvent]] = None + # Scrutinize unsigned things. `raw_stripped_state_events` should be a list + # of stripped events + if raw_stripped_state_events is not None: + stripped_state_map = {} + if isinstance(raw_stripped_state_events, list): + for raw_stripped_event in raw_stripped_state_events: + stripped_state_event = parse_stripped_state_event( + raw_stripped_event + ) + if stripped_state_event is not None: + stripped_state_map[ + ( + stripped_state_event.type, + stripped_state_event.state_key, + ) + ] = stripped_state_event + + room_id_to_stripped_state_map[room_id] = stripped_state_map + + return room_id_to_stripped_state_map + + @trace + async def _bulk_get_partial_current_state_content_for_rooms( + self, + content_type: Literal[ + # `content.type` from `EventTypes.Create`` + "room_type", + # `content.algorithm` from `EventTypes.RoomEncryption` + "room_encryption", + ], + room_ids: Set[str], + sync_room_map: Dict[str, RoomsForUserType], + to_token: StreamToken, + room_id_to_stripped_state_map: Dict[ + str, Optional[StateMap[StrippedStateEvent]] + ], + ) -> Mapping[str, Union[Optional[str], StateSentinel]]: + """ + Get the given state event content for a list of rooms. First we check the + current state of the room, then fallback to stripped state if available, then + historical state. + + Args: + content_type: Which content to grab + room_ids: Room IDs to fetch the given content field for. + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + to_token: We filter based on the state of the room at this token + room_id_to_stripped_state_map: This does not need to be filled in before + calling this function. Mapping from room_id to mapping of (type, state_key) + to stripped state event. Modified in place when we fetch new rooms so we can + save work next time this function is called. + + Returns: + A mapping from room ID to the state event content if the room has + the given state event (event_type, ""), otherwise `None`. Rooms unknown to + this server will return `ROOM_UNKNOWN_SENTINEL`. + """ + room_id_to_content: Dict[str, Union[Optional[str], StateSentinel]] = {} + + # As a bulk shortcut, use the current state if the server is particpating in the + # room (meaning we have current state). Ideally, for leave/ban rooms, we would + # want the state at the time of the membership instead of current state to not + # leak anything but we consider the create/encryption stripped state events to + # not be a secret given they are often set at the start of the room and they are + # normally handed out on invite/knock. + # + # Be mindful to only use this for non-sensitive details. For example, even + # though the room name/avatar/topic are also stripped state, they seem a lot + # more senstive to leak the current state value of. + # + # Since this function is cached, we need to make a mutable copy via + # `dict(...)`. + event_type = "" + event_content_field = "" + if content_type == "room_type": + event_type = EventTypes.Create + event_content_field = EventContentFields.ROOM_TYPE + room_id_to_content = dict(await self.store.bulk_get_room_type(room_ids)) + elif content_type == "room_encryption": + event_type = EventTypes.RoomEncryption + event_content_field = EventContentFields.ENCRYPTION_ALGORITHM + room_id_to_content = dict( + await self.store.bulk_get_room_encryption(room_ids) + ) + else: + assert_never(content_type) + + room_ids_with_results = [ + room_id + for room_id, content_field in room_id_to_content.items() + if content_field is not ROOM_UNKNOWN_SENTINEL + ] + + # We might not have current room state for remote invite/knocks if we are + # the first person on our server to see the room. The best we can do is look + # in the optional stripped state from the invite/knock event. + room_ids_without_results = room_ids.difference( + chain( + room_ids_with_results, + [ + room_id + for room_id, stripped_state_map in room_id_to_stripped_state_map.items() + if stripped_state_map is not None + ], + ) + ) + room_id_to_stripped_state_map.update( + await self._bulk_get_stripped_state_for_rooms_from_sync_room_map( + room_ids_without_results, sync_room_map + ) + ) + + # Update our `room_id_to_content` map based on the stripped state + # (applies to invite/knock rooms) + rooms_ids_without_stripped_state: Set[str] = set() + for room_id in room_ids_without_results: + stripped_state_map = room_id_to_stripped_state_map.get( + room_id, Sentinel.UNSET_SENTINEL + ) + assert stripped_state_map is not Sentinel.UNSET_SENTINEL, ( + f"Stripped state left unset for room {room_id}. " + + "Make sure you're calling `_bulk_get_stripped_state_for_rooms_from_sync_room_map(...)` " + + "with that room_id. (this is a problem with Synapse itself)" + ) + + # If there is some stripped state, we assume the remote server passed *all* + # of the potential stripped state events for the room. + if stripped_state_map is not None: + create_stripped_event = stripped_state_map.get((EventTypes.Create, "")) + stripped_event = stripped_state_map.get((event_type, "")) + # Sanity check that we at-least have the create event + if create_stripped_event is not None: + if stripped_event is not None: + room_id_to_content[room_id] = stripped_event.content.get( + event_content_field + ) + else: + # Didn't see the state event we're looking for in the stripped + # state so we can assume relevant content field is `None`. + room_id_to_content[room_id] = None + else: + rooms_ids_without_stripped_state.add(room_id) + + # Last resort, we might not have current room state for rooms that the + # server has left (no one local is in the room) but we can look at the + # historical state. + # + # Update our `room_id_to_content` map based on the state at the time of + # the membership event. + for room_id in rooms_ids_without_stripped_state: + # TODO: It would be nice to look this up in a bulk way (N+1 queries) + # + # TODO: `get_state_at(...)` doesn't take into account the "current state". + room_state = await self.storage_controllers.state.get_state_at( + room_id=room_id, + stream_position=to_token.copy_and_replace( + StreamKeyType.ROOM, + sync_room_map[room_id].event_pos.to_room_stream_token(), + ), + state_filter=StateFilter.from_types( + [ + (EventTypes.Create, ""), + (event_type, ""), + ] + ), + # Partially-stated rooms should have all state events except for + # remote membership events so we don't need to wait at all because + # we only want the create event and some non-member event. + await_full_state=False, + ) + # We can use the create event as a canary to tell whether the server has + # seen the room before + create_event = room_state.get((EventTypes.Create, "")) + state_event = room_state.get((event_type, "")) + + if create_event is None: + # Skip for unknown rooms + continue + + if state_event is not None: + room_id_to_content[room_id] = state_event.content.get( + event_content_field + ) + else: + # Didn't see the state event we're looking for in the stripped + # state so we can assume relevant content field is `None`. + room_id_to_content[room_id] = None + + return room_id_to_content + + @trace + async def filter_rooms( + self, + user: UserID, + sync_room_map: Dict[str, RoomsForUserType], + previous_connection_state: PerConnectionState, + filters: SlidingSyncConfig.SlidingSyncList.Filters, + to_token: StreamToken, + dm_room_ids: AbstractSet[str], + ) -> Dict[str, RoomsForUserType]: + """ + Filter rooms based on the sync request. + + Args: + user: User to filter rooms for + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + filters: Filters to apply + to_token: We filter based on the state of the room at this token + dm_room_ids: Set of room IDs that are DMs for the user + + Returns: + A filtered dictionary of room IDs along with membership information in the + room at the time of `to_token`. + """ + user_id = user.to_string() + + room_id_to_stripped_state_map: Dict[ + str, Optional[StateMap[StrippedStateEvent]] + ] = {} + + filtered_room_id_set = set(sync_room_map.keys()) + + # Filter for Direct-Message (DM) rooms + if filters.is_dm is not None: + with start_active_span("filters.is_dm"): + if filters.is_dm: + # Only DM rooms please + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + if room_id in dm_room_ids + } + else: + # Only non-DM rooms please + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + if room_id not in dm_room_ids + } + + if filters.spaces is not None: + with start_active_span("filters.spaces"): + raise NotImplementedError() + + # Filter for encrypted rooms + if filters.is_encrypted is not None: + with start_active_span("filters.is_encrypted"): + room_id_to_encryption = ( + await self._bulk_get_partial_current_state_content_for_rooms( + content_type="room_encryption", + room_ids=filtered_room_id_set, + to_token=to_token, + sync_room_map=sync_room_map, + room_id_to_stripped_state_map=room_id_to_stripped_state_map, + ) + ) + + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + encryption = room_id_to_encryption.get( + room_id, ROOM_UNKNOWN_SENTINEL + ) + + # Just remove rooms if we can't determine their encryption status + if encryption is ROOM_UNKNOWN_SENTINEL: + filtered_room_id_set.remove(room_id) + continue + + # If we're looking for encrypted rooms, filter out rooms that are not + # encrypted and vice versa + is_encrypted = encryption is not None + if (filters.is_encrypted and not is_encrypted) or ( + not filters.is_encrypted and is_encrypted + ): + filtered_room_id_set.remove(room_id) + + # Filter for rooms that the user has been invited to + if filters.is_invite is not None: + with start_active_span("filters.is_invite"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_for_user = sync_room_map[room_id] + # If we're looking for invite rooms, filter out rooms that the user is + # not invited to and vice versa + if ( + filters.is_invite + and room_for_user.membership != Membership.INVITE + ) or ( + not filters.is_invite + and room_for_user.membership == Membership.INVITE + ): + filtered_room_id_set.remove(room_id) + + # Filter by room type (space vs room, etc). A room must match one of the types + # provided in the list. `None` is a valid type for rooms which do not have a + # room type. + if filters.room_types is not None or filters.not_room_types is not None: + with start_active_span("filters.room_types"): + room_id_to_type = ( + await self._bulk_get_partial_current_state_content_for_rooms( + content_type="room_type", + room_ids=filtered_room_id_set, + to_token=to_token, + sync_room_map=sync_room_map, + room_id_to_stripped_state_map=room_id_to_stripped_state_map, + ) + ) + + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL) + + # Just remove rooms if we can't determine their type + if room_type is ROOM_UNKNOWN_SENTINEL: + filtered_room_id_set.remove(room_id) + continue + + if ( + filters.room_types is not None + and room_type not in filters.room_types + ): + filtered_room_id_set.remove(room_id) + continue + + if ( + filters.not_room_types is not None + and room_type in filters.not_room_types + ): + filtered_room_id_set.remove(room_id) + continue + + if filters.room_name_like is not None: + with start_active_span("filters.room_name_like"): + # TODO: The room name is a bit more sensitive to leak than the + # create/encryption event. Maybe we should consider a better way to fetch + # historical state before implementing this. + # + # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms( + # content_type="room_name", + # room_ids=filtered_room_id_set, + # to_token=to_token, + # sync_room_map=sync_room_map, + # room_id_to_stripped_state_map=room_id_to_stripped_state_map, + # ) + raise NotImplementedError() + + # Filter by room tags according to the users account data + if filters.tags is not None or filters.not_tags is not None: + with start_active_span("filters.tags"): + # Fetch the user tags for their rooms + room_tags = await self.store.get_tags_for_user(user_id) + room_id_to_tag_name_set: Dict[str, Set[str]] = { + room_id: set(tags.keys()) for room_id, tags in room_tags.items() + } + + if filters.tags is not None: + tags_set = set(filters.tags) + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + # Remove rooms that don't have one of the tags in the filter + if room_id_to_tag_name_set.get(room_id, set()).intersection( + tags_set + ) + } + + if filters.not_tags is not None: + not_tags_set = set(filters.not_tags) + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + # Remove rooms if they have any of the tags in the filter + if not room_id_to_tag_name_set.get(room_id, set()).intersection( + not_tags_set + ) + } + + # Keep rooms if the user has been state reset out of it but we previously sent + # down the connection before. We want to make sure that we send these down to + # the client regardless of filters so they find out about the state reset. + # + # We don't always have access to the state in a room after being state reset if + # no one else locally on the server is participating in the room so we patch + # these back in manually. + state_reset_out_of_room_id_set = { + room_id + for room_id in sync_room_map.keys() + if sync_room_map[room_id].event_id is None + and previous_connection_state.rooms.have_sent_room(room_id).status + != HaveSentRoomFlag.NEVER + } + + # Assemble a new sync room map but only with the `filtered_room_id_set` + return { + room_id: sync_room_map[room_id] + for room_id in filtered_room_id_set | state_reset_out_of_room_id_set + } + + @trace + async def filter_rooms_using_tables( + self, + user_id: str, + sync_room_map: Mapping[str, RoomsForUserSlidingSync], + previous_connection_state: PerConnectionState, + filters: SlidingSyncConfig.SlidingSyncList.Filters, + to_token: StreamToken, + dm_room_ids: AbstractSet[str], + ) -> Dict[str, RoomsForUserSlidingSync]: + """ + Filter rooms based on the sync request. + + Args: + user: User to filter rooms for + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + filters: Filters to apply + to_token: We filter based on the state of the room at this token + dm_room_ids: Set of room IDs which are DMs + room_tags: Mapping of room ID to tags + + Returns: + A filtered dictionary of room IDs along with membership information in the + room at the time of `to_token`. + """ + + filtered_room_id_set = set(sync_room_map.keys()) + + # Filter for Direct-Message (DM) rooms + if filters.is_dm is not None: + with start_active_span("filters.is_dm"): + if filters.is_dm: + # Intersect with the DM room set + filtered_room_id_set &= dm_room_ids + else: + # Remove DMs + filtered_room_id_set -= dm_room_ids + + if filters.spaces is not None: + with start_active_span("filters.spaces"): + raise NotImplementedError() + + # Filter for encrypted rooms + if filters.is_encrypted is not None: + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + # Remove rooms if we can't figure out what the encryption status is + if sync_room_map[room_id].has_known_state + # Or remove if it doesn't match the filter + and sync_room_map[room_id].is_encrypted == filters.is_encrypted + } + + # Filter for rooms that the user has been invited to + if filters.is_invite is not None: + with start_active_span("filters.is_invite"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + room_for_user = sync_room_map[room_id] + # If we're looking for invite rooms, filter out rooms that the user is + # not invited to and vice versa + if ( + filters.is_invite + and room_for_user.membership != Membership.INVITE + ) or ( + not filters.is_invite + and room_for_user.membership == Membership.INVITE + ): + filtered_room_id_set.remove(room_id) + + # Filter by room type (space vs room, etc). A room must match one of the types + # provided in the list. `None` is a valid type for rooms which do not have a + # room type. + if filters.room_types is not None or filters.not_room_types is not None: + with start_active_span("filters.room_types"): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for room_id in filtered_room_id_set.copy(): + # Remove rooms if we can't figure out what room type it is + if not sync_room_map[room_id].has_known_state: + filtered_room_id_set.remove(room_id) + continue + + room_type = sync_room_map[room_id].room_type + + if ( + filters.room_types is not None + and room_type not in filters.room_types + ): + filtered_room_id_set.remove(room_id) + continue + + if ( + filters.not_room_types is not None + and room_type in filters.not_room_types + ): + filtered_room_id_set.remove(room_id) + continue + + if filters.room_name_like is not None: + with start_active_span("filters.room_name_like"): + # TODO: The room name is a bit more sensitive to leak than the + # create/encryption event. Maybe we should consider a better way to fetch + # historical state before implementing this. + # + # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms( + # content_type="room_name", + # room_ids=filtered_room_id_set, + # to_token=to_token, + # sync_room_map=sync_room_map, + # room_id_to_stripped_state_map=room_id_to_stripped_state_map, + # ) + raise NotImplementedError() + + # Filter by room tags according to the users account data + if filters.tags is not None or filters.not_tags is not None: + with start_active_span("filters.tags"): + # Fetch the user tags for their rooms + room_tags = await self.store.get_tags_for_user(user_id) + room_id_to_tag_name_set: Dict[str, Set[str]] = { + room_id: set(tags.keys()) for room_id, tags in room_tags.items() + } + + if filters.tags is not None: + tags_set = set(filters.tags) + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + # Remove rooms that don't have one of the tags in the filter + if room_id_to_tag_name_set.get(room_id, set()).intersection( + tags_set + ) + } + + if filters.not_tags is not None: + not_tags_set = set(filters.not_tags) + filtered_room_id_set = { + room_id + for room_id in filtered_room_id_set + # Remove rooms if they have any of the tags in the filter + if not room_id_to_tag_name_set.get(room_id, set()).intersection( + not_tags_set + ) + } + + # Keep rooms if the user has been state reset out of it but we previously sent + # down the connection before. We want to make sure that we send these down to + # the client regardless of filters so they find out about the state reset. + # + # We don't always have access to the state in a room after being state reset if + # no one else locally on the server is participating in the room so we patch + # these back in manually. + state_reset_out_of_room_id_set = { + room_id + for room_id in sync_room_map.keys() + if sync_room_map[room_id].event_id is None + and previous_connection_state.rooms.have_sent_room(room_id).status + != HaveSentRoomFlag.NEVER + } + + # Assemble a new sync room map but only with the `filtered_room_id_set` + return { + room_id: sync_room_map[room_id] + for room_id in filtered_room_id_set | state_reset_out_of_room_id_set + } + + @trace + async def sort_rooms( + self, + sync_room_map: Dict[str, RoomsForUserType], + to_token: StreamToken, + limit: Optional[int] = None, + ) -> List[RoomsForUserType]: + """ + Sort by `stream_ordering` of the last event that the user should see in the + room. `stream_ordering` is unique so we get a stable sort. + + If `limit` is specified then sort may return fewer entries, but will + always return at least the top N rooms. This is useful as we don't always + need to sort the full list, but are just interested in the top N. + + Args: + sync_room_map: Dictionary of room IDs to sort along with membership + information in the room at the time of `to_token`. + to_token: We sort based on the events in the room at this token (<= `to_token`) + limit: The number of rooms that we need to return from the top of the list. + + Returns: + A sorted list of room IDs by `stream_ordering` along with membership information. + """ + + # Assemble a map of room ID to the `stream_ordering` of the last activity that the + # user should see in the room (<= `to_token`) + last_activity_in_room_map: Dict[str, int] = {} + + # Same as above, except for positions that we know are in the event + # stream cache. + cached_positions: Dict[str, int] = {} + + earliest_cache_position = ( + self.store._events_stream_cache.get_earliest_known_position() + ) + + for room_id, room_for_user in sync_room_map.items(): + if room_for_user.membership == Membership.JOIN: + # For joined rooms check the stream change cache. + cached_position = ( + self.store._events_stream_cache.get_max_pos_of_last_change(room_id) + ) + if cached_position is not None: + cached_positions[room_id] = cached_position + else: + # If the user has left/been invited/knocked/been banned from a + # room, they shouldn't see anything past that point. + # + # FIXME: It's possible that people should see beyond this point + # in invited/knocked cases if for example the room has + # `invite`/`world_readable` history visibility, see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 + last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + + # If the stream position is in range of the stream change cache + # we can include it. + if room_for_user.event_pos.stream > earliest_cache_position: + cached_positions[room_id] = room_for_user.event_pos.stream + + # If we are only asked for the top N rooms, and we have enough from + # looking in the stream change cache, then we can return early. This + # is because the cache must include all entries above + # `.get_earliest_known_position()`. + if limit is not None and len(cached_positions) >= limit: + # ... but first we need to handle the case where the cached max + # position is greater than the to_token, in which case we do + # actually query the DB. This should happen rarely, so can do it in + # a loop. + for room_id, position in list(cached_positions.items()): + if position > to_token.room_key.stream: + result = await self.store.get_last_event_pos_in_room_before_stream_ordering( + room_id, to_token.room_key + ) + if ( + result is not None + and result[1].stream > earliest_cache_position + ): + # We have a stream position in the cached range. + cached_positions[room_id] = result[1].stream + else: + # No position in the range, so we remove the entry. + cached_positions.pop(room_id) + + if limit is not None and len(cached_positions) >= limit: + return sorted( + ( + room + for room in sync_room_map.values() + if room.room_id in cached_positions + ), + # Sort by the last activity (stream_ordering) in the room + key=lambda room_info: cached_positions[room_info.room_id], + # We want descending order + reverse=True, + ) + + # For fully-joined rooms, we find the latest activity at/before the + # `to_token`. + joined_room_positions = ( + await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( + [ + room_id + for room_id, room_for_user in sync_room_map.items() + if room_for_user.membership == Membership.JOIN + ], + to_token.room_key, + ) + ) + + last_activity_in_room_map.update(joined_room_positions) + + return sorted( + sync_room_map.values(), + # Sort by the last activity (stream_ordering) in the room + key=lambda room_info: last_activity_in_room_map[room_info.room_id], + # We want descending order + reverse=True, + ) + + async def get_is_encrypted_for_room_at_token( + self, room_id: str, to_token: RoomStreamToken + ) -> bool: + """Get if the room is encrypted at the time.""" + + # Fetch the current encryption state + state_ids = await self.store.get_partial_filtered_current_state_ids( + room_id, StateFilter.from_types([(EventTypes.RoomEncryption, "")]) + ) + encryption_event_id = state_ids.get((EventTypes.RoomEncryption, "")) + + # Now roll back the state by looking at the state deltas between + # to_token and now. + deltas = await self.store.get_current_state_deltas_for_room( + room_id, + from_token=to_token, + to_token=self.store.get_room_max_token(), + ) + + for delta in deltas: + if delta.event_type != EventTypes.RoomEncryption: + continue + + # Found the first change, we look at the previous event ID to get + # the state at the to token. + + if delta.prev_event_id is None: + # There is no prev event, so no encryption state event, so room is not encrypted + return False + + encryption_event_id = delta.prev_event_id + break + + # We didn't find an encryption state, room isn't encrypted + if encryption_event_id is None: + return False + + # We found encryption state, check if content has a non-null algorithm + encrypted_event = await self.store.get_event(encryption_event_id) + algorithm = encrypted_event.content.get(EventContentFields.ENCRYPTION_ALGORITHM) + + return algorithm is not None diff --git a/synapse/handlers/sliding_sync/store.py b/synapse/handlers/sliding_sync/store.py new file mode 100644
index 0000000000..d24fccf76f --- /dev/null +++ b/synapse/handlers/sliding_sync/store.py
@@ -0,0 +1,128 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2023 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# + +import logging +from typing import TYPE_CHECKING, Optional + +import attr + +from synapse.logging.opentracing import trace +from synapse.storage.databases.main import DataStore +from synapse.types import SlidingSyncStreamToken +from synapse.types.handlers.sliding_sync import ( + MutablePerConnectionState, + PerConnectionState, + SlidingSyncConfig, +) + +if TYPE_CHECKING: + pass + +logger = logging.getLogger(__name__) + + +@attr.s(auto_attribs=True) +class SlidingSyncConnectionStore: + """In-memory store of per-connection state, including what rooms we have + previously sent down a sliding sync connection. + + Note: This is NOT safe to run in a worker setup because connection positions will + point to different sets of rooms on different workers. e.g. for the same connection, + a connection position of 5 might have totally different states on worker A and + worker B. + + One complication that we need to deal with here is needing to handle requests being + resent, i.e. if we sent down a room in a response that the client received, we must + consider the room *not* sent when we get the request again. + + This is handled by using an integer "token", which is returned to the client + as part of the sync token. For each connection we store a mapping from + tokens to the room states, and create a new entry when we send down new + rooms. + + Note that for any given sliding sync connection we will only store a maximum + of two different tokens: the previous token from the request and a new token + sent in the response. When we receive a request with a given token, we then + clear out all other entries with a different token. + + Attributes: + _connections: Mapping from `(user_id, conn_id)` to mapping of `token` + to mapping of room ID to `HaveSentRoom`. + """ + + store: "DataStore" + + async def get_and_clear_connection_positions( + self, + sync_config: SlidingSyncConfig, + from_token: Optional[SlidingSyncStreamToken], + ) -> PerConnectionState: + """Fetch the per-connection state for the token. + + Raises: + SlidingSyncUnknownPosition if the connection_token is unknown + """ + # If this is our first request, there is no previous connection state to fetch out of the database + if from_token is None or from_token.connection_position == 0: + return PerConnectionState() + + conn_id = sync_config.conn_id or "" + + device_id = sync_config.requester.device_id + assert device_id is not None + + return await self.store.get_and_clear_connection_positions( + sync_config.user.to_string(), + device_id, + conn_id, + from_token.connection_position, + ) + + @trace + async def record_new_state( + self, + sync_config: SlidingSyncConfig, + from_token: Optional[SlidingSyncStreamToken], + new_connection_state: MutablePerConnectionState, + ) -> int: + """Record updated per-connection state, returning the connection + position associated with the new state. + If there are no changes to the state this may return the same token as + the existing per-connection state. + """ + if not new_connection_state.has_updates(): + if from_token is not None: + return from_token.connection_position + else: + return 0 + + # A from token with a zero connection position means there was no + # previously stored connection state, so we treat a zero the same as + # there being no previous position. + previous_connection_position = None + if from_token is not None and from_token.connection_position != 0: + previous_connection_position = from_token.connection_position + + conn_id = sync_config.conn_id or "" + + device_id = sync_config.requester.device_id + assert device_id is not None + + return await self.store.persist_per_connection_state( + sync_config.user.to_string(), + device_id, + conn_id, + previous_connection_position, + new_connection_state, + )