summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync.py
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-07-15 04:37:10 -0500
committerGitHub <noreply@github.com>2024-07-15 10:37:10 +0100
commitab62aa09da4a3c4444d80a9d3a899c685f1bb798 (patch)
tree22e3c7044d52d9ba2efdf00fb87d470c00c0d4b8 /synapse/handlers/sliding_sync.py
parentAdd `is_dm` room field to Sliding Sync `/sync` (#17429) (diff)
downloadsynapse-ab62aa09da4a3c4444d80a9d3a899c685f1bb798.tar.xz
Add room subscriptions to Sliding Sync `/sync` (#17432)
Add room subscriptions to Sliding Sync `/sync`

Based on
[MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575):
Sliding Sync

Currently, you can only subscribe to rooms you have had *any* membership
in before.

In the future, we will allow `world_readable` rooms to be subscribed to
without joining.
Diffstat (limited to 'synapse/handlers/sliding_sync.py')
-rw-r--r--synapse/handlers/sliding_sync.py403
1 files changed, 288 insertions, 115 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 904787ced3..be98b379eb 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -62,32 +62,79 @@ DEFAULT_BUMP_EVENT_TYPES = {
 }
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+    """
+    Attributes:
+        room_id: The room ID of the membership event
+        event_id: The event ID of the membership event
+        event_pos: The stream position of the membership event
+        membership: The membership state of the user in the room
+        sender: The person who sent the membership event
+        newly_joined: Whether the user newly joined the room during the given token
+            range and is still joined to the room at the end of this range.
+        newly_left: Whether the user newly left (or kicked) the room during the given
+            token range and is still "leave" at the end of this range.
+        is_dm: Whether this user considers this room as a direct-message (DM) room
+    """
+
+    room_id: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    event_id: Optional[str]
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set because `current_state_delta_stream` will note the position that the reset
+    # happened.
+    event_pos: PersistedEventPosition
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set to `LEAVE` because we can make that assumption based on the situaton (see
+    # `get_current_state_delta_membership_changes_for_user(...)`)
+    membership: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    sender: Optional[str]
+    newly_joined: bool
+    newly_left: bool
+    is_dm: bool
+
+    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+        return attr.evolve(self, **kwds)
+
+
 def filter_membership_for_sync(
-    *, membership: str, user_id: str, sender: Optional[str]
+    *, user_id: str, room_membership_for_user: _RoomMembershipForUser
 ) -> bool:
     """
     Returns True if the membership event should be included in the sync response,
     otherwise False.
 
     Attributes:
-        membership: The membership state of the user in the room.
         user_id: The user ID that the membership applies to
-        sender: The person who sent the membership event
+        room_membership_for_user: Membership information for the user in the room
     """
 
-    # Everything except `Membership.LEAVE` because we want everything that's *still*
-    # relevant to the user. There are few more things to include in the sync response
-    # (newly_left) but those are handled separately.
+    membership = room_membership_for_user.membership
+    sender = room_membership_for_user.sender
+    newly_left = room_membership_for_user.newly_left
+
+    # We want to allow everything except rooms the user has left unless `newly_left`
+    # because we want everything that's *still* relevant to the user. We include
+    # `newly_left` rooms because the last event that the user should see is their own
+    # leave event.
     #
-    # This logic includes kicks (leave events where the sender is not the same user) and
-    # can be read as "anything that isn't a leave or a leave with a different sender".
+    # A leave != kick. This logic includes kicks (leave events where the sender is not
+    # the same user).
     #
-    # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
-    # happened that removed the user from the room, or the user was the last person
-    # locally to leave the room which caused the server to leave the room. In both
-    # cases, we can just remove the rooms since they are no longer relevant to the user.
-    # They could still be added back later if they are `newly_left`.
-    return membership != Membership.LEAVE or sender not in (user_id, None)
+    # When `sender=None`, it means that a state reset happened that removed the user
+    # from the room without a corresponding leave event. We can just remove the rooms
+    # since they are no longer relevant to the user but will still appear if they are
+    # `newly_left`.
+    return (
+        # Anything except leave events
+        membership != Membership.LEAVE
+        # Unless...
+        or newly_left
+        # Allow kicks
+        or (membership == Membership.LEAVE and sender not in (user_id, None))
+    )
 
 
 # We can't freeze this class because we want to update it in place with the
@@ -281,31 +328,6 @@ class StateValues:
     LAZY: Final = "$LAZY"
 
 
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _RoomMembershipForUser:
-    """
-    Attributes:
-        event_id: The event ID of the membership event
-        event_pos: The stream position of the membership event
-        membership: The membership state of the user in the room
-        sender: The person who sent the membership event
-        newly_joined: Whether the user newly joined the room during the given token
-            range
-        is_dm: Whether this user considers this room as a direct-message (DM) room
-    """
-
-    room_id: str
-    event_id: Optional[str]
-    event_pos: PersistedEventPosition
-    membership: str
-    sender: Optional[str]
-    newly_joined: bool
-    is_dm: bool
-
-    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
-        return attr.evolve(self, **kwds)
-
-
 class SlidingSyncHandler:
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
@@ -424,18 +446,31 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        # Get all of the room IDs that the user should be able to see in the sync
+        # response
+        has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
+        has_room_subscriptions = (
+            sync_config.room_subscriptions is not None
+            and len(sync_config.room_subscriptions) > 0
+        )
+        if has_lists or has_room_subscriptions:
+            room_membership_for_user_map = (
+                await self.get_room_membership_for_user_at_to_token(
+                    user=sync_config.user,
+                    to_token=to_token,
+                    from_token=from_token,
+                )
+            )
+
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         # Keep track of the rooms that we're going to display and need to fetch more
         # info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
-        if sync_config.lists:
-            # Get all of the room IDs that the user should be able to see in the sync
-            # response
-            sync_room_map = await self.get_sync_room_ids_for_user(
-                sync_config.user,
-                from_token=from_token,
-                to_token=to_token,
+        if has_lists and sync_config.lists is not None:
+            sync_room_map = await self.filter_rooms_relevant_for_sync(
+                user=sync_config.user,
+                room_membership_for_user_map=room_membership_for_user_map,
             )
 
             for list_key, list_config in sync_config.lists.items():
@@ -524,7 +559,35 @@ class SlidingSyncHandler:
                     ops=ops,
                 )
 
-        # TODO: if (sync_config.room_subscriptions):
+        # Handle room subscriptions
+        if has_room_subscriptions and sync_config.room_subscriptions is not None:
+            for room_id, room_subscription in sync_config.room_subscriptions.items():
+                room_membership_for_user_at_to_token = (
+                    await self.check_room_subscription_allowed_for_user(
+                        room_id=room_id,
+                        room_membership_for_user_map=room_membership_for_user_map,
+                        to_token=to_token,
+                    )
+                )
+
+                # Skip this room if the user isn't allowed to see it
+                if not room_membership_for_user_at_to_token:
+                    continue
+
+                room_membership_for_user_map[room_id] = (
+                    room_membership_for_user_at_to_token
+                )
+
+                # Take the superset of the `RoomSyncConfig` for each room.
+                #
+                # Update our `relevant_room_map` with the room we're going to display
+                # and need to fetch more info about.
+                room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
+                existing_room_sync_config = relevant_room_map.get(room_id)
+                if existing_room_sync_config is not None:
+                    existing_room_sync_config.combine_room_sync_config(room_sync_config)
+                else:
+                    relevant_room_map[room_id] = room_sync_config
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -533,7 +596,9 @@ class SlidingSyncHandler:
                 user=sync_config.user,
                 room_id=room_id,
                 room_sync_config=room_sync_config,
-                room_membership_for_user_at_to_token=sync_room_map[room_id],
+                room_membership_for_user_at_to_token=room_membership_for_user_map[
+                    room_id
+                ],
                 from_token=from_token,
                 to_token=to_token,
             )
@@ -551,28 +616,23 @@ class SlidingSyncHandler:
             extensions=extensions,
         )
 
-    async def get_sync_room_ids_for_user(
+    async def get_room_membership_for_user_at_to_token(
         self,
         user: UserID,
         to_token: StreamToken,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[StreamToken],
     ) -> Dict[str, _RoomMembershipForUser]:
         """
-        Fetch room IDs that should be listed for this user in the sync response (the
-        full room list that will be filtered, sorted, and sliced).
+        Fetch room IDs that the user has had membership in (the full room list including
+        long-lost left rooms that will be filtered, sorted, and sliced).
 
-        We're looking for rooms where the user has the following state in the token
-        range (> `from_token` and <= `to_token`):
+        We're looking for rooms where the user has had any sort of membership in the
+        token range (> `from_token` and <= `to_token`)
 
-        - `invite`, `join`, `knock`, `ban` membership events
-        - Kicks (`leave` membership events where `sender` is different from the
-          `user_id`/`state_key`)
-        - `newly_left` (rooms that were left during the given token range)
-        - In order for bans/kicks to not show up in sync, you need to `/forget` those
-          rooms. This doesn't modify the event itself though and only adds the
-          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
-          to tell when a room was forgotten at the moment so we can't factor it into the
-          from/to range.
+        In order for bans/kicks to not show up, you need to `/forget` those rooms. This
+        doesn't modify the event itself though and only adds the `forgotten` flag to the
+        `room_memberships` table in Synapse. There isn't a way to tell when a room was
+        forgotten at the moment so we can't factor it into the token range.
 
         Args:
             user: User to fetch rooms for
@@ -580,8 +640,8 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
 
         Returns:
-            A dictionary of room IDs that should be listed in the sync response along
-            with membership information in that room at the time of `to_token`.
+            A dictionary of room IDs that the user has had membership in along with
+            membership information in that room at the time of `to_token`.
         """
         user_id = user.to_string()
 
@@ -592,9 +652,6 @@ class SlidingSyncHandler:
             # We want to fetch any kind of membership (joined and left rooms) in order
             # to get the `event_pos` of the latest room membership event for the
             # user.
-            #
-            # We will filter out the rooms that don't belong below (see
-            # `filter_membership_for_sync`)
             membership_list=Membership.LIST,
             excluded_rooms=self.rooms_to_exclude_globally,
         )
@@ -614,7 +671,9 @@ class SlidingSyncHandler:
                 event_pos=room_for_user.event_pos,
                 membership=room_for_user.membership,
                 sender=room_for_user.sender,
+                # We will update these fields below to be accurate
                 newly_joined=False,
+                newly_left=False,
                 is_dm=False,
             )
             for room_for_user in room_for_user_list
