diff options
author | Eric Eastwood <eric.eastwood@beta.gouv.fr> | 2024-07-04 12:25:36 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-04 12:25:36 -0500 |
commit | 22aeb78b775ab1d24401b44d642eacc8b99a64fd (patch) | |
tree | d0bf200e3ae91dcc953fba514319b4e2138a9b51 /synapse/handlers/sliding_sync.py | |
parent | Changelog entries only get merged if they have the same content and extension... (diff) | |
download | synapse-22aeb78b775ab1d24401b44d642eacc8b99a64fd.tar.xz |
Add `rooms.required_state` to Sliding Sync `/sync` (#17342)
Also handles excluding rooms with partial state when people are asking for room membership events unless it's `$LAZY` room membership.
Diffstat (limited to 'synapse/handlers/sliding_sync.py')
-rw-r--r-- | synapse/handlers/sliding_sync.py | 432 |
1 files changed, 379 insertions, 53 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 0cebeea592..a1ddac903e 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -18,7 +18,7 @@ # # import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple import attr from immutabledict import immutabledict @@ -39,6 +39,7 @@ from synapse.types import ( PersistedEventPosition, Requester, RoomStreamToken, + StateMap, StreamKeyType, StreamToken, UserID, @@ -90,14 +91,186 @@ class RoomSyncConfig: Attributes: timeline_limit: The maximum number of events to return in the timeline. - required_state: The set of state events requested for the room. The - values are close to `StateKey` but actually use a syntax where you can - provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part - of the tuple (type, state_key). + + required_state_map: Map from state event type to state_keys requested for the + room. The values are close to `StateKey` but actually use a syntax where you + can provide `*` wildcard and `$LAZY` for lazy-loading room members. """ timeline_limit: int - required_state: Set[Tuple[str, str]] + required_state_map: Dict[str, Set[str]] + + @classmethod + def from_room_config( + cls, + room_params: SlidingSyncConfig.CommonRoomParameters, + ) -> "RoomSyncConfig": + """ + Create a `RoomSyncConfig` from a `SlidingSyncList`/`RoomSubscription` config. + + Args: + room_params: `SlidingSyncConfig.SlidingSyncList` or `SlidingSyncConfig.RoomSubscription` + """ + required_state_map: Dict[str, Set[str]] = {} + for ( + state_type, + state_key, + ) in room_params.required_state: + # If we already have a wildcard for this specific `state_key`, we don't need + # to add it since the wildcard already covers it. + if state_key in required_state_map.get(StateValues.WILDCARD, set()): + continue + + # If we already have a wildcard `state_key` for this `state_type`, we don't need + # to add anything else + if StateValues.WILDCARD in required_state_map.get(state_type, set()): + continue + + # If we're getting wildcards for the `state_type` and `state_key`, that's + # all that matters so get rid of any other entries + if state_type == StateValues.WILDCARD and state_key == StateValues.WILDCARD: + required_state_map = {StateValues.WILDCARD: {StateValues.WILDCARD}} + # We can break, since we don't need to add anything else + break + + # If we're getting a wildcard for the `state_type`, get rid of any other + # entries with the same `state_key`, since the wildcard will cover it already. + elif state_type == StateValues.WILDCARD: + # Get rid of any entries that match the `state_key` + # + # Make a copy so we don't run into an error: `dictionary changed size + # during iteration`, when we remove items + for ( + existing_state_type, + existing_state_key_set, + ) in list(required_state_map.items()): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for existing_state_key in existing_state_key_set.copy(): + if existing_state_key == state_key: + existing_state_key_set.remove(state_key) + + # If we've the left the `set()` empty, remove it from the map + if existing_state_key_set == set(): + required_state_map.pop(existing_state_type, None) + + # If we're getting a wildcard `state_key`, get rid of any other state_keys + # for this `state_type` since the wildcard will cover it already. + if state_key == StateValues.WILDCARD: + required_state_map[state_type] = {state_key} + # Otherwise, just add it to the set + else: + if required_state_map.get(state_type) is None: + required_state_map[state_type] = {state_key} + else: + required_state_map[state_type].add(state_key) + + return cls( + timeline_limit=room_params.timeline_limit, + required_state_map=required_state_map, + ) + + def deep_copy(self) -> "RoomSyncConfig": + required_state_map: Dict[str, Set[str]] = { + state_type: state_key_set.copy() + for state_type, state_key_set in self.required_state_map.items() + } + + return RoomSyncConfig( + timeline_limit=self.timeline_limit, + required_state_map=required_state_map, + ) + + def combine_room_sync_config( + self, other_room_sync_config: "RoomSyncConfig" + ) -> None: + """ + Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the + superset union of the two. + """ + # Take the highest timeline limit + if self.timeline_limit < other_room_sync_config.timeline_limit: + self.timeline_limit = other_room_sync_config.timeline_limit + + # Union the required state + for ( + state_type, + state_key_set, + ) in other_room_sync_config.required_state_map.items(): + # If we already have a wildcard for everything, we don't need to add + # anything else + if StateValues.WILDCARD in self.required_state_map.get( + StateValues.WILDCARD, set() + ): + break + + # If we already have a wildcard `state_key` for this `state_type`, we don't need + # to add anything else + if StateValues.WILDCARD in self.required_state_map.get(state_type, set()): + continue + + # If we're getting wildcards for the `state_type` and `state_key`, that's + # all that matters so get rid of any other entries + if ( + state_type == StateValues.WILDCARD + and StateValues.WILDCARD in state_key_set + ): + self.required_state_map = {state_type: {StateValues.WILDCARD}} + # We can break, since we don't need to add anything else + break + + for state_key in state_key_set: + # If we already have a wildcard for this specific `state_key`, we don't need + # to add it since the wildcard already covers it. + if state_key in self.required_state_map.get( + StateValues.WILDCARD, set() + ): + continue + + # If we're getting a wildcard for the `state_type`, get rid of any other + # entries with the same `state_key`, since the wildcard will cover it already. + if state_type == StateValues.WILDCARD: + # Get rid of any entries that match the `state_key` + # + # Make a copy so we don't run into an error: `dictionary changed size + # during iteration`, when we remove items + for existing_state_type, existing_state_key_set in list( + self.required_state_map.items() + ): + # Make a copy so we don't run into an error: `Set changed size during + # iteration`, when we filter out and remove items + for existing_state_key in existing_state_key_set.copy(): + if existing_state_key == state_key: + existing_state_key_set.remove(state_key) + + # If we've the left the `set()` empty, remove it from the map + if existing_state_key_set == set(): + self.required_state_map.pop(existing_state_type, None) + + # If we're getting a wildcard `state_key`, get rid of any other state_keys + # for this `state_type` since the wildcard will cover it already. + if state_key == StateValues.WILDCARD: + self.required_state_map[state_type] = {state_key} + break + # Otherwise, just add it to the set + else: + if self.required_state_map.get(state_type) is None: + self.required_state_map[state_type] = {state_key} + else: + self.required_state_map[state_type].add(state_key) + + +class StateValues: + """ + Understood values of the (type, state_key) tuple in `required_state`. + """ + + # Include all state events of the given type + WILDCARD: Final = "*" + # Lazy-load room membership events (include room membership events for any event + # `sender` in the timeline). We only give special meaning to this value when it's a + # `state_key`. + LAZY: Final = "$LAZY" @attr.s(slots=True, frozen=True, auto_attribs=True) @@ -242,6 +415,8 @@ class SlidingSyncHandler: # Assemble sliding window lists lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} + # Keep track of the rooms that we're going to display and need to fetch more + # info about relevant_room_map: Dict[str, RoomSyncConfig] = {} if sync_config.lists: # Get all of the room IDs that the user should be able to see in the sync @@ -260,49 +435,76 @@ class SlidingSyncHandler: sync_config.user, sync_room_map, list_config.filters, to_token ) + # Sort the list sorted_room_info = await self.sort_rooms( filtered_sync_room_map, to_token ) + # Find which rooms are partially stated and may need to be filtered out + # depending on the `required_state` requested (see below). + partial_state_room_map = await self.store.is_partial_state_room_batched( + filtered_sync_room_map.keys() + ) + + # Since creating the `RoomSyncConfig` takes some work, let's just do it + # once and make a copy whenever we need it. + room_sync_config = RoomSyncConfig.from_room_config(list_config) + membership_state_keys = room_sync_config.required_state_map.get( + EventTypes.Member + ) + lazy_loading = ( + membership_state_keys is not None + and len(membership_state_keys) == 1 + and StateValues.LAZY in membership_state_keys + ) + ops: List[SlidingSyncResult.SlidingWindowList.Operation] = [] if list_config.ranges: for range in list_config.ranges: - sliced_room_ids = [ - room_id - # Both sides of range are inclusive - for room_id, _ in sorted_room_info[range[0] : range[1] + 1] - ] + room_ids_in_list: List[str] = [] + + # We're going to loop through the sorted list of rooms starting + # at the range start index and keep adding rooms until we fill + # up the range or run out of rooms. + # + # Both sides of range are inclusive so we `+ 1` + max_num_rooms = range[1] - range[0] + 1 + for room_id, _ in sorted_room_info[range[0] :]: + if len(room_ids_in_list) >= max_num_rooms: + break + + # Exclude partially-stated rooms unless the `required_state` + # only has `["m.room.member", "$LAZY"]` for membership + # (lazy-loading room members). + if partial_state_room_map.get(room_id) and not lazy_loading: + continue + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going + # to display and need to fetch more info about. + existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: + existing_room_sync_config.combine_room_sync_config( + room_sync_config + ) + else: + # Make a copy so if we modify it later, it doesn't + # affect all references. + relevant_room_map[room_id] = ( + room_sync_config.deep_copy() + ) + + room_ids_in_list.append(room_id) ops.append( SlidingSyncResult.SlidingWindowList.Operation( op=OperationType.SYNC, range=range, - room_ids=sliced_room_ids, + room_ids=room_ids_in_list, ) ) - # Take the superset of the `RoomSyncConfig` for each room - for room_id in sliced_room_ids: - if relevant_room_map.get(room_id) is not None: - # Take the highest timeline limit - if ( - relevant_room_map[room_id].timeline_limit - < list_config.timeline_limit - ): - relevant_room_map[room_id].timeline_limit = ( - list_config.timeline_limit - ) - - # Union the required state - relevant_room_map[room_id].required_state.update( - list_config.required_state - ) - else: - relevant_room_map[room_id] = RoomSyncConfig( - timeline_limit=list_config.timeline_limit, - required_state=set(list_config.required_state), - ) - lists[list_key] = SlidingSyncResult.SlidingWindowList( count=len(sorted_room_info), ops=ops, @@ -651,9 +853,6 @@ class SlidingSyncHandler: user_id = user.to_string() # TODO: Apply filters - # - # TODO: Exclude partially stated rooms unless the `required_state` has - # `["m.room.member", "$LAZY"]` filtered_room_id_set = set(sync_room_map.keys()) @@ -694,16 +893,18 @@ class SlidingSyncHandler: if filters.is_encrypted is not None: # Make a copy so we don't run into an error: `Set changed size during # iteration`, when we filter out and remove items - for room_id in list(filtered_room_id_set): + for room_id in filtered_room_id_set.copy(): state_at_to_token = await self.storage_controllers.state.get_state_at( room_id, to_token, state_filter=StateFilter.from_types( [(EventTypes.RoomEncryption, "")] ), - # Partially stated rooms should have all state events except for the - # membership events so we don't need to wait. Plus we don't want to - # block the whole sync waiting for this one room. + # Partially-stated rooms should have all state events except for the + # membership events so we don't need to wait because we only care + # about retrieving the `EventTypes.RoomEncryption` state event here. + # Plus we don't want to block the whole sync waiting for this one + # room. await_full_state=False, ) is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, "")) @@ -719,7 +920,7 @@ class SlidingSyncHandler: if filters.is_invite is not None: # Make a copy so we don't run into an error: `Set changed size during # iteration`, when we filter out and remove items - for room_id in list(filtered_room_id_set): + for room_id in filtered_room_id_set.copy(): room_for_user = sync_room_map[room_id] # If we're looking for invite rooms, filter out rooms that the user is # not invited to and vice versa @@ -737,7 +938,7 @@ class SlidingSyncHandler: if filters.room_types is not None or filters.not_room_types is not None: # Make a copy so we don't run into an error: `Set changed size during # iteration`, when we filter out and remove items - for room_id in list(filtered_room_id_set): + for room_id in filtered_room_id_set.copy(): create_event = await self.store.get_create_event_for_room(room_id) room_type = create_event.content.get(EventContentFields.ROOM_TYPE) if ( @@ -843,7 +1044,7 @@ class SlidingSyncHandler: # Assemble the list of timeline events # - # It would be nice to make the `rooms` response more uniform regardless of + # FIXME: It would be nice to make the `rooms` response more uniform regardless of # membership. Currently, we have to make all of these optional because # `invite`/`knock` rooms only have `stripped_state`. See # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 @@ -1010,6 +1211,136 @@ class SlidingSyncHandler: # state reset happened. Perhaps we should indicate this by setting `initial: # True` and empty `required_state`. + # TODO: Since we can't determine whether we've already sent a room down this + # Sliding Sync connection before (we plan to add this optimization in the + # future), we're always returning the requested room state instead of + # updates. + initial = True + + # Fetch the required state for the room + # + # No `required_state` for invite/knock rooms (just `stripped_state`) + # + # FIXME: It would be nice to make the `rooms` response more uniform regardless + # of membership. Currently, we have to make this optional because + # `invite`/`knock` rooms only have `stripped_state`. See + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 + room_state: Optional[StateMap[EventBase]] = None + if rooms_membership_for_user_at_to_token.membership not in ( + Membership.INVITE, + Membership.KNOCK, + ): + # Calculate the `StateFilter` based on the `required_state` for the room + state_filter: Optional[StateFilter] = StateFilter.none() + # If we have a double wildcard ("*", "*") in the `required_state`, we need + # to fetch all state for the room + # + # Note: MSC3575 describes different behavior to how we're handling things + # here but since it's not wrong to return more state than requested + # (`required_state` is just the minimum requested), it doesn't matter if we + # include more than client wanted. This complexity is also under scrutiny, + # see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1185109050 + # + # > One unique exception is when you request all state events via ["*", "*"]. When used, + # > all state events are returned by default, and additional entries FILTER OUT the returned set + # > of state events. These additional entries cannot use '*' themselves. + # > For example, ["*", "*"], ["m.room.member", "@alice:example.com"] will _exclude_ every m.room.member + # > event _except_ for @alice:example.com, and include every other state event. + # > In addition, ["*", "*"], ["m.space.child", "*"] is an error, the m.space.child filter is not + # > required as it would have been returned anyway. + # > + # > -- MSC3575 (https://github.com/matrix-org/matrix-spec-proposals/pull/3575) + if StateValues.WILDCARD in room_sync_config.required_state_map.get( + StateValues.WILDCARD, set() + ): + state_filter = StateFilter.all() + # TODO: `StateFilter` currently doesn't support wildcard event types. We're + # currently working around this by returning all state to the client but it + # would be nice to fetch less from the database and return just what the + # client wanted. + elif ( + room_sync_config.required_state_map.get(StateValues.WILDCARD) + is not None + ): + state_filter = StateFilter.all() + else: + required_state_types: List[Tuple[str, Optional[str]]] = [] + for ( + state_type, + state_key_set, + ) in room_sync_config.required_state_map.items(): + for state_key in state_key_set: + if state_key == StateValues.WILDCARD: + # `None` is a wildcard in the `StateFilter` + required_state_types.append((state_type, None)) + # We need to fetch all relevant people when we're lazy-loading membership + elif ( + state_type == EventTypes.Member + and state_key == StateValues.LAZY + ): + # Everyone in the timeline is relevant + timeline_membership: Set[str] = set() + if timeline_events is not None: + for timeline_event in timeline_events: + timeline_membership.add(timeline_event.sender) + + for user_id in timeline_membership: + required_state_types.append( + (EventTypes.Member, user_id) + ) + + # FIXME: We probably also care about invite, ban, kick, targets, etc + # but the spec only mentions "senders". + else: + required_state_types.append((state_type, state_key)) + + state_filter = StateFilter.from_types(required_state_types) + + # We can skip fetching state if we don't need any + if state_filter != StateFilter.none(): + # We can return all of the state that was requested if we're doing an + # initial sync + if initial: + # People shouldn't see past their leave/ban event + if rooms_membership_for_user_at_to_token.membership in ( + Membership.LEAVE, + Membership.BAN, + ): + room_state = await self.storage_controllers.state.get_state_at( + room_id, + stream_position=to_token.copy_and_replace( + StreamKeyType.ROOM, + rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token(), + ), + state_filter=state_filter, + # Partially-stated rooms should have all state events except for + # the membership events and since we've already excluded + # partially-stated rooms unless `required_state` only has + # `["m.room.member", "$LAZY"]` for membership, we should be able + # to retrieve everything requested. Plus we don't want to block + # the whole sync waiting for this one room. + await_full_state=False, + ) + # Otherwise, we can get the latest current state in the room + else: + room_state = await self.storage_controllers.state.get_current_state( + room_id, + state_filter, + # Partially-stated rooms should have all state events except for + # the membership events and since we've already excluded + # partially-stated rooms unless `required_state` only has + # `["m.room.member", "$LAZY"]` for membership, we should be able + # to retrieve everything requested. Plus we don't want to block + # the whole sync waiting for this one room. + await_full_state=False, + ) + # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token` + else: + # TODO: Once we can figure out if we've sent a room down this connection before, + # we can return updates instead of the full required state. + raise NotImplementedError() + return SlidingSyncResult.RoomResult( # TODO: Dummy value name=None, @@ -1017,20 +1348,16 @@ class SlidingSyncHandler: avatar=None, # TODO: Dummy value heroes=None, - # TODO: Since we can't determine whether we've already sent a room down this - # Sliding Sync connection before (we plan to add this optimization in the - # future), we're always returning the requested room state instead of - # updates. - initial=True, # TODO: Dummy value - required_state=[], + is_dm=False, + initial=initial, + required_state=list(room_state.values()) if room_state else None, timeline_events=timeline_events, bundled_aggregations=bundled_aggregations, - # TODO: Dummy value - is_dm=False, stripped_state=stripped_state, prev_batch=prev_batch_token, limited=limited, + num_live=num_live, # TODO: Dummy values joined_count=0, invited_count=0, @@ -1039,5 +1366,4 @@ class SlidingSyncHandler: # (encrypted rooms). notification_count=0, highlight_count=0, - num_live=num_live, ) |