summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sliding_sync.py')
-rw-r--r--synapse/handlers/sliding_sync.py432
1 files changed, 379 insertions, 53 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 0cebeea592..a1ddac903e 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,7 +18,7 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple
 
 import attr
 from immutabledict import immutabledict
@@ -39,6 +39,7 @@ from synapse.types import (
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
+    StateMap,
     StreamKeyType,
     StreamToken,
     UserID,
@@ -90,14 +91,186 @@ class RoomSyncConfig:
 
     Attributes:
         timeline_limit: The maximum number of events to return in the timeline.
-        required_state: The set of state events requested for the room. The
-            values are close to `StateKey` but actually use a syntax where you can
-            provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part
-            of the tuple (type, state_key).
+
+        required_state_map: Map from state event type to state_keys requested for the
+            room. The values are close to `StateKey` but actually use a syntax where you
+            can provide `*` wildcard and `$LAZY` for lazy-loading room members.
     """
 
     timeline_limit: int
-    required_state: Set[Tuple[str, str]]
+    required_state_map: Dict[str, Set[str]]
+
+    @classmethod
+    def from_room_config(
+        cls,
+        room_params: SlidingSyncConfig.CommonRoomParameters,
+    ) -> "RoomSyncConfig":
+        """
+        Create a `RoomSyncConfig` from a `SlidingSyncList`/`RoomSubscription` config.
+
+        Args:
+            room_params: `SlidingSyncConfig.SlidingSyncList` or `SlidingSyncConfig.RoomSubscription`
+        """
+        required_state_map: Dict[str, Set[str]] = {}
+        for (
+            state_type,
+            state_key,
+        ) in room_params.required_state:
+            # If we already have a wildcard for this specific `state_key`, we don't need
+            # to add it since the wildcard already covers it.
+            if state_key in required_state_map.get(StateValues.WILDCARD, set()):
+                continue
+
+            # If we already have a wildcard `state_key` for this `state_type`, we don't need
+            # to add anything else
+            if StateValues.WILDCARD in required_state_map.get(state_type, set()):
+                continue
+
+            # If we're getting wildcards for the `state_type` and `state_key`, that's
+            # all that matters so get rid of any other entries
+            if state_type == StateValues.WILDCARD and state_key == StateValues.WILDCARD:
+                required_state_map = {StateValues.WILDCARD: {StateValues.WILDCARD}}
+                # We can break, since we don't need to add anything else
+                break
+
+            # If we're getting a wildcard for the `state_type`, get rid of any other
+            # entries with the same `state_key`, since the wildcard will cover it already.
+            elif state_type == StateValues.WILDCARD:
+                # Get rid of any entries that match the `state_key`
+                #
+                # Make a copy so we don't run into an error: `dictionary changed size
+                # during iteration`, when we remove items
+                for (
+                    existing_state_type,
+                    existing_state_key_set,
+                ) in list(required_state_map.items()):
+                    # Make a copy so we don't run into an error: `Set changed size during
+                    # iteration`, when we filter out and remove items
+                    for existing_state_key in existing_state_key_set.copy():
+                        if existing_state_key == state_key:
+                            existing_state_key_set.remove(state_key)
+
+                    # If we've the left the `set()` empty, remove it from the map
+                    if existing_state_key_set == set():
+                        required_state_map.pop(existing_state_type, None)
+
+            # If we're getting a wildcard `state_key`, get rid of any other state_keys
+            # for this `state_type` since the wildcard will cover it already.
+            if state_key == StateValues.WILDCARD:
+                required_state_map[state_type] = {state_key}
+            # Otherwise, just add it to the set
+            else:
+                if required_state_map.get(state_type) is None:
+                    required_state_map[state_type] = {state_key}
+                else:
+                    required_state_map[state_type].add(state_key)
+
+        return cls(
+            timeline_limit=room_params.timeline_limit,
+            required_state_map=required_state_map,
+        )
+
+    def deep_copy(self) -> "RoomSyncConfig":
+        required_state_map: Dict[str, Set[str]] = {
+            state_type: state_key_set.copy()
+            for state_type, state_key_set in self.required_state_map.items()
+        }
+
+        return RoomSyncConfig(
+            timeline_limit=self.timeline_limit,
+            required_state_map=required_state_map,
+        )
+
+    def combine_room_sync_config(
+        self, other_room_sync_config: "RoomSyncConfig"
+    ) -> None:
+        """
+        Combine this `RoomSyncConfig` with another `RoomSyncConfig` and take the
+        superset union of the two.
+        """
+        # Take the highest timeline limit
+        if self.timeline_limit < other_room_sync_config.timeline_limit:
+            self.timeline_limit = other_room_sync_config.timeline_limit
+
+        # Union the required state
+        for (
+            state_type,
+            state_key_set,
+        ) in other_room_sync_config.required_state_map.items():
+            # If we already have a wildcard for everything, we don't need to add
+            # anything else
+            if StateValues.WILDCARD in self.required_state_map.get(
+                StateValues.WILDCARD, set()
+            ):
+                break
+
+            # If we already have a wildcard `state_key` for this `state_type`, we don't need
+            # to add anything else
+            if StateValues.WILDCARD in self.required_state_map.get(state_type, set()):
+                continue
+
+            # If we're getting wildcards for the `state_type` and `state_key`, that's
+            # all that matters so get rid of any other entries
+            if (
+                state_type == StateValues.WILDCARD
+                and StateValues.WILDCARD in state_key_set
+            ):
+                self.required_state_map = {state_type: {StateValues.WILDCARD}}
+                # We can break, since we don't need to add anything else
+                break
+
+            for state_key in state_key_set:
+                # If we already have a wildcard for this specific `state_key`, we don't need
+                # to add it since the wildcard already covers it.
+                if state_key in self.required_state_map.get(
+                    StateValues.WILDCARD, set()
+                ):
+                    continue
+
+                # If we're getting a wildcard for the `state_type`, get rid of any other
+                # entries with the same `state_key`, since the wildcard will cover it already.
+                if state_type == StateValues.WILDCARD:
+                    # Get rid of any entries that match the `state_key`
+                    #
+                    # Make a copy so we don't run into an error: `dictionary changed size
+                    # during iteration`, when we remove items
+                    for existing_state_type, existing_state_key_set in list(
+                        self.required_state_map.items()
+                    ):
+                        # Make a copy so we don't run into an error: `Set changed size during
+                        # iteration`, when we filter out and remove items
+                        for existing_state_key in existing_state_key_set.copy():
+                            if existing_state_key == state_key:
+                                existing_state_key_set.remove(state_key)
+
+                        # If we've the left the `set()` empty, remove it from the map
+                        if existing_state_key_set == set():
+                            self.required_state_map.pop(existing_state_type, None)
+
+                # If we're getting a wildcard `state_key`, get rid of any other state_keys
+                # for this `state_type` since the wildcard will cover it already.
+                if state_key == StateValues.WILDCARD:
+                    self.required_state_map[state_type] = {state_key}
+                    break
+                # Otherwise, just add it to the set
+                else:
+                    if self.required_state_map.get(state_type) is None:
+                        self.required_state_map[state_type] = {state_key}
+                    else:
+                        self.required_state_map[state_type].add(state_key)
+
+
+class StateValues:
+    """
+    Understood values of the (type, state_key) tuple in `required_state`.
+    """
+
+    # Include all state events of the given type
+    WILDCARD: Final = "*"
+    # Lazy-load room membership events (include room membership events for any event
+    # `sender` in the timeline). We only give special meaning to this value when it's a
+    # `state_key`.
+    LAZY: Final = "$LAZY"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -242,6 +415,8 @@ class SlidingSyncHandler:
 
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
+        # Keep track of the rooms that we're going to display and need to fetch more
+        # info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
         if sync_config.lists:
             # Get all of the room IDs that the user should be able to see in the sync
@@ -260,49 +435,76 @@ class SlidingSyncHandler:
                         sync_config.user, sync_room_map, list_config.filters, to_token
                     )
 
+                # Sort the list
                 sorted_room_info = await self.sort_rooms(
                     filtered_sync_room_map, to_token
                 )
 
+                # Find which rooms are partially stated and may need to be filtered out
+                # depending on the `required_state` requested (see below).
+                partial_state_room_map = await self.store.is_partial_state_room_batched(
+                    filtered_sync_room_map.keys()
+                )
+
+                # Since creating the `RoomSyncConfig` takes some work, let's just do it
+                # once and make a copy whenever we need it.
+                room_sync_config = RoomSyncConfig.from_room_config(list_config)
+                membership_state_keys = room_sync_config.required_state_map.get(
+                    EventTypes.Member
+                )
+                lazy_loading = (
+                    membership_state_keys is not None
+                    and len(membership_state_keys) == 1
+                    and StateValues.LAZY in membership_state_keys
+                )
+
                 ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
                 if list_config.ranges:
                     for range in list_config.ranges:
-                        sliced_room_ids = [
-                            room_id
-                            # Both sides of range are inclusive
-                            for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
-                        ]
+                        room_ids_in_list: List[str] = []
+
+                        # We're going to loop through the sorted list of rooms starting
+                        # at the range start index and keep adding rooms until we fill
+                        # up the range or run out of rooms.
+                        #
+                        # Both sides of range are inclusive so we `+ 1`
+                        max_num_rooms = range[1] - range[0] + 1
+                        for room_id, _ in sorted_room_info[range[0] :]:
+                            if len(room_ids_in_list) >= max_num_rooms:
+                                break
+
+                            # Exclude partially-stated rooms unless the `required_state`
+                            # only has `["m.room.member", "$LAZY"]` for membership
+                            # (lazy-loading room members).
+                            if partial_state_room_map.get(room_id) and not lazy_loading:
+                                continue
+
+                            # Take the superset of the `RoomSyncConfig` for each room.
+                            #
+                            # Update our `relevant_room_map` with the room we're going
+                            # to display and need to fetch more info about.
+                            existing_room_sync_config = relevant_room_map.get(room_id)
+                            if existing_room_sync_config is not None:
+                                existing_room_sync_config.combine_room_sync_config(
+                                    room_sync_config
+                                )
+                            else:
+                                # Make a copy so if we modify it later, it doesn't
+                                # affect all references.
+                                relevant_room_map[room_id] = (
+                                    room_sync_config.deep_copy()
+                                )
+
+                            room_ids_in_list.append(room_id)
 
                         ops.append(
                             SlidingSyncResult.SlidingWindowList.Operation(
                                 op=OperationType.SYNC,
                                 range=range,
-                                room_ids=sliced_room_ids,
+                                room_ids=room_ids_in_list,
                             )
                         )
 
-                        # Take the superset of the `RoomSyncConfig` for each room
-                        for room_id in sliced_room_ids:
-                            if relevant_room_map.get(room_id) is not None:
-                                # Take the highest timeline limit
-                                if (
-                                    relevant_room_map[room_id].timeline_limit
-                                    < list_config.timeline_limit
-                                ):
-                                    relevant_room_map[room_id].timeline_limit = (
-                                        list_config.timeline_limit
-                                    )
-
-                                # Union the required state
-                                relevant_room_map[room_id].required_state.update(
-                                    list_config.required_state
-                                )
-                            else:
-                                relevant_room_map[room_id] = RoomSyncConfig(
-                                    timeline_limit=list_config.timeline_limit,
-                                    required_state=set(list_config.required_state),
-                                )
-
                 lists[list_key] = SlidingSyncResult.SlidingWindowList(
                     count=len(sorted_room_info),
                     ops=ops,
@@ -651,9 +853,6 @@ class SlidingSyncHandler:
         user_id = user.to_string()
 
         # TODO: Apply filters
-        #
-        # TODO: Exclude partially stated rooms unless the `required_state` has
-        # `["m.room.member", "$LAZY"]`
 
         filtered_room_id_set = set(sync_room_map.keys())
 
@@ -694,16 +893,18 @@ class SlidingSyncHandler:
         if filters.is_encrypted is not None:
             # Make a copy so we don't run into an error: `Set changed size during
             # iteration`, when we filter out and remove items
-            for room_id in list(filtered_room_id_set):
+            for room_id in filtered_room_id_set.copy():
                 state_at_to_token = await self.storage_controllers.state.get_state_at(
                     room_id,
                     to_token,
                     state_filter=StateFilter.from_types(
                         [(EventTypes.RoomEncryption, "")]
                     ),
-                    # Partially stated rooms should have all state events except for the
-                    # membership events so we don't need to wait. Plus we don't want to
-                    # block the whole sync waiting for this one room.
+                    # Partially-stated rooms should have all state events except for the
+                    # membership events so we don't need to wait because we only care
+                    # about retrieving the `EventTypes.RoomEncryption` state event here.
+                    # Plus we don't want to block the whole sync waiting for this one
+                    # room.
                     await_full_state=False,
                 )
                 is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, ""))
@@ -719,7 +920,7 @@ class SlidingSyncHandler:
         if filters.is_invite is not None:
             # Make a copy so we don't run into an error: `Set changed size during
             # iteration`, when we filter out and remove items
-            for room_id in list(filtered_room_id_set):
+            for room_id in filtered_room_id_set.copy():
                 room_for_user = sync_room_map[room_id]
                 # If we're looking for invite rooms, filter out rooms that the user is
                 # not invited to and vice versa
@@ -737,7 +938,7 @@ class SlidingSyncHandler:
         if filters.room_types is not None or filters.not_room_types is not None:
             # Make a copy so we don't run into an error: `Set changed size during
             # iteration`, when we filter out and remove items
-            for room_id in list(filtered_room_id_set):
+            for room_id in filtered_room_id_set.copy():
                 create_event = await self.store.get_create_event_for_room(room_id)
                 room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
                 if (
@@ -843,7 +1044,7 @@ class SlidingSyncHandler:
 
         # Assemble the list of timeline events
         #
-        # It would be nice to make the `rooms` response more uniform regardless of
+        # FIXME: It would be nice to make the `rooms` response more uniform regardless of
         # membership. Currently, we have to make all of these optional because
         # `invite`/`knock` rooms only have `stripped_state`. See
         # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
@@ -1010,6 +1211,136 @@ class SlidingSyncHandler:
         # state reset happened. Perhaps we should indicate this by setting `initial:
         # True` and empty `required_state`.
 
+        # TODO: Since we can't determine whether we've already sent a room down this
+        # Sliding Sync connection before (we plan to add this optimization in the
+        # future), we're always returning the requested room state instead of
+        # updates.
+        initial = True
+
+        # Fetch the required state for the room
+        #
+        # No `required_state` for invite/knock rooms (just `stripped_state`)
+        #
+        # FIXME: It would be nice to make the `rooms` response more uniform regardless
+        # of membership. Currently, we have to make this optional because
+        # `invite`/`knock` rooms only have `stripped_state`. See
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
+        room_state: Optional[StateMap[EventBase]] = None
+        if rooms_membership_for_user_at_to_token.membership not in (
+            Membership.INVITE,
+            Membership.KNOCK,
+        ):
+            # Calculate the `StateFilter` based on the `required_state` for the room
+            state_filter: Optional[StateFilter] = StateFilter.none()
+            # If we have a double wildcard ("*", "*") in the `required_state`, we need
+            # to fetch all state for the room
+            #
+            # Note: MSC3575 describes different behavior to how we're handling things
+            # here but since it's not wrong to return more state than requested
+            # (`required_state` is just the minimum requested), it doesn't matter if we
+            # include more than client wanted. This complexity is also under scrutiny,
+            # see
+            # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1185109050
+            #
+            # > One unique exception is when you request all state events via ["*", "*"]. When used,
+            # > all state events are returned by default, and additional entries FILTER OUT the returned set
+            # > of state events. These additional entries cannot use '*' themselves.
+            # > For example, ["*", "*"], ["m.room.member", "@alice:example.com"] will _exclude_ every m.room.member
+            # > event _except_ for @alice:example.com, and include every other state event.
+            # > In addition, ["*", "*"], ["m.space.child", "*"] is an error, the m.space.child filter is not
+            # > required as it would have been returned anyway.
+            # >
+            # > -- MSC3575 (https://github.com/matrix-org/matrix-spec-proposals/pull/3575)
+            if StateValues.WILDCARD in room_sync_config.required_state_map.get(
+                StateValues.WILDCARD, set()
+            ):
+                state_filter = StateFilter.all()
+            # TODO: `StateFilter` currently doesn't support wildcard event types. We're
+            # currently working around this by returning all state to the client but it
+            # would be nice to fetch less from the database and return just what the
+            # client wanted.
+            elif (
+                room_sync_config.required_state_map.get(StateValues.WILDCARD)
+                is not None
+            ):
+                state_filter = StateFilter.all()
+            else:
+                required_state_types: List[Tuple[str, Optional[str]]] = []
+                for (
+                    state_type,
+                    state_key_set,
+                ) in room_sync_config.required_state_map.items():
+                    for state_key in state_key_set:
+                        if state_key == StateValues.WILDCARD:
+                            # `None` is a wildcard in the `StateFilter`
+                            required_state_types.append((state_type, None))
+                        # We need to fetch all relevant people when we're lazy-loading membership
+                        elif (
+                            state_type == EventTypes.Member
+                            and state_key == StateValues.LAZY
+                        ):
+                            # Everyone in the timeline is relevant
+                            timeline_membership: Set[str] = set()
+                            if timeline_events is not None:
+                                for timeline_event in timeline_events:
+                                    timeline_membership.add(timeline_event.sender)
+
+                            for user_id in timeline_membership:
+                                required_state_types.append(
+                                    (EventTypes.Member, user_id)
+                                )
+
+                            # FIXME: We probably also care about invite, ban, kick, targets, etc
+                            # but the spec only mentions "senders".
+                        else:
+                            required_state_types.append((state_type, state_key))
+
+                state_filter = StateFilter.from_types(required_state_types)
+
+            # We can skip fetching state if we don't need any
+            if state_filter != StateFilter.none():
+                # We can return all of the state that was requested if we're doing an
+                # initial sync
+                if initial:
+                    # People shouldn't see past their leave/ban event
+                    if rooms_membership_for_user_at_to_token.membership in (
+                        Membership.LEAVE,
+                        Membership.BAN,
+                    ):
+                        room_state = await self.storage_controllers.state.get_state_at(
+                            room_id,
+                            stream_position=to_token.copy_and_replace(
+                                StreamKeyType.ROOM,
+                                rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
+                            ),
+                            state_filter=state_filter,
+                            # Partially-stated rooms should have all state events except for
+                            # the membership events and since we've already excluded
+                            # partially-stated rooms unless `required_state` only has
+                            # `["m.room.member", "$LAZY"]` for membership, we should be able
+                            # to retrieve everything requested. Plus we don't want to block
+                            # the whole sync waiting for this one room.
+                            await_full_state=False,
+                        )
+                    # Otherwise, we can get the latest current state in the room
+                    else:
+                        room_state = await self.storage_controllers.state.get_current_state(
+                            room_id,
+                            state_filter,
+                            # Partially-stated rooms should have all state events except for
+                            # the membership events and since we've already excluded
+                            # partially-stated rooms unless `required_state` only has
+                            # `["m.room.member", "$LAZY"]` for membership, we should be able
+                            # to retrieve everything requested. Plus we don't want to block
+                            # the whole sync waiting for this one room.
+                            await_full_state=False,
+                        )
+                        # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
+                else:
+                    # TODO: Once we can figure out if we've sent a room down this connection before,
+                    # we can return updates instead of the full required state.
+                    raise NotImplementedError()
+
         return SlidingSyncResult.RoomResult(
             # TODO: Dummy value
             name=None,
@@ -1017,20 +1348,16 @@ class SlidingSyncHandler:
             avatar=None,
             # TODO: Dummy value
             heroes=None,
-            # TODO: Since we can't determine whether we've already sent a room down this
-            # Sliding Sync connection before (we plan to add this optimization in the
-            # future), we're always returning the requested room state instead of
-            # updates.
-            initial=True,
             # TODO: Dummy value
-            required_state=[],
+            is_dm=False,
+            initial=initial,
+            required_state=list(room_state.values()) if room_state else None,
             timeline_events=timeline_events,
             bundled_aggregations=bundled_aggregations,
-            # TODO: Dummy value
-            is_dm=False,
             stripped_state=stripped_state,
             prev_batch=prev_batch_token,
             limited=limited,
+            num_live=num_live,
             # TODO: Dummy values
             joined_count=0,
             invited_count=0,
@@ -1039,5 +1366,4 @@ class SlidingSyncHandler:
             # (encrypted rooms).
             notification_count=0,
             highlight_count=0,
-            num_live=num_live,
         )