@@ -653,12 +712,10 @@ class SlidingSyncHandler:
         # - 1a) Remove rooms that the user joined after the `to_token`
         # - 1b) Add back rooms that the user left after the `to_token`
         # - 1c) Update room membership events to the point in time of the `to_token`
-        # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
-        # - 3) Figure out which rooms are `newly_joined`
+        # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
+        # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
         # - 4) Figure out which rooms are DM's
 
-        # 1) -----------------------------------------------------
-
         # 1) Fetch membership changes that fall in the range from `to_token` up to
         # `membership_snapshot_token`
         #
@@ -717,7 +774,9 @@ class SlidingSyncHandler:
                         event_pos=first_membership_change_after_to_token.prev_event_pos,
                         membership=first_membership_change_after_to_token.prev_membership,
                         sender=first_membership_change_after_to_token.prev_sender,
+                        # We will update these fields below to be accurate
                         newly_joined=False,
+                        newly_left=False,
                         is_dm=False,
                     )
                 else:
@@ -726,22 +785,6 @@ class SlidingSyncHandler:
                     # exact membership state and shouldn't rely on the current snapshot.
                     sync_room_id_set.pop(room_id, None)
 
-        # Filter the rooms that that we have updated room membership events to the point
-        # in time of the `to_token` (from the "1)" fixups)
-        filtered_sync_room_id_set = {
-            room_id: room_membership_for_user
-            for room_id, room_membership_for_user in sync_room_id_set.items()
-            if filter_membership_for_sync(
-                membership=room_membership_for_user.membership,
-                user_id=user_id,
-                sender=room_membership_for_user.sender,
-            )
-        }
-
-        # 2) -----------------------------------------------------
-        # We fix-up newly_left rooms after the first fixup because it may have removed
-        # some left rooms that we can figure out are newly_left in the following code
-
         # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
         current_state_delta_membership_changes_in_from_to_range = []
         if from_token:
