diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 0cebeea592..a1ddac903e 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,7 +18,7 @@
#
#
import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple
import attr
from immutabledict import immutabledict
@@ -39,6 +39,7 @@ from synapse.types import (
PersistedEventPosition,
Requester,
RoomStreamToken,
+ StateMap,
StreamKeyType,
StreamToken,
UserID,
@@ -90,14 +91,186 @@ class RoomSyncConfig:
Attributes:
timeline_limit: The maximum number of events to return in the timeline.
- required_state: The set of state events requested for the room. The
- values are close to `StateKey` but actually use a syntax where you can
- provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part
- of the tuple (type, state_key).
+
+ required_state_map: Map from state event type to state_keys requested for the
+            room. The values are close to `StateKey` but actually use a syntax where
+            you can provide a `*` wildcard and `$LAZY` for lazy-loading room members
+            as the `state_key`.
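+
+            For example (an illustrative input, not from the spec), a `required_state`
+            of `[("m.room.topic", ""), ("m.room.member", "$LAZY")]` becomes
+            `{"m.room.topic": {""}, "m.room.member": {"$LAZY"}}`.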
"""
timeline_limit: int
- required_state: Set[Tuple[str, str]]
+ required_state_map: Dict[str, Set[str]]
+
+ @classmethod
+ def from_room_config(
+ cls,
+ room_params: SlidingSyncConfig.CommonRoomParameters,
+ ) -> "RoomSyncConfig":
+ """
+ Create a `RoomSyncConfig` from a `SlidingSyncList`/`RoomSubscription` config.
+
+ Args:
+ room_params: `SlidingSyncConfig.SlidingSyncList` or `SlidingSyncConfig.RoomSubscription`
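+
+        For example (hypothetical values for illustration), a `required_state` of
+        `[("m.room.member", "@alice:example.com"), ("*", "@alice:example.com")]`
+        collapses to `{"*": {"@alice:example.com"}}`, since the `state_type`
+        wildcard makes the more specific entry redundant.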
+ """
+ required_state_map: Dict[str, Set[str]] = {}
+ for (
+ state_type,
+ state_key,
+ ) in room_params.required_state:
+ # If we already have a wildcard for this specific `state_key`, we don't need
+ # to add it since the wildcard already covers it.
+ if state_key in required_state_map.get(StateValues.WILDCARD, set()):
+ continue
+
+ # If we already have a wildcard `state_key` for this `state_type`, we don't need
+ # to add anything else
+ if StateValues.WILDCARD in required_state_map.get(state_type, set()):
+ continue
+
+            # If we're getting wildcards for the `state_type` and `state_key`, that's
+            # all that matters, so get rid of any other entries
+ if state_type == StateValues.WILDCARD and state_key == StateValues.WILDCARD:
+ required_state_map = {StateValues.WILDCARD: {StateValues.WILDCARD}}
+ # We can break, since we don't need to add anything else
+ break
+
+ # If we're getting a wildcard for the `state_type`, get rid of any other
+ # entries with the same `state_key`, since the wildcard will cover it already.
+ elif state_type == StateValues.WILDCARD:
+ # Get rid of any entries that match the `state_key`
+ #
+ # Make a copy so we don't run into an error: `dictionary changed size
+ # during iteration`, when we remove items
+ for (
+ existing_state_type,
+ existing_state_key_set,
+ ) in list(required_state_map.items()):
+ # Make a copy so we don't run into an error: `Set changed size during
+ # iteration`, when we filter out and remove items
+ for existing_state_key in existing_state_key_set.copy():
+ if existing_state_key == state_key:
+ existing_state_key_set.remove(state_key)
+
+                    # If we've left the `set()` empty, remove it from the map
+ if existing_state_key_set == set():
+ required_state_map.pop(existing_state_type, None)
+
+            # If we're getting a wildcard `state_key`, get rid of any other state_keys
+            # for this `state_type` since the wildcard already covers them.
+ if state_key == StateValues.WILDCARD:
+ required_state_map[state_type] = {state_key}
+ # Otherwise, just add it to the set
+ else:
+ if required_state_map.get(state_type) is None:
+ required_state_map[state_type] = {state_key}
+ else:
+ required_state_map[state_type].add(state_key)
+
+ return cls(
+ timeline_limit=room_params.timeline_limit,
+ required_state_map=required_state_map,
+ )
+
+ def deep_copy(self) -> "RoomSyncConfig":
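+        """
+        Return a copy of this `RoomSyncConfig` whose `state_key` sets can be
+        mutated without affecting the original (each set is copied, not shared).
+        """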
+ required_state_map: Dict[str, Set[str]] = {
+ state_type: state_key_set.copy()
+ for state_type, state_key_set in self.required_state_map.items()
+ }
+
+ return RoomSyncConfig(
+ timeline_limit=self.timeline_limit,
+ required_state_map=required_state_map,
+ )
+
+ def combine_room_sync_config(
+ self, other_room_sync_config: "RoomSyncConfig"
+ ) -> None:
+ """
+ Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the
+ superset union of the two.
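+
+        For example (illustrative values), combining `{"m.room.member": {"$LAZY"}}`
+        with `{"m.room.member": {"*"}}` leaves `{"m.room.member": {"*"}}`, since the
+        wildcard `state_key` already covers the lazy-loaded members.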
+ """
+ # Take the highest timeline limit
+ if self.timeline_limit < other_room_sync_config.timeline_limit:
+ self.timeline_limit = other_room_sync_config.timeline_limit
+
+ # Union the required state
+ for (
+ state_type,
+ state_key_set,
+ ) in other_room_sync_config.required_state_map.items():
+ # If we already have a wildcard for everything, we don't need to add
+ # anything else
+ if StateValues.WILDCARD in self.required_state_map.get(
+ StateValues.WILDCARD, set()
+ ):
+ break
+
+ # If we already have a wildcard `state_key` for this `state_type`, we don't need
+ # to add anything else
+ if StateValues.WILDCARD in self.required_state_map.get(state_type, set()):
+ continue
+
+            # If we're getting wildcards for the `state_type` and `state_key`, that's
+            # all that matters, so get rid of any other entries
+ if (
+ state_type == StateValues.WILDCARD
+ and StateValues.WILDCARD in state_key_set
+ ):
+ self.required_state_map = {state_type: {StateValues.WILDCARD}}
+ # We can break, since we don't need to add anything else
+ break
+
+ for state_key in state_key_set:
+ # If we already have a wildcard for this specific `state_key`, we don't need
+ # to add it since the wildcard already covers it.
+ if state_key in self.required_state_map.get(
+ StateValues.WILDCARD, set()
+ ):
+ continue
+
+ # If we're getting a wildcard for the `state_type`, get rid of any other
+ # entries with the same `state_key`, since the wildcard will cover it already.
+ if state_type == StateValues.WILDCARD:
+ # Get rid of any entries that match the `state_key`
+ #
+ # Make a copy so we don't run into an error: `dictionary changed size
+ # during iteration`, when we remove items
+ for existing_state_type, existing_state_key_set in list(
+ self.required_state_map.items()
+ ):
+ # Make a copy so we don't run into an error: `Set changed size during
+ # iteration`, when we filter out and remove items
+ for existing_state_key in existing_state_key_set.copy():
+ if existing_state_key == state_key:
+ existing_state_key_set.remove(state_key)
+
+                        # If we've left the `set()` empty, remove it from the map
+ if existing_state_key_set == set():
+ self.required_state_map.pop(existing_state_type, None)
+
+                # If we're getting a wildcard `state_key`, get rid of any other state_keys
+                # for this `state_type` since the wildcard already covers them.
+ if state_key == StateValues.WILDCARD:
+ self.required_state_map[state_type] = {state_key}
+ break
+ # Otherwise, just add it to the set
+ else:
+ if self.required_state_map.get(state_type) is None:
+ self.required_state_map[state_type] = {state_key}
+ else:
+ self.required_state_map[state_type].add(state_key)
+
+
+class StateValues:
+ """
+ Understood values of the (type, state_key) tuple in `required_state`.
+ """
+
+ # Include all state events of the given type
+ WILDCARD: Final = "*"
+ # Lazy-load room membership events (include room membership events for any event
+ # `sender` in the timeline). We only give special meaning to this value when it's a
+ # `state_key`.
+ LAZY: Final = "$LAZY"
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -242,6 +415,8 @@ class SlidingSyncHandler:
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
+ # Keep track of the rooms that we're going to display and need to fetch more
+ # info about
relevant_room_map: Dict[str, RoomSyncConfig] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
@@ -260,49 +435,76 @@ class SlidingSyncHandler:
sync_config.user, sync_room_map, list_config.filters, to_token
)
+ # Sort the list
sorted_room_info = await self.sort_rooms(
filtered_sync_room_map, to_token
)
+ # Find which rooms are partially stated and may need to be filtered out
+ # depending on the `required_state` requested (see below).
+ partial_state_room_map = await self.store.is_partial_state_room_batched(
+ filtered_sync_room_map.keys()
+ )
+
+ # Since creating the `RoomSyncConfig` takes some work, let's just do it
+ # once and make a copy whenever we need it.
+ room_sync_config = RoomSyncConfig.from_room_config(list_config)
+ membership_state_keys = room_sync_config.required_state_map.get(
+ EventTypes.Member
+ )
+ lazy_loading = (
+ membership_state_keys is not None
+ and len(membership_state_keys) == 1
+ and StateValues.LAZY in membership_state_keys
+ )
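+                # Note: this only counts as lazy-loading when `$LAZY` is the *only*
+                # `state_key` requested for memberships; e.g. (an illustrative case)
+                # `{"m.room.member": {"$LAZY", "@alice:example.com"}}` would not
+                # qualify, so partially-stated rooms are still excluded below.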
+
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
for range in list_config.ranges:
- sliced_room_ids = [
- room_id
- # Both sides of range are inclusive
- for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
- ]
+ room_ids_in_list: List[str] = []
+
+ # We're going to loop through the sorted list of rooms starting
+ # at the range start index and keep adding rooms until we fill
+ # up the range or run out of rooms.
+ #
+                        # Both sides of the range are inclusive, so we `+ 1`
+ max_num_rooms = range[1] - range[0] + 1
+ for room_id, _ in sorted_room_info[range[0] :]:
+ if len(room_ids_in_list) >= max_num_rooms:
+ break
+
+ # Exclude partially-stated rooms unless the `required_state`
+ # only has `["m.room.member", "$LAZY"]` for membership
+ # (lazy-loading room members).
+ if partial_state_room_map.get(room_id) and not lazy_loading:
+ continue
+
+ # Take the superset of the `RoomSyncConfig` for each room.
+ #
+ # Update our `relevant_room_map` with the room we're going
+ # to display and need to fetch more info about.
+ existing_room_sync_config = relevant_room_map.get(room_id)
+ if existing_room_sync_config is not None:
+ existing_room_sync_config.combine_room_sync_config(
+ room_sync_config
+ )
+ else:
+ # Make a copy so if we modify it later, it doesn't
+ # affect all references.
+ relevant_room_map[room_id] = (
+ room_sync_config.deep_copy()
+ )
+
+ room_ids_in_list.append(room_id)
ops.append(
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
- room_ids=sliced_room_ids,
+ room_ids=room_ids_in_list,
)
)
- # Take the superset of the `RoomSyncConfig` for each room
- for room_id in sliced_room_ids:
- if relevant_room_map.get(room_id) is not None:
- # Take the highest timeline limit
- if (
- relevant_room_map[room_id].timeline_limit
- < list_config.timeline_limit
- ):
- relevant_room_map[room_id].timeline_limit = (
- list_config.timeline_limit
- )
-
- # Union the required state
- relevant_room_map[room_id].required_state.update(
- list_config.required_state
- )
- else:
- relevant_room_map[room_id] = RoomSyncConfig(
- timeline_limit=list_config.timeline_limit,
- required_state=set(list_config.required_state),
- )
-
lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_info),
ops=ops,
@@ -651,9 +853,6 @@ class SlidingSyncHandler:
user_id = user.to_string()
# TODO: Apply filters
- #
- # TODO: Exclude partially stated rooms unless the `required_state` has
- # `["m.room.member", "$LAZY"]`
filtered_room_id_set = set(sync_room_map.keys())
@@ -694,16 +893,18 @@ class SlidingSyncHandler:
if filters.is_encrypted is not None:
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
- for room_id in list(filtered_room_id_set):
+ for room_id in filtered_room_id_set.copy():
state_at_to_token = await self.storage_controllers.state.get_state_at(
room_id,
to_token,
state_filter=StateFilter.from_types(
[(EventTypes.RoomEncryption, "")]
),
- # Partially stated rooms should have all state events except for the
- # membership events so we don't need to wait. Plus we don't want to
- # block the whole sync waiting for this one room.
+ # Partially-stated rooms should have all state events except for the
+ # membership events so we don't need to wait because we only care
+ # about retrieving the `EventTypes.RoomEncryption` state event here.
+ # Plus we don't want to block the whole sync waiting for this one
+ # room.
await_full_state=False,
)
is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, ""))
@@ -719,7 +920,7 @@ class SlidingSyncHandler:
if filters.is_invite is not None:
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
- for room_id in list(filtered_room_id_set):
+ for room_id in filtered_room_id_set.copy():
room_for_user = sync_room_map[room_id]
# If we're looking for invite rooms, filter out rooms that the user is
# not invited to and vice versa
@@ -737,7 +938,7 @@ class SlidingSyncHandler:
if filters.room_types is not None or filters.not_room_types is not None:
# Make a copy so we don't run into an error: `Set changed size during
# iteration`, when we filter out and remove items
- for room_id in list(filtered_room_id_set):
+ for room_id in filtered_room_id_set.copy():
create_event = await self.store.get_create_event_for_room(room_id)
room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
if (
@@ -843,7 +1044,7 @@ class SlidingSyncHandler:
# Assemble the list of timeline events
#
- # It would be nice to make the `rooms` response more uniform regardless of
+ # FIXME: It would be nice to make the `rooms` response more uniform regardless of
# membership. Currently, we have to make all of these optional because
# `invite`/`knock` rooms only have `stripped_state`. See
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
@@ -1010,6 +1211,136 @@ class SlidingSyncHandler:
# state reset happened. Perhaps we should indicate this by setting `initial:
# True` and empty `required_state`.
+ # TODO: Since we can't determine whether we've already sent a room down this
+ # Sliding Sync connection before (we plan to add this optimization in the
+ # future), we're always returning the requested room state instead of
+ # updates.
+ initial = True
+
+ # Fetch the required state for the room
+ #
+ # No `required_state` for invite/knock rooms (just `stripped_state`)
+ #
+ # FIXME: It would be nice to make the `rooms` response more uniform regardless
+ # of membership. Currently, we have to make this optional because
+ # `invite`/`knock` rooms only have `stripped_state`. See
+ # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
+ room_state: Optional[StateMap[EventBase]] = None
+ if rooms_membership_for_user_at_to_token.membership not in (
+ Membership.INVITE,
+ Membership.KNOCK,
+ ):
+ # Calculate the `StateFilter` based on the `required_state` for the room
+ state_filter: Optional[StateFilter] = StateFilter.none()
+ # If we have a double wildcard ("*", "*") in the `required_state`, we need
+ # to fetch all state for the room
+ #
+            # Note: MSC3575 describes different behavior from how we're handling things
+            # here, but since it's not wrong to return more state than requested
+            # (`required_state` is just the minimum requested), it doesn't matter if we
+            # include more than the client wanted. This complexity is also under scrutiny,
+ # see
+ # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1185109050
+ #
+ # > One unique exception is when you request all state events via ["*", "*"]. When used,
+ # > all state events are returned by default, and additional entries FILTER OUT the returned set
+ # > of state events. These additional entries cannot use '*' themselves.
+ # > For example, ["*", "*"], ["m.room.member", "@alice:example.com"] will _exclude_ every m.room.member
+ # > event _except_ for @alice:example.com, and include every other state event.
+ # > In addition, ["*", "*"], ["m.space.child", "*"] is an error, the m.space.child filter is not
+ # > required as it would have been returned anyway.
+ # >
+ # > -- MSC3575 (https://github.com/matrix-org/matrix-spec-proposals/pull/3575)
+ if StateValues.WILDCARD in room_sync_config.required_state_map.get(
+ StateValues.WILDCARD, set()
+ ):
+ state_filter = StateFilter.all()
+ # TODO: `StateFilter` currently doesn't support wildcard event types. We're
+ # currently working around this by returning all state to the client but it
+ # would be nice to fetch less from the database and return just what the
+ # client wanted.
+ elif (
+ room_sync_config.required_state_map.get(StateValues.WILDCARD)
+ is not None
+ ):
+ state_filter = StateFilter.all()
+ else:
+ required_state_types: List[Tuple[str, Optional[str]]] = []
+ for (
+ state_type,
+ state_key_set,
+ ) in room_sync_config.required_state_map.items():
+ for state_key in state_key_set:
+ if state_key == StateValues.WILDCARD:
+ # `None` is a wildcard in the `StateFilter`
+ required_state_types.append((state_type, None))
+ # We need to fetch all relevant people when we're lazy-loading membership
+ elif (
+ state_type == EventTypes.Member
+ and state_key == StateValues.LAZY
+ ):
+ # Everyone in the timeline is relevant
+ timeline_membership: Set[str] = set()
+ if timeline_events is not None:
+ for timeline_event in timeline_events:
+ timeline_membership.add(timeline_event.sender)
+
+ for user_id in timeline_membership:
+ required_state_types.append(
+ (EventTypes.Member, user_id)
+ )
+
+                            # FIXME: We probably also care about invite, ban, kick, targets,
+                            # etc. but the spec only mentions "senders".
+ else:
+ required_state_types.append((state_type, state_key))
+
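+                # For example (an illustrative case), lazy-loading membership with
+                # `@alice:example.com` as the only timeline sender yields
+                # `required_state_types == [("m.room.member", "@alice:example.com")]`.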
+ state_filter = StateFilter.from_types(required_state_types)
+
+ # We can skip fetching state if we don't need any
+ if state_filter != StateFilter.none():
+ # We can return all of the state that was requested if we're doing an
+ # initial sync
+ if initial:
+ # People shouldn't see past their leave/ban event
+ if rooms_membership_for_user_at_to_token.membership in (
+ Membership.LEAVE,
+ Membership.BAN,
+ ):
+ room_state = await self.storage_controllers.state.get_state_at(
+ room_id,
+ stream_position=to_token.copy_and_replace(
+ StreamKeyType.ROOM,
+ rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
+ ),
+ state_filter=state_filter,
+ # Partially-stated rooms should have all state events except for
+ # the membership events and since we've already excluded
+ # partially-stated rooms unless `required_state` only has
+ # `["m.room.member", "$LAZY"]` for membership, we should be able
+ # to retrieve everything requested. Plus we don't want to block
+ # the whole sync waiting for this one room.
+ await_full_state=False,
+ )
+ # Otherwise, we can get the latest current state in the room
+ else:
+ room_state = await self.storage_controllers.state.get_current_state(
+ room_id,
+ state_filter,
+ # Partially-stated rooms should have all state events except for
+ # the membership events and since we've already excluded
+ # partially-stated rooms unless `required_state` only has
+ # `["m.room.member", "$LAZY"]` for membership, we should be able
+ # to retrieve everything requested. Plus we don't want to block
+ # the whole sync waiting for this one room.
+ await_full_state=False,
+ )
+ # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
+ else:
+ # TODO: Once we can figure out if we've sent a room down this connection before,
+ # we can return updates instead of the full required state.
+ raise NotImplementedError()
+
return SlidingSyncResult.RoomResult(
# TODO: Dummy value
name=None,
@@ -1017,20 +1348,16 @@ class SlidingSyncHandler:
avatar=None,
# TODO: Dummy value
heroes=None,
- # TODO: Since we can't determine whether we've already sent a room down this
- # Sliding Sync connection before (we plan to add this optimization in the
- # future), we're always returning the requested room state instead of
- # updates.
- initial=True,
# TODO: Dummy value
- required_state=[],
+ is_dm=False,
+ initial=initial,
+ required_state=list(room_state.values()) if room_state else None,
timeline_events=timeline_events,
bundled_aggregations=bundled_aggregations,
- # TODO: Dummy value
- is_dm=False,
stripped_state=stripped_state,
prev_batch=prev_batch_token,
limited=limited,
+ num_live=num_live,
# TODO: Dummy values
joined_count=0,
invited_count=0,
@@ -1039,5 +1366,4 @@ class SlidingSyncHandler:
# (encrypted rooms).
notification_count=0,
highlight_count=0,
- num_live=num_live,
)
|