diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 904787ced3..be98b379eb 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -62,32 +62,79 @@ DEFAULT_BUMP_EVENT_TYPES = {
}
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+ """
+ Attributes:
+ room_id: The room ID of the membership event
+ event_id: The event ID of the membership event
+ event_pos: The stream position of the membership event
+ membership: The membership state of the user in the room
+ sender: The person who sent the membership event
+ newly_joined: Whether the user newly joined the room during the given token
+ range and is still joined to the room at the end of this range.
+ newly_left: Whether the user newly left (or kicked) the room during the given
+ token range and is still "leave" at the end of this range.
+ is_dm: Whether this user considers this room as a direct-message (DM) room
+ """
+
+ room_id: str
+ # Optional because state resets can affect room membership without a corresponding event.
+ event_id: Optional[str]
+ # Even during a state reset which removes the user from the room, we expect this to
+ # be set because `current_state_delta_stream` will note the position that the reset
+ # happened.
+ event_pos: PersistedEventPosition
+ # Even during a state reset which removes the user from the room, we expect this to
+ # be set to `LEAVE` because we can make that assumption based on the situaton (see
+ # `get_current_state_delta_membership_changes_for_user(...)`)
+ membership: str
+ # Optional because state resets can affect room membership without a corresponding event.
+ sender: Optional[str]
+ newly_joined: bool
+ newly_left: bool
+ is_dm: bool
+
+ def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+ return attr.evolve(self, **kwds)
+
+
def filter_membership_for_sync(
- *, membership: str, user_id: str, sender: Optional[str]
+ *, user_id: str, room_membership_for_user: _RoomMembershipForUser
) -> bool:
"""
Returns True if the membership event should be included in the sync response,
otherwise False.
Attributes:
- membership: The membership state of the user in the room.
user_id: The user ID that the membership applies to
- sender: The person who sent the membership event
+ room_membership_for_user: Membership information for the user in the room
"""
- # Everything except `Membership.LEAVE` because we want everything that's *still*
- # relevant to the user. There are few more things to include in the sync response
- # (newly_left) but those are handled separately.
+ membership = room_membership_for_user.membership
+ sender = room_membership_for_user.sender
+ newly_left = room_membership_for_user.newly_left
+
+ # We want to allow everything except rooms the user has left unless `newly_left`
+ # because we want everything that's *still* relevant to the user. We include
+ # `newly_left` rooms because the last event that the user should see is their own
+ # leave event.
#
- # This logic includes kicks (leave events where the sender is not the same user) and
- # can be read as "anything that isn't a leave or a leave with a different sender".
+ # A leave != kick. This logic includes kicks (leave events where the sender is not
+ # the same user).
#
- # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
- # happened that removed the user from the room, or the user was the last person
- # locally to leave the room which caused the server to leave the room. In both
- # cases, we can just remove the rooms since they are no longer relevant to the user.
- # They could still be added back later if they are `newly_left`.
- return membership != Membership.LEAVE or sender not in (user_id, None)
+ # When `sender=None`, it means that a state reset happened that removed the user
+ # from the room without a corresponding leave event. We can just remove the rooms
+ # since they are no longer relevant to the user but will still appear if they are
+ # `newly_left`.
+ return (
+ # Anything except leave events
+ membership != Membership.LEAVE
+ # Unless...
+ or newly_left
+ # Allow kicks
+ or (membership == Membership.LEAVE and sender not in (user_id, None))
+ )
# We can't freeze this class because we want to update it in place with the
@@ -281,31 +328,6 @@ class StateValues:
LAZY: Final = "$LAZY"
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _RoomMembershipForUser:
- """
- Attributes:
- event_id: The event ID of the membership event
- event_pos: The stream position of the membership event
- membership: The membership state of the user in the room
- sender: The person who sent the membership event
- newly_joined: Whether the user newly joined the room during the given token
- range
- is_dm: Whether this user considers this room as a direct-message (DM) room
- """
-
- room_id: str
- event_id: Optional[str]
- event_pos: PersistedEventPosition
- membership: str
- sender: Optional[str]
- newly_joined: bool
- is_dm: bool
-
- def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
- return attr.evolve(self, **kwds)
-
-
class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
@@ -424,18 +446,31 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
+ # Get all of the room IDs that the user should be able to see in the sync
+ # response
+ has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
+ has_room_subscriptions = (
+ sync_config.room_subscriptions is not None
+ and len(sync_config.room_subscriptions) > 0
+ )
+ if has_lists or has_room_subscriptions:
+ room_membership_for_user_map = (
+ await self.get_room_membership_for_user_at_to_token(
+ user=sync_config.user,
+ to_token=to_token,
+ from_token=from_token,
+ )
+ )
+
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
# Keep track of the rooms that we're going to display and need to fetch more
# info about
relevant_room_map: Dict[str, RoomSyncConfig] = {}
- if sync_config.lists:
- # Get all of the room IDs that the user should be able to see in the sync
- # response
- sync_room_map = await self.get_sync_room_ids_for_user(
- sync_config.user,
- from_token=from_token,
- to_token=to_token,
+ if has_lists and sync_config.lists is not None:
+ sync_room_map = await self.filter_rooms_relevant_for_sync(
+ user=sync_config.user,
+ room_membership_for_user_map=room_membership_for_user_map,
)
for list_key, list_config in sync_config.lists.items():
@@ -524,7 +559,35 @@ class SlidingSyncHandler:
ops=ops,
)
- # TODO: if (sync_config.room_subscriptions):
+ # Handle room subscriptions
+ if has_room_subscriptions and sync_config.room_subscriptions is not None:
+ for room_id, room_subscription in sync_config.room_subscriptions.items():
+ room_membership_for_user_at_to_token = (
+ await self.check_room_subscription_allowed_for_user(
+ room_id=room_id,
+ room_membership_for_user_map=room_membership_for_user_map,
+ to_token=to_token,
+ )
+ )
+
+ # Skip this room if the user isn't allowed to see it
+ if not room_membership_for_user_at_to_token:
+ continue
+
+ room_membership_for_user_map[room_id] = (
+ room_membership_for_user_at_to_token
+ )
+
+ # Take the superset of the `RoomSyncConfig` for each room.
+ #
+ # Update our `relevant_room_map` with the room we're going to display
+ # and need to fetch more info about.
+ room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
+ existing_room_sync_config = relevant_room_map.get(room_id)
+ if existing_room_sync_config is not None:
+ existing_room_sync_config.combine_room_sync_config(room_sync_config)
+ else:
+ relevant_room_map[room_id] = room_sync_config
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -533,7 +596,9 @@ class SlidingSyncHandler:
user=sync_config.user,
room_id=room_id,
room_sync_config=room_sync_config,
- room_membership_for_user_at_to_token=sync_room_map[room_id],
+ room_membership_for_user_at_to_token=room_membership_for_user_map[
+ room_id
+ ],
from_token=from_token,
to_token=to_token,
)
@@ -551,28 +616,23 @@ class SlidingSyncHandler:
extensions=extensions,
)
- async def get_sync_room_ids_for_user(
+ async def get_room_membership_for_user_at_to_token(
self,
user: UserID,
to_token: StreamToken,
- from_token: Optional[StreamToken] = None,
+ from_token: Optional[StreamToken],
) -> Dict[str, _RoomMembershipForUser]:
"""
- Fetch room IDs that should be listed for this user in the sync response (the
- full room list that will be filtered, sorted, and sliced).
+ Fetch room IDs that the user has had membership in (the full room list including
+ long-lost left rooms that will be filtered, sorted, and sliced).
- We're looking for rooms where the user has the following state in the token
- range (> `from_token` and <= `to_token`):
+ We're looking for rooms where the user has had any sort of membership in the
+ token range (> `from_token` and <= `to_token`)
- - `invite`, `join`, `knock`, `ban` membership events
- - Kicks (`leave` membership events where `sender` is different from the
- `user_id`/`state_key`)
- - `newly_left` (rooms that were left during the given token range)
- - In order for bans/kicks to not show up in sync, you need to `/forget` those
- rooms. This doesn't modify the event itself though and only adds the
- `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
- to tell when a room was forgotten at the moment so we can't factor it into the
- from/to range.
+ In order for bans/kicks to not show up, you need to `/forget` those rooms. This
+ doesn't modify the event itself though and only adds the `forgotten` flag to the
+ `room_memberships` table in Synapse. There isn't a way to tell when a room was
+ forgotten at the moment so we can't factor it into the token range.
Args:
user: User to fetch rooms for
@@ -580,8 +640,8 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
Returns:
- A dictionary of room IDs that should be listed in the sync response along
- with membership information in that room at the time of `to_token`.
+ A dictionary of room IDs that the user has had membership in along with
+ membership information in that room at the time of `to_token`.
"""
user_id = user.to_string()
@@ -592,9 +652,6 @@ class SlidingSyncHandler:
# We want to fetch any kind of membership (joined and left rooms) in order
# to get the `event_pos` of the latest room membership event for the
# user.
- #
- # We will filter out the rooms that don't belong below (see
- # `filter_membership_for_sync`)
membership_list=Membership.LIST,
excluded_rooms=self.rooms_to_exclude_globally,
)
@@ -614,7 +671,9 @@ class SlidingSyncHandler:
event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
sender=room_for_user.sender,
+ # We will update these fields below to be accurate
newly_joined=False,
+ newly_left=False,
is_dm=False,
)
for room_for_user in room_for_user_list
@@ -653,12 +712,10 @@ class SlidingSyncHandler:
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
# - 1c) Update room membership events to the point in time of the `to_token`
- # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
- # - 3) Figure out which rooms are `newly_joined`
+ # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
+ # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
# - 4) Figure out which rooms are DM's
- # 1) -----------------------------------------------------
-
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
#
@@ -717,7 +774,9 @@ class SlidingSyncHandler:
event_pos=first_membership_change_after_to_token.prev_event_pos,
membership=first_membership_change_after_to_token.prev_membership,
sender=first_membership_change_after_to_token.prev_sender,
+ # We will update these fields below to be accurate
newly_joined=False,
+ newly_left=False,
is_dm=False,
)
else:
@@ -726,22 +785,6 @@ class SlidingSyncHandler:
# exact membership state and shouldn't rely on the current snapshot.
sync_room_id_set.pop(room_id, None)
- # Filter the rooms that that we have updated room membership events to the point
- # in time of the `to_token` (from the "1)" fixups)
- filtered_sync_room_id_set = {
- room_id: room_membership_for_user
- for room_id, room_membership_for_user in sync_room_id_set.items()
- if filter_membership_for_sync(
- membership=room_membership_for_user.membership,
- user_id=user_id,
- sender=room_membership_for_user.sender,
- )
- }
-
- # 2) -----------------------------------------------------
- # We fix-up newly_left rooms after the first fixup because it may have removed
- # some left rooms that we can figure out are newly_left in the following code
-
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
current_state_delta_membership_changes_in_from_to_range = []
if from_token:
@@ -803,19 +846,40 @@ class SlidingSyncHandler:
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id)
- # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
- # include newly_left rooms because the last event that the user should see
- # is their own leave event
+ # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
- filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
- room_id=room_id,
- event_id=last_membership_change_in_from_to_range.event_id,
- event_pos=last_membership_change_in_from_to_range.event_pos,
- membership=last_membership_change_in_from_to_range.membership,
- sender=last_membership_change_in_from_to_range.sender,
- newly_joined=False,
- is_dm=False,
- )
+ # 2) Mark this room as `newly_left`
+
+ # If we're seeing a membership change here, we should expect to already
+ # have it in our snapshot but if a state reset happens, it wouldn't have
+ # shown up in our snapshot but appear as a change here.
+ existing_sync_entry = sync_room_id_set.get(room_id)
+ if existing_sync_entry is not None:
+ # Normal expected case
+ sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
+ newly_left=True
+ )
+ else:
+ # State reset!
+ logger.warn(
+ "State reset detected for room_id %s with %s who is no longer in the room",
+ room_id,
+ user_id,
+ )
+ # Even though a state reset happened which removed the person from
+ # the room, we still add it the list so the user knows they left the
+ # room. Downstream code can check for a state reset by looking for
+ # `event_id=None and membership is not None`.
+ sync_room_id_set[room_id] = _RoomMembershipForUser(
+ room_id=room_id,
+ event_id=last_membership_change_in_from_to_range.event_id,
+ event_pos=last_membership_change_in_from_to_range.event_pos,
+ membership=last_membership_change_in_from_to_range.membership,
+ sender=last_membership_change_in_from_to_range.sender,
+ newly_joined=False,
+ newly_left=True,
+ is_dm=False,
+ )
# 3) Figure out `newly_joined`
for room_id in possibly_newly_joined_room_ids:
@@ -826,9 +890,9 @@ class SlidingSyncHandler:
# also some non-join in the range, we know they `newly_joined`.
if has_non_join_in_from_to_range:
# We found a `newly_joined` room (we left and joined within the token range)
- filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
- room_id
- ].copy_and_replace(newly_joined=True)
+ sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+ newly_joined=True
+ )
else:
prev_event_id = first_membership_change_by_room_id_in_from_to_range[
room_id
@@ -840,7 +904,7 @@ class SlidingSyncHandler:
if prev_event_id is None:
# We found a `newly_joined` room (we are joining the room for the
# first time within the token range)
- filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ sync_room_id_set[room_id] = sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
# Last resort, we need to step back to the previous membership event
@@ -848,7 +912,7 @@ class SlidingSyncHandler:
elif prev_membership != Membership.JOIN:
# We found a `newly_joined` room (we left before the token range
# and joined within the token range)
- filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ sync_room_id_set[room_id] = sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
@@ -876,12 +940,122 @@ class SlidingSyncHandler:
dm_room_id_set.add(room_id)
# 4) Fixup
- for room_id in filtered_sync_room_id_set:
- filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
- room_id
- ].copy_and_replace(is_dm=room_id in dm_room_id_set)
+ for room_id in sync_room_id_set:
+ sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+ is_dm=room_id in dm_room_id_set
+ )
+
+ return sync_room_id_set
+
+ async def filter_rooms_relevant_for_sync(
+ self,
+ user: UserID,
+ room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+ ) -> Dict[str, _RoomMembershipForUser]:
+ """
+ Filter room IDs that should/can be listed for this user in the sync response (the
+ full room list that will be further filtered, sorted, and sliced).
+
+ We're looking for rooms where the user has the following state in the token
+ range (> `from_token` and <= `to_token`):
+
+ - `invite`, `join`, `knock`, `ban` membership events
+ - Kicks (`leave` membership events where `sender` is different from the
+ `user_id`/`state_key`)
+ - `newly_left` (rooms that were left during the given token range)
+ - In order for bans/kicks to not show up in sync, you need to `/forget` those
+ rooms. This doesn't modify the event itself though and only adds the
+ `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
+ to tell when a room was forgotten at the moment so we can't factor it into the
+ from/to range.
+
+ Args:
+ user: User that is syncing
+ room_membership_for_user_map: Room membership for the user
+
+ Returns:
+ A dictionary of room IDs that should be listed in the sync response along
+ with membership information in that room at the time of `to_token`.
+ """
+ user_id = user.to_string()
+
+ # Filter rooms to only what we're interested to sync with
+ filtered_sync_room_map = {
+ room_id: room_membership_for_user
+ for room_id, room_membership_for_user in room_membership_for_user_map.items()
+ if filter_membership_for_sync(
+ user_id=user_id,
+ room_membership_for_user=room_membership_for_user,
+ )
+ }
+
+ return filtered_sync_room_map
+
+ async def check_room_subscription_allowed_for_user(
+ self,
+ room_id: str,
+ room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+ to_token: StreamToken,
+ ) -> Optional[_RoomMembershipForUser]:
+ """
+ Check whether the user is allowed to see the room based on whether they have
+ ever had membership in the room or if the room is `world_readable`.
- return filtered_sync_room_id_set
+ Similar to `check_user_in_room_or_world_readable(...)`
+
+ Args:
+ room_id: Room to check
+ room_membership_for_user_map: Room membership for the user at the time of
+ the `to_token` (<= `to_token`).
+ to_token: The token to fetch rooms up to.
+
+ Returns:
+ The room membership for the user if they are allowed to subscribe to the
+ room else `None`.
+ """
+
+ # We can first check if they are already allowed to see the room based
+ # on our previous work to assemble the `room_membership_for_user_map`.
+ #
+ # If they have had any membership in the room over time (up to the `to_token`),
+ # let them subscribe and see what they can.
+ existing_membership_for_user = room_membership_for_user_map.get(room_id)
+ if existing_membership_for_user is not None:
+ return existing_membership_for_user
+
+ # TODO: Handle `world_readable` rooms
+ return None
+
+ # If the room is `world_readable`, it doesn't matter whether they can join,
+ # everyone can see the room.
+ # not_in_room_membership_for_user = _RoomMembershipForUser(
+ # room_id=room_id,
+ # event_id=None,
+ # event_pos=None,
+ # membership=None,
+ # sender=None,
+ # newly_joined=False,
+ # newly_left=False,
+ # is_dm=False,
+ # )
+ # room_state = await self.get_current_state_at(
+ # room_id=room_id,
+ # room_membership_for_user_at_to_token=not_in_room_membership_for_user,
+ # state_filter=StateFilter.from_types(
+ # [(EventTypes.RoomHistoryVisibility, "")]
+ # ),
+ # to_token=to_token,
+ # )
+
+ # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, ""))
+ # if (
+ # visibility_event is not None
+ # and visibility_event.content.get("history_visibility")
+ # == HistoryVisibility.WORLD_READABLE
+ # ):
+ # return not_in_room_membership_for_user
+
+ # return None
async def filter_rooms(
self,
@@ -1081,7 +1255,6 @@ class SlidingSyncHandler:
in the room at the time of `to_token`.
to_token: The point in the stream to sync up to.
"""
-
room_state_ids: StateMap[str]
# People shouldn't see past their leave/ban event
if room_membership_for_user_at_to_token.membership in (
@@ -1349,10 +1522,10 @@ class SlidingSyncHandler:
stripped_state.append(strip_event(invite_or_knock_event))
# TODO: Handle state resets. For example, if we see
- # `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
- # `required_state` doesn't include it, we should indicate to the client that a
- # state reset happened. Perhaps we should indicate this by setting `initial:
- # True` and empty `required_state`.
+ # `room_membership_for_user_at_to_token.event_id=None and
+ # room_membership_for_user_at_to_token.membership is not None`, we should
+ # indicate to the client that a state reset happened. Perhaps we should indicate
+ # this by setting `initial: True` and empty `required_state`.
# TODO: Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
|