@@ -803,19 +846,40 @@ class SlidingSyncHandler:
             if last_membership_change_in_from_to_range.membership == Membership.JOIN:
                 possibly_newly_joined_room_ids.add(room_id)
 
-            # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
-            # include newly_left rooms because the last event that the user should see
-            # is their own leave event
+            # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
             if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
-                filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
-                    room_id=room_id,
-                    event_id=last_membership_change_in_from_to_range.event_id,
-                    event_pos=last_membership_change_in_from_to_range.event_pos,
-                    membership=last_membership_change_in_from_to_range.membership,
-                    sender=last_membership_change_in_from_to_range.sender,
-                    newly_joined=False,
-                    is_dm=False,
-                )
+                # 2) Mark this room as `newly_left`
+
+                # If we're seeing a membership change here, we should expect to already
+                # have it in our snapshot but if a state reset happens, it wouldn't have
+                # shown up in our snapshot but appear as a change here.
+                existing_sync_entry = sync_room_id_set.get(room_id)
+                if existing_sync_entry is not None:
+                    # Normal expected case
+                    sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
+                        newly_left=True
+                    )
+                else:
+                    # State reset!
+                    logger.warn(
+                        "State reset detected for room_id %s with %s who is no longer in the room",
+                        room_id,
+                        user_id,
+                    )
+                    # Even though a state reset happened which removed the person from
+                    # the room, we still add it the list so the user knows they left the
+                    # room. Downstream code can check for a state reset by looking for
+                    # `event_id=None and membership is not None`.
+                    sync_room_id_set[room_id] = _RoomMembershipForUser(
+                        room_id=room_id,
+                        event_id=last_membership_change_in_from_to_range.event_id,
+                        event_pos=last_membership_change_in_from_to_range.event_pos,
+                        membership=last_membership_change_in_from_to_range.membership,
+                        sender=last_membership_change_in_from_to_range.sender,
+                        newly_joined=False,
+                        newly_left=True,
+                        is_dm=False,
+                    )
 
         # 3) Figure out `newly_joined`
         for room_id in possibly_newly_joined_room_ids:
