diff options
author | Eric Eastwood <eric.eastwood@beta.gouv.fr> | 2024-07-15 04:37:10 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-07-15 10:37:10 +0100 |
commit | ab62aa09da4a3c4444d80a9d3a899c685f1bb798 (patch) | |
tree | 22e3c7044d52d9ba2efdf00fb87d470c00c0d4b8 /synapse | |
parent | Add `is_dm` room field to Sliding Sync `/sync` (#17429) (diff) | |
download | synapse-ab62aa09da4a3c4444d80a9d3a899c685f1bb798.tar.xz |
Add room subscriptions to Sliding Sync `/sync` (#17432)
Add room subscriptions to Sliding Sync `/sync` Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync Currently, you can only subscribe to rooms you have had *any* membership in before. In the future, we will allow `world_readable` rooms to be subscribed to without joining.
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/sliding_sync.py | 403 |
1 files changed, 288 insertions, 115 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 904787ced3..be98b379eb 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -62,32 +62,79 @@ DEFAULT_BUMP_EVENT_TYPES = { } +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _RoomMembershipForUser: + """ + Attributes: + room_id: The room ID of the membership event + event_id: The event ID of the membership event + event_pos: The stream position of the membership event + membership: The membership state of the user in the room + sender: The person who sent the membership event + newly_joined: Whether the user newly joined the room during the given token + range and is still joined to the room at the end of this range. + newly_left: Whether the user newly left (or kicked) the room during the given + token range and is still "leave" at the end of this range. + is_dm: Whether this user considers this room as a direct-message (DM) room + """ + + room_id: str + # Optional because state resets can affect room membership without a corresponding event. + event_id: Optional[str] + # Even during a state reset which removes the user from the room, we expect this to + # be set because `current_state_delta_stream` will note the position that the reset + # happened. + event_pos: PersistedEventPosition + # Even during a state reset which removes the user from the room, we expect this to + # be set to `LEAVE` because we can make that assumption based on the situaton (see + # `get_current_state_delta_membership_changes_for_user(...)`) + membership: str + # Optional because state resets can affect room membership without a corresponding event. + sender: Optional[str] + newly_joined: bool + newly_left: bool + is_dm: bool + + def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": + return attr.evolve(self, **kwds) + + def filter_membership_for_sync( - *, membership: str, user_id: str, sender: Optional[str] + *, user_id: str, room_membership_for_user: _RoomMembershipForUser ) -> bool: """ Returns True if the membership event should be included in the sync response, otherwise False. Attributes: - membership: The membership state of the user in the room. user_id: The user ID that the membership applies to - sender: The person who sent the membership event + room_membership_for_user: Membership information for the user in the room """ - # Everything except `Membership.LEAVE` because we want everything that's *still* - # relevant to the user. There are few more things to include in the sync response - # (newly_left) but those are handled separately. + membership = room_membership_for_user.membership + sender = room_membership_for_user.sender + newly_left = room_membership_for_user.newly_left + + # We want to allow everything except rooms the user has left unless `newly_left` + # because we want everything that's *still* relevant to the user. We include + # `newly_left` rooms because the last event that the user should see is their own + # leave event. # - # This logic includes kicks (leave events where the sender is not the same user) and - # can be read as "anything that isn't a leave or a leave with a different sender". + # A leave != kick. This logic includes kicks (leave events where the sender is not + # the same user). # - # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset - # happened that removed the user from the room, or the user was the last person - # locally to leave the room which caused the server to leave the room. In both - # cases, we can just remove the rooms since they are no longer relevant to the user. - # They could still be added back later if they are `newly_left`. - return membership != Membership.LEAVE or sender not in (user_id, None) + # When `sender=None`, it means that a state reset happened that removed the user + # from the room without a corresponding leave event. We can just remove the rooms + # since they are no longer relevant to the user but will still appear if they are + # `newly_left`. + return ( + # Anything except leave events + membership != Membership.LEAVE + # Unless... + or newly_left + # Allow kicks + or (membership == Membership.LEAVE and sender not in (user_id, None)) + ) # We can't freeze this class because we want to update it in place with the @@ -281,31 +328,6 @@ class StateValues: LAZY: Final = "$LAZY" -@attr.s(slots=True, frozen=True, auto_attribs=True) -class _RoomMembershipForUser: - """ - Attributes: - event_id: The event ID of the membership event - event_pos: The stream position of the membership event - membership: The membership state of the user in the room - sender: The person who sent the membership event - newly_joined: Whether the user newly joined the room during the given token - range - is_dm: Whether this user considers this room as a direct-message (DM) room - """ - - room_id: str - event_id: Optional[str] - event_pos: PersistedEventPosition - membership: str - sender: Optional[str] - newly_joined: bool - is_dm: bool - - def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser": - return attr.evolve(self, **kwds) - - class SlidingSyncHandler: def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() @@ -424,18 +446,31 @@ class SlidingSyncHandler: # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() + # Get all of the room IDs that the user should be able to see in the sync + # response + has_lists = sync_config.lists is not None and len(sync_config.lists) > 0 + has_room_subscriptions = ( + sync_config.room_subscriptions is not None + and len(sync_config.room_subscriptions) > 0 + ) + if has_lists or has_room_subscriptions: + room_membership_for_user_map = ( + await self.get_room_membership_for_user_at_to_token( + user=sync_config.user, + to_token=to_token, + from_token=from_token, + ) + ) + # Assemble sliding window lists lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {} # Keep track of the rooms that we're going to display and need to fetch more # info about relevant_room_map: Dict[str, RoomSyncConfig] = {} - if sync_config.lists: - # Get all of the room IDs that the user should be able to see in the sync - # response - sync_room_map = await self.get_sync_room_ids_for_user( - sync_config.user, - from_token=from_token, - to_token=to_token, + if has_lists and sync_config.lists is not None: + sync_room_map = await self.filter_rooms_relevant_for_sync( + user=sync_config.user, + room_membership_for_user_map=room_membership_for_user_map, ) for list_key, list_config in sync_config.lists.items(): @@ -524,7 +559,35 @@ class SlidingSyncHandler: ops=ops, ) - # TODO: if (sync_config.room_subscriptions): + # Handle room subscriptions + if has_room_subscriptions and sync_config.room_subscriptions is not None: + for room_id, room_subscription in sync_config.room_subscriptions.items(): + room_membership_for_user_at_to_token = ( + await self.check_room_subscription_allowed_for_user( + room_id=room_id, + room_membership_for_user_map=room_membership_for_user_map, + to_token=to_token, + ) + ) + + # Skip this room if the user isn't allowed to see it + if not room_membership_for_user_at_to_token: + continue + + room_membership_for_user_map[room_id] = ( + room_membership_for_user_at_to_token + ) + + # Take the superset of the `RoomSyncConfig` for each room. + # + # Update our `relevant_room_map` with the room we're going to display + # and need to fetch more info about. + room_sync_config = RoomSyncConfig.from_room_config(room_subscription) + existing_room_sync_config = relevant_room_map.get(room_id) + if existing_room_sync_config is not None: + existing_room_sync_config.combine_room_sync_config(room_sync_config) + else: + relevant_room_map[room_id] = room_sync_config # Fetch room data rooms: Dict[str, SlidingSyncResult.RoomResult] = {} @@ -533,7 +596,9 @@ class SlidingSyncHandler: user=sync_config.user, room_id=room_id, room_sync_config=room_sync_config, - room_membership_for_user_at_to_token=sync_room_map[room_id], + room_membership_for_user_at_to_token=room_membership_for_user_map[ + room_id + ], from_token=from_token, to_token=to_token, ) @@ -551,28 +616,23 @@ class SlidingSyncHandler: extensions=extensions, ) - async def get_sync_room_ids_for_user( + async def get_room_membership_for_user_at_to_token( self, user: UserID, to_token: StreamToken, - from_token: Optional[StreamToken] = None, + from_token: Optional[StreamToken], ) -> Dict[str, _RoomMembershipForUser]: """ - Fetch room IDs that should be listed for this user in the sync response (the - full room list that will be filtered, sorted, and sliced). + Fetch room IDs that the user has had membership in (the full room list including + long-lost left rooms that will be filtered, sorted, and sliced). - We're looking for rooms where the user has the following state in the token - range (> `from_token` and <= `to_token`): + We're looking for rooms where the user has had any sort of membership in the + token range (> `from_token` and <= `to_token`) - - `invite`, `join`, `knock`, `ban` membership events - - Kicks (`leave` membership events where `sender` is different from the - `user_id`/`state_key`) - - `newly_left` (rooms that were left during the given token range) - - In order for bans/kicks to not show up in sync, you need to `/forget` those - rooms. This doesn't modify the event itself though and only adds the - `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way - to tell when a room was forgotten at the moment so we can't factor it into the - from/to range. + In order for bans/kicks to not show up, you need to `/forget` those rooms. This + doesn't modify the event itself though and only adds the `forgotten` flag to the + `room_memberships` table in Synapse. There isn't a way to tell when a room was + forgotten at the moment so we can't factor it into the token range. Args: user: User to fetch rooms for @@ -580,8 +640,8 @@ class SlidingSyncHandler: from_token: The point in the stream to sync from. Returns: - A dictionary of room IDs that should be listed in the sync response along - with membership information in that room at the time of `to_token`. + A dictionary of room IDs that the user has had membership in along with + membership information in that room at the time of `to_token`. """ user_id = user.to_string() @@ -592,9 +652,6 @@ class SlidingSyncHandler: # We want to fetch any kind of membership (joined and left rooms) in order # to get the `event_pos` of the latest room membership event for the # user. - # - # We will filter out the rooms that don't belong below (see - # `filter_membership_for_sync`) membership_list=Membership.LIST, excluded_rooms=self.rooms_to_exclude_globally, ) @@ -614,7 +671,9 @@ class SlidingSyncHandler: event_pos=room_for_user.event_pos, membership=room_for_user.membership, sender=room_for_user.sender, + # We will update these fields below to be accurate newly_joined=False, + newly_left=False, is_dm=False, ) for room_for_user in room_for_user_list @@ -653,12 +712,10 @@ class SlidingSyncHandler: # - 1a) Remove rooms that the user joined after the `to_token` # - 1b) Add back rooms that the user left after the `to_token` # - 1c) Update room membership events to the point in time of the `to_token` - # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`) - # - 3) Figure out which rooms are `newly_joined` + # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`) + # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`) # - 4) Figure out which rooms are DM's - # 1) ----------------------------------------------------- - # 1) Fetch membership changes that fall in the range from `to_token` up to # `membership_snapshot_token` # @@ -717,7 +774,9 @@ class SlidingSyncHandler: event_pos=first_membership_change_after_to_token.prev_event_pos, membership=first_membership_change_after_to_token.prev_membership, sender=first_membership_change_after_to_token.prev_sender, + # We will update these fields below to be accurate newly_joined=False, + newly_left=False, is_dm=False, ) else: @@ -726,22 +785,6 @@ class SlidingSyncHandler: # exact membership state and shouldn't rely on the current snapshot. sync_room_id_set.pop(room_id, None) - # Filter the rooms that that we have updated room membership events to the point - # in time of the `to_token` (from the "1)" fixups) - filtered_sync_room_id_set = { - room_id: room_membership_for_user - for room_id, room_membership_for_user in sync_room_id_set.items() - if filter_membership_for_sync( - membership=room_membership_for_user.membership, - user_id=user_id, - sender=room_membership_for_user.sender, - ) - } - - # 2) ----------------------------------------------------- - # We fix-up newly_left rooms after the first fixup because it may have removed - # some left rooms that we can figure out are newly_left in the following code - # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token` current_state_delta_membership_changes_in_from_to_range = [] if from_token: @@ -803,19 +846,40 @@ class SlidingSyncHandler: if last_membership_change_in_from_to_range.membership == Membership.JOIN: possibly_newly_joined_room_ids.add(room_id) - # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We - # include newly_left rooms because the last event that the user should see - # is their own leave event + # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`). if last_membership_change_in_from_to_range.membership == Membership.LEAVE: - filtered_sync_room_id_set[room_id] = _RoomMembershipForUser( - room_id=room_id, - event_id=last_membership_change_in_from_to_range.event_id, - event_pos=last_membership_change_in_from_to_range.event_pos, - membership=last_membership_change_in_from_to_range.membership, - sender=last_membership_change_in_from_to_range.sender, - newly_joined=False, - is_dm=False, - ) + # 2) Mark this room as `newly_left` + + # If we're seeing a membership change here, we should expect to already + # have it in our snapshot but if a state reset happens, it wouldn't have + # shown up in our snapshot but appear as a change here. + existing_sync_entry = sync_room_id_set.get(room_id) + if existing_sync_entry is not None: + # Normal expected case + sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace( + newly_left=True + ) + else: + # State reset! + logger.warn( + "State reset detected for room_id %s with %s who is no longer in the room", + room_id, + user_id, + ) + # Even though a state reset happened which removed the person from + # the room, we still add it the list so the user knows they left the + # room. Downstream code can check for a state reset by looking for + # `event_id=None and membership is not None`. + sync_room_id_set[room_id] = _RoomMembershipForUser( + room_id=room_id, + event_id=last_membership_change_in_from_to_range.event_id, + event_pos=last_membership_change_in_from_to_range.event_pos, + membership=last_membership_change_in_from_to_range.membership, + sender=last_membership_change_in_from_to_range.sender, + newly_joined=False, + newly_left=True, + is_dm=False, + ) # 3) Figure out `newly_joined` for room_id in possibly_newly_joined_room_ids: @@ -826,9 +890,9 @@ class SlidingSyncHandler: # also some non-join in the range, we know they `newly_joined`. if has_non_join_in_from_to_range: # We found a `newly_joined` room (we left and joined within the token range) - filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[ - room_id - ].copy_and_replace(newly_joined=True) + sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( + newly_joined=True + ) else: prev_event_id = first_membership_change_by_room_id_in_from_to_range[ room_id @@ -840,7 +904,7 @@ class SlidingSyncHandler: if prev_event_id is None: # We found a `newly_joined` room (we are joining the room for the # first time within the token range) - filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[ + sync_room_id_set[room_id] = sync_room_id_set[ room_id ].copy_and_replace(newly_joined=True) # Last resort, we need to step back to the previous membership event @@ -848,7 +912,7 @@ class SlidingSyncHandler: elif prev_membership != Membership.JOIN: # We found a `newly_joined` room (we left before the token range # and joined within the token range) - filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[ + sync_room_id_set[room_id] = sync_room_id_set[ room_id ].copy_and_replace(newly_joined=True) @@ -876,12 +940,122 @@ class SlidingSyncHandler: dm_room_id_set.add(room_id) # 4) Fixup - for room_id in filtered_sync_room_id_set: - filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[ - room_id - ].copy_and_replace(is_dm=room_id in dm_room_id_set) + for room_id in sync_room_id_set: + sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace( + is_dm=room_id in dm_room_id_set + ) + + return sync_room_id_set + + async def filter_rooms_relevant_for_sync( + self, + user: UserID, + room_membership_for_user_map: Dict[str, _RoomMembershipForUser], + ) -> Dict[str, _RoomMembershipForUser]: + """ + Filter room IDs that should/can be listed for this user in the sync response (the + full room list that will be further filtered, sorted, and sliced). + + We're looking for rooms where the user has the following state in the token + range (> `from_token` and <= `to_token`): + + - `invite`, `join`, `knock`, `ban` membership events + - Kicks (`leave` membership events where `sender` is different from the + `user_id`/`state_key`) + - `newly_left` (rooms that were left during the given token range) + - In order for bans/kicks to not show up in sync, you need to `/forget` those + rooms. This doesn't modify the event itself though and only adds the + `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way + to tell when a room was forgotten at the moment so we can't factor it into the + from/to range. + + Args: + user: User that is syncing + room_membership_for_user_map: Room membership for the user + + Returns: + A dictionary of room IDs that should be listed in the sync response along + with membership information in that room at the time of `to_token`. + """ + user_id = user.to_string() + + # Filter rooms to only what we're interested to sync with + filtered_sync_room_map = { + room_id: room_membership_for_user + for room_id, room_membership_for_user in room_membership_for_user_map.items() + if filter_membership_for_sync( + user_id=user_id, + room_membership_for_user=room_membership_for_user, + ) + } + + return filtered_sync_room_map + + async def check_room_subscription_allowed_for_user( + self, + room_id: str, + room_membership_for_user_map: Dict[str, _RoomMembershipForUser], + to_token: StreamToken, + ) -> Optional[_RoomMembershipForUser]: + """ + Check whether the user is allowed to see the room based on whether they have + ever had membership in the room or if the room is `world_readable`. - return filtered_sync_room_id_set + Similar to `check_user_in_room_or_world_readable(...)` + + Args: + room_id: Room to check + room_membership_for_user_map: Room membership for the user at the time of + the `to_token` (<= `to_token`). + to_token: The token to fetch rooms up to. + + Returns: + The room membership for the user if they are allowed to subscribe to the + room else `None`. + """ + + # We can first check if they are already allowed to see the room based + # on our previous work to assemble the `room_membership_for_user_map`. + # + # If they have had any membership in the room over time (up to the `to_token`), + # let them subscribe and see what they can. + existing_membership_for_user = room_membership_for_user_map.get(room_id) + if existing_membership_for_user is not None: + return existing_membership_for_user + + # TODO: Handle `world_readable` rooms + return None + + # If the room is `world_readable`, it doesn't matter whether they can join, + # everyone can see the room. + # not_in_room_membership_for_user = _RoomMembershipForUser( + # room_id=room_id, + # event_id=None, + # event_pos=None, + # membership=None, + # sender=None, + # newly_joined=False, + # newly_left=False, + # is_dm=False, + # ) + # room_state = await self.get_current_state_at( + # room_id=room_id, + # room_membership_for_user_at_to_token=not_in_room_membership_for_user, + # state_filter=StateFilter.from_types( + # [(EventTypes.RoomHistoryVisibility, "")] + # ), + # to_token=to_token, + # ) + + # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, "")) + # if ( + # visibility_event is not None + # and visibility_event.content.get("history_visibility") + # == HistoryVisibility.WORLD_READABLE + # ): + # return not_in_room_membership_for_user + + # return None async def filter_rooms( self, @@ -1081,7 +1255,6 @@ class SlidingSyncHandler: in the room at the time of `to_token`. to_token: The point in the stream to sync up to. """ - room_state_ids: StateMap[str] # People shouldn't see past their leave/ban event if room_membership_for_user_at_to_token.membership in ( @@ -1349,10 +1522,10 @@ class SlidingSyncHandler: stripped_state.append(strip_event(invite_or_knock_event)) # TODO: Handle state resets. For example, if we see - # `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but - # `required_state` doesn't include it, we should indicate to the client that a - # state reset happened. Perhaps we should indicate this by setting `initial: - # True` and empty `required_state`. + # `room_membership_for_user_at_to_token.event_id=None and + # room_membership_for_user_at_to_token.membership is not None`, we should + # indicate to the client that a state reset happened. Perhaps we should indicate + # this by setting `initial: True` and empty `required_state`. # TODO: Since we can't determine whether we've already sent a room down this # Sliding Sync connection before (we plan to add this optimization in the |