diff --git a/changelog.d/17629.misc b/changelog.d/17629.misc
new file mode 100644
index 0000000000..1eb46b2c68
--- /dev/null
+++ b/changelog.d/17629.misc
@@ -0,0 +1 @@
+Sliding Sync: Split up `get_room_membership_for_user_at_to_token`.
diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py
index 0e6cb28524..f41180feda 100644
--- a/synapse/handlers/sliding_sync/room_lists.py
+++ b/synapse/handlers/sliding_sync/room_lists.py
@@ -25,6 +25,7 @@ from typing import (
Mapping,
Optional,
Set,
+ Tuple,
Union,
)
@@ -46,6 +47,7 @@ from synapse.storage.databases.main.state import (
Sentinel as StateSentinel,
)
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.roommember import RoomsForUser
from synapse.types import (
MutableStateMap,
PersistedEventPosition,
@@ -425,68 +427,30 @@ class SlidingSyncRoomLists:
)
@trace
- async def get_room_membership_for_user_at_to_token(
+ async def _get_rewind_changes_to_current_membership_to_token(
self,
user: UserID,
+ rooms_for_user: Mapping[str, RoomsForUser],
to_token: StreamToken,
- from_token: Optional[StreamToken],
- ) -> Dict[str, _RoomMembershipForUser]:
+ ) -> Mapping[str, Optional[RoomsForUser]]:
"""
- Fetch room IDs that the user has had membership in (the full room list including
- long-lost left rooms that will be filtered, sorted, and sliced).
-
- We're looking for rooms where the user has had any sort of membership in the
- token range (> `from_token` and <= `to_token`)
-
- In order for bans/kicks to not show up, you need to `/forget` those rooms. This
- doesn't modify the event itself though and only adds the `forgotten` flag to the
- `room_memberships` table in Synapse. There isn't a way to tell when a room was
- forgotten at the moment so we can't factor it into the token range.
+ Takes the current set of rooms for a user (retrieved after the given
+ token), and returns the changes needed to "rewind" it to match the set of
+ memberships *at that token* (<= `to_token`).
Args:
user: User to fetch rooms for
- to_token: The token to fetch rooms up to.
- from_token: The point in the stream to sync from.
+ rooms_for_user: The set of rooms for the user after the `to_token`.
+ to_token: The token to rewind to
Returns:
- A dictionary of room IDs that the user has had membership in along with
- membership information in that room at the time of `to_token`.
+ The changes to apply to rewind the the current memberships.
"""
- user_id = user.to_string()
-
- # First grab a current snapshot rooms for the user
- # (also handles forgotten rooms)
- room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
- user_id=user_id,
- # We want to fetch any kind of membership (joined and left rooms) in order
- # to get the `event_pos` of the latest room membership event for the
- # user.
- membership_list=Membership.LIST,
- excluded_rooms=self.rooms_to_exclude_globally,
- )
-
# If the user has never joined any rooms before, we can just return an empty list
- if not room_for_user_list:
+ if not rooms_for_user:
return {}
- # Our working list of rooms that can show up in the sync response
- sync_room_id_set = {
- # Note: The `room_for_user` we're assigning here will need to be fixed up
- # (below) because they are potentially from the current snapshot time
- # instead from the time of the `to_token`.
- room_for_user.room_id: _RoomMembershipForUser(
- room_id=room_for_user.room_id,
- event_id=room_for_user.event_id,
- event_pos=room_for_user.event_pos,
- membership=room_for_user.membership,
- sender=room_for_user.sender,
- # We will update these fields below to be accurate
- newly_joined=False,
- newly_left=False,
- is_dm=False,
- )
- for room_for_user in room_for_user_list
- }
+ user_id = user.to_string()
# Get the `RoomStreamToken` that represents the spot we queried up to when we got
# our membership snapshot from `get_rooms_for_local_user_where_membership_is()`.
@@ -494,7 +458,7 @@ class SlidingSyncRoomLists:
# First, we need to get the max stream_ordering of each event persister instance
# that we queried events from.
instance_to_max_stream_ordering_map: Dict[str, int] = {}
- for room_for_user in room_for_user_list:
+ for room_for_user in rooms_for_user.values():
instance_name = room_for_user.event_pos.instance_name
stream_ordering = room_for_user.event_pos.stream
@@ -521,18 +485,14 @@ class SlidingSyncRoomLists:
),
)
- # Since we fetched the users room list at some point in time after the from/to
+ # Since we fetched the users room list at some point in time after the
# tokens, we need to revert/rewind some membership changes to match the point in
# time of the `to_token`. In particular, we need to make these fixups:
#
- # - 1a) Remove rooms that the user joined after the `to_token`
- # - 1b) Add back rooms that the user left after the `to_token`
- # - 1c) Update room membership events to the point in time of the `to_token`
- # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
- # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
- # - 4) Figure out which rooms are DM's
-
- # 1) Fetch membership changes that fall in the range from `to_token` up to
+ # - a) Remove rooms that the user joined after the `to_token`
+ # - b) Update room membership events to the point in time of the `to_token`
+
+ # Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
#
# If our `to_token` is already the same or ahead of the latest room membership
@@ -549,7 +509,15 @@ class SlidingSyncRoomLists:
)
)
- # 1) Assemble a list of the first membership event after the `to_token` so we can
+ if not current_state_delta_membership_changes_after_to_token:
+ # There have been no membership changes, so we can early return.
+ return {}
+
+ # Otherwise we're about to make changes to `rooms_for_user`, so we turn
+ # it into a mutable dict.
+ changes: Dict[str, Optional[RoomsForUser]] = {}
+
+ # Assemble a list of the first membership event after the `to_token` so we can
# step backward to the previous membership that would apply to the from/to
# range.
first_membership_change_by_room_id_after_to_token: Dict[
@@ -561,8 +529,6 @@ class SlidingSyncRoomLists:
membership_change.room_id, membership_change
)
- # 1) Fixup
- #
# Since we fetched a snapshot of the users room list at some point in time after
# the from/to tokens, we need to revert/rewind some membership changes to match
# the point in time of the `to_token`.
@@ -572,7 +538,7 @@ class SlidingSyncRoomLists:
) in first_membership_change_by_room_id_after_to_token.items():
# 1a) Remove rooms that the user joined after the `to_token`
if first_membership_change_after_to_token.prev_event_id is None:
- sync_room_id_set.pop(room_id, None)
+ changes[room_id] = None
# 1b) 1c) From the first membership event after the `to_token`, step backward to the
# previous membership that would apply to the from/to range.
else:
@@ -583,25 +549,161 @@ class SlidingSyncRoomLists:
first_membership_change_after_to_token.prev_event_pos is not None
and first_membership_change_after_to_token.prev_membership
is not None
+ and first_membership_change_after_to_token.prev_sender is not None
):
- sync_room_id_set[room_id] = _RoomMembershipForUser(
+ # We need to know the room version ID, which we normally we
+ # can get from the current membership, but if we don't have
+ # that then we need to query the DB.
+ current_membership = rooms_for_user.get(room_id)
+ if current_membership is not None:
+ room_version_id = current_membership.room_version_id
+ else:
+ room_version_id = await self.store.get_room_version_id(room_id)
+
+ changes[room_id] = RoomsForUser(
room_id=room_id,
event_id=first_membership_change_after_to_token.prev_event_id,
event_pos=first_membership_change_after_to_token.prev_event_pos,
membership=first_membership_change_after_to_token.prev_membership,
sender=first_membership_change_after_to_token.prev_sender,
- # We will update these fields below to be accurate
- newly_joined=False,
- newly_left=False,
- is_dm=False,
+ room_version_id=room_version_id,
)
else:
# If we can't find the previous membership event, we shouldn't
# include the room in the sync response since we can't determine the
# exact membership state and shouldn't rely on the current snapshot.
- sync_room_id_set.pop(room_id, None)
+ changes[room_id] = None
+
+ return changes
+
+ @trace
+ async def get_room_membership_for_user_at_to_token(
+ self,
+ user: UserID,
+ to_token: StreamToken,
+ from_token: Optional[StreamToken],
+ ) -> Dict[str, _RoomMembershipForUser]:
+ """
+ Fetch room IDs that the user has had membership in (the full room list including
+ long-lost left rooms that will be filtered, sorted, and sliced).
- # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
+ We're looking for rooms where the user has had any sort of membership in the
+ token range (> `from_token` and <= `to_token`)
+
+ In order for bans/kicks to not show up, you need to `/forget` those rooms. This
+ doesn't modify the event itself though and only adds the `forgotten` flag to the
+ `room_memberships` table in Synapse. There isn't a way to tell when a room was
+ forgotten at the moment so we can't factor it into the token range.
+
+ Args:
+ user: User to fetch rooms for
+ to_token: The token to fetch rooms up to.
+ from_token: The point in the stream to sync from.
+
+ Returns:
+ A dictionary of room IDs that the user has had membership in along with
+ membership information in that room at the time of `to_token`.
+ """
+ user_id = user.to_string()
+
+ # First grab a current snapshot rooms for the user
+ # (also handles forgotten rooms)
+ room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
+ user_id=user_id,
+ # We want to fetch any kind of membership (joined and left rooms) in order
+ # to get the `event_pos` of the latest room membership event for the
+ # user.
+ membership_list=Membership.LIST,
+ excluded_rooms=self.rooms_to_exclude_globally,
+ )
+
+ # If the user has never joined any rooms before, we can just return an empty list
+ if not room_for_user_list:
+ return {}
+
+ # Since we fetched the users room list at some point in time after the
+ # tokens, we need to revert/rewind some membership changes to match the point in
+ # time of the `to_token`.
+ rooms_for_user = {room.room_id: room for room in room_for_user_list}
+ changes = await self._get_rewind_changes_to_current_membership_to_token(
+ user, rooms_for_user, to_token
+ )
+ for room_id, change_room_for_user in changes.items():
+ if change_room_for_user is None:
+ rooms_for_user.pop(room_id, None)
+ else:
+ rooms_for_user[room_id] = change_room_for_user
+
+ newly_joined_room_ids, newly_left_room_ids = (
+ await self._get_newly_joined_and_left_rooms(
+ user_id, to_token=to_token, from_token=from_token
+ )
+ )
+
+ dm_room_ids = await self._get_dm_rooms_for_user(user_id)
+
+ # Our working list of rooms that can show up in the sync response
+ sync_room_id_set = {
+ room_for_user.room_id: _RoomMembershipForUser(
+ room_id=room_for_user.room_id,
+ event_id=room_for_user.event_id,
+ event_pos=room_for_user.event_pos,
+ membership=room_for_user.membership,
+ sender=room_for_user.sender,
+ newly_joined=room_id in newly_joined_room_ids,
+ newly_left=room_id in newly_left_room_ids,
+ is_dm=room_id in dm_room_ids,
+ )
+ for room_id, room_for_user in rooms_for_user.items()
+ }
+
+ # Ensure we have entries for rooms that the user has been "state reset"
+ # out of. These are rooms appear in the `newly_left_rooms` map but
+ # aren't in the `rooms_for_user` map.
+ for room_id, left_event_pos in newly_left_room_ids.items():
+ if room_id in sync_room_id_set:
+ continue
+
+ sync_room_id_set[room_id] = _RoomMembershipForUser(
+ room_id=room_id,
+ event_id=None,
+ event_pos=left_event_pos,
+ membership=Membership.LEAVE,
+ sender=None,
+ newly_joined=False,
+ newly_left=True,
+ is_dm=room_id in dm_room_ids,
+ )
+
+ return sync_room_id_set
+
+ @trace
+ async def _get_newly_joined_and_left_rooms(
+ self,
+ user_id: str,
+ to_token: StreamToken,
+ from_token: Optional[StreamToken],
+ ) -> Tuple[StrCollection, Mapping[str, PersistedEventPosition]]:
+ """Fetch the sets of rooms that the user newly joined or left in the
+ given token range.
+
+ Note: there may be rooms in the newly left rooms where the user was
+ "state reset" out of the room, and so that room would not be part of the
+ "current memberships" of the user.
+
+ Returns:
+ A 2-tuple of newly joined room IDs and a map of newly left room
+ IDs to the event position the leave happened at.
+ """
+ newly_joined_room_ids: Set[str] = set()
+ newly_left_room_map: Dict[str, PersistedEventPosition] = {}
+
+ # We need to figure out the
+ #
+ # - 1) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
+ # - 2) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
+
+ # 1) Fetch membership changes that fall in the range from `from_token` up to `to_token`
current_state_delta_membership_changes_in_from_to_range = []
if from_token:
current_state_delta_membership_changes_in_from_to_range = (
@@ -613,7 +715,7 @@ class SlidingSyncRoomLists:
)
)
- # 2) Assemble a list of the last membership events in some given ranges. Someone
+ # 1) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
last_membership_change_by_room_id_in_from_to_range: Dict[
@@ -646,9 +748,9 @@ class SlidingSyncRoomLists:
if membership_change.membership != Membership.JOIN:
has_non_join_event_by_room_id_in_from_to_range[room_id] = True
- # 2) Fixup
+ # 1) Fixup
#
- # 3) We also want to assemble a list of possibly newly joined rooms. Someone
+ # 2) We also want to assemble a list of possibly newly joined rooms. Someone
# could have left and joined multiple times during the given range but we only
# care about whether they are joined at the end of the token range so we are
# working with the last membership even in the token range.
@@ -658,46 +760,18 @@ class SlidingSyncRoomLists:
) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id
- # 3)
+ # 2)
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id)
- # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
+ # 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
- # 2) Mark this room as `newly_left`
-
- # If we're seeing a membership change here, we should expect to already
- # have it in our snapshot but if a state reset happens, it wouldn't have
- # shown up in our snapshot but appear as a change here.
- existing_sync_entry = sync_room_id_set.get(room_id)
- if existing_sync_entry is not None:
- # Normal expected case
- sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
- newly_left=True
- )
- else:
- # State reset!
- logger.warn(
- "State reset detected for room_id %s with %s who is no longer in the room",
- room_id,
- user_id,
- )
- # Even though a state reset happened which removed the person from
- # the room, we still add it the list so the user knows they left the
- # room. Downstream code can check for a state reset by looking for
- # `event_id=None and membership is not None`.
- sync_room_id_set[room_id] = _RoomMembershipForUser(
- room_id=room_id,
- event_id=last_membership_change_in_from_to_range.event_id,
- event_pos=last_membership_change_in_from_to_range.event_pos,
- membership=last_membership_change_in_from_to_range.membership,
- sender=last_membership_change_in_from_to_range.sender,
- newly_joined=False,
- newly_left=True,
- is_dm=False,
- )
+ # 1) Mark this room as `newly_left`
+ newly_left_room_map[room_id] = (
+ last_membership_change_in_from_to_range.event_pos
+ )
- # 3) Figure out `newly_joined`
+ # 2) Figure out `newly_joined`
for room_id in possibly_newly_joined_room_ids:
has_non_join_in_from_to_range = (
has_non_join_event_by_room_id_in_from_to_range.get(room_id, False)
@@ -706,9 +780,7 @@ class SlidingSyncRoomLists:
# also some non-join in the range, we know they `newly_joined`.
if has_non_join_in_from_to_range:
# We found a `newly_joined` room (we left and joined within the token range)
- sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
- newly_joined=True
- )
+ newly_joined_room_ids.add(room_id)
else:
prev_event_id = first_membership_change_by_room_id_in_from_to_range[
room_id
@@ -720,20 +792,23 @@ class SlidingSyncRoomLists:
if prev_event_id is None:
# We found a `newly_joined` room (we are joining the room for the
# first time within the token range)
- sync_room_id_set[room_id] = sync_room_id_set[
- room_id
- ].copy_and_replace(newly_joined=True)
+ newly_joined_room_ids.add(room_id)
# Last resort, we need to step back to the previous membership event
# just before the token range to see if we're joined then or not.
elif prev_membership != Membership.JOIN:
# We found a `newly_joined` room (we left before the token range
# and joined within the token range)
- sync_room_id_set[room_id] = sync_room_id_set[
- room_id
- ].copy_and_replace(newly_joined=True)
+ newly_joined_room_ids.add(room_id)
+
+ return newly_joined_room_ids, newly_left_room_map
+
+ @trace
+ async def _get_dm_rooms_for_user(
+ self,
+ user_id: str,
+ ) -> StrCollection:
+ """Get the set of DM rooms for the user."""
- # 4) Figure out which rooms the user considers to be direct-message (DM) rooms
- #
# We're using global account data (`m.direct`) instead of checking for
# `is_direct` on membership events because that property only appears for
# the invitee membership event (doesn't show up for the inviter).
@@ -755,13 +830,7 @@ class SlidingSyncRoomLists:
if isinstance(room_id, str):
dm_room_id_set.add(room_id)
- # 4) Fixup
- for room_id in sync_room_id_set:
- sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
- is_dm=room_id in dm_room_id_set
- )
-
- return sync_room_id_set
+ return dm_room_id_set
@trace
async def filter_rooms_relevant_for_sync(
|