@@ -826,9 +890,9 @@ class SlidingSyncHandler:
             # also some non-join in the range, we know they `newly_joined`.
             if has_non_join_in_from_to_range:
                 # We found a `newly_joined` room (we left and joined within the token range)
-                filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
-                    room_id
-                ].copy_and_replace(newly_joined=True)
+                sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                    newly_joined=True
+                )
             else:
                 prev_event_id = first_membership_change_by_room_id_in_from_to_range[
                     room_id
@@ -840,7 +904,7 @@ class SlidingSyncHandler:
                 if prev_event_id is None:
                     # We found a `newly_joined` room (we are joining the room for the
                     # first time within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
                 # Last resort, we need to step back to the previous membership event
@@ -848,7 +912,7 @@ class SlidingSyncHandler:
                 elif prev_membership != Membership.JOIN:
                     # We found a `newly_joined` room (we left before the token range
                     # and joined within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
 
@@ -876,12 +940,122 @@ class SlidingSyncHandler:
                             dm_room_id_set.add(room_id)
 
         # 4) Fixup
-        for room_id in filtered_sync_room_id_set:
-            filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
-                room_id
-            ].copy_and_replace(is_dm=room_id in dm_room_id_set)
+        for room_id in sync_room_id_set:
+            sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                is_dm=room_id in dm_room_id_set
+            )
+
+        return sync_room_id_set
+
+    async def filter_rooms_relevant_for_sync(
+        self,
+        user: UserID,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+    ) -> Dict[str, _RoomMembershipForUser]:
+        """
+        Filter room IDs that should/can be listed for this user in the sync response (the
+        full room list that will be further filtered, sorted, and sliced).
+
+        We're looking for rooms where the user has the following state in the token
+        range (> `from_token` and <= `to_token`):
+
+        - `invite`, `join`, `knock`, `ban` membership events
+        - Kicks (`leave` membership events where `sender` is different from the
+          `user_id`/`state_key`)
+        - `newly_left` (rooms that were left during the given token range)
+        - In order for bans/kicks to not show up in sync, you need to `/forget` those
+          rooms. This doesn't modify the event itself though and only adds the
+          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
+          to tell when a room was forgotten at the moment so we can't factor it into the
+          from/to range.
+
+        Args:
+            user: User that is syncing
+            room_membership_for_user_map: Room membership for the user
+
+        Returns:
+            A dictionary of room IDs that should be listed in the sync response along
+            with membership information in that room at the time of `to_token`.
+        """
+        user_id = user.to_string()
+
+        # Filter rooms to only what we're interested to sync with
+        filtered_sync_room_map = {
+            room_id: room_membership_for_user
+            for room_id, room_membership_for_user in room_membership_for_user_map.items()
+            if filter_membership_for_sync(
+                user_id=user_id,
+                room_membership_for_user=room_membership_for_user,
+            )
+        }
+
+        return filtered_sync_room_map
+
+    async def check_room_subscription_allowed_for_user(
+        self,
+        room_id: str,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+        to_token: StreamToken,
+    ) -> Optional[_RoomMembershipForUser]:
+        """
+        Check whether the user is allowed to see the room based on whether they have
+        ever had membership in the room or if the room is `world_readable`.
 
-        return filtered_sync_room_id_set
+        Similar to `check_user_in_room_or_world_readable(...)`
+
+        Args:
+            room_id: Room to check
+            room_membership_for_user_map: Room membership for the user at the time of
+                the `to_token` (<= `to_token`).
+            to_token: The token to fetch rooms up to.
+
+        Returns:
+            The room membership for the user if they are allowed to subscribe to the
+            room else `None`.
+        """
+
+        # We can first check if they are already allowed to see the room based
+        # on our previous work to assemble the `room_membership_for_user_map`.
+        #
+        # If they have had any membership in the room over time (up to the `to_token`),
+        # let them subscribe and see what they can.
+        existing_membership_for_user = room_membership_for_user_map.get(room_id)
+        if existing_membership_for_user is not None:
+            return existing_membership_for_user
+
+        # TODO: Handle `world_readable` rooms
+        return None
+
+        # If the room is `world_readable`, it doesn't matter whether they can join,
+        # everyone can see the room.
+        # not_in_room_membership_for_user = _RoomMembershipForUser(
+        #     room_id=room_id,
+        #     event_id=None,
+        #     event_pos=None,
+        #     membership=None,
+        #     sender=None,
+        #     newly_joined=False,
+        #     newly_left=False,
+        #     is_dm=False,
+        # )
+        # room_state = await self.get_current_state_at(
+        #     room_id=room_id,
+        #     room_membership_for_user_at_to_token=not_in_room_membership_for_user,
+        #     state_filter=StateFilter.from_types(
+        #         [(EventTypes.RoomHistoryVisibility, "")]
+        #     ),
+        #     to_token=to_token,
+        # )
+
+        # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, ""))
+        # if (
+        #     visibility_event is not None
+        #     and visibility_event.content.get("history_visibility")
+        #     == HistoryVisibility.WORLD_READABLE
+        # ):
+        #     return not_in_room_membership_for_user
+
+        # return None
 
     async def filter_rooms(
         self,
@@ -1081,7 +1255,6 @@ class SlidingSyncHandler:
                 in the room at the time of `to_token`.
             to_token: The point in the stream to sync up to.
         """
-
         room_state_ids: StateMap[str]
         # People shouldn't see past their leave/ban event
         if room_membership_for_user_at_to_token.membership in (
@@ -1349,10 +1522,10 @@ class SlidingSyncHandler:
             stripped_state.append(strip_event(invite_or_knock_event))
 
         # TODO: Handle state resets. For example, if we see
-        # `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
-        # `required_state` doesn't include it, we should indicate to the client that a
-        # state reset happened. Perhaps we should indicate this by setting `initial:
-        # True` and empty `required_state`.
+        # `room_membership_for_user_at_to_token.event_id=None and
+        # room_membership_for_user_at_to_token.membership is not None`, we should
+        # indicate to the client that a state reset happened. Perhaps we should indicate
+        # this by setting `initial: True` and empty `required_state`.
 
         # TODO: Since we can't determine whether we've already sent a room down this
         # Sliding Sync connection before (we plan to add this optimization in the