diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py
index 353c491f72..bf19eb735b 100644
--- a/synapse/handlers/sliding_sync/room_lists.py
+++ b/synapse/handlers/sliding_sync/room_lists.py
@@ -56,7 +56,6 @@ from synapse.storage.roommember import (
)
from synapse.types import (
MutableStateMap,
- PersistedEventPosition,
RoomStreamToken,
StateMap,
StrCollection,
@@ -81,6 +80,12 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+class Sentinel(enum.Enum):
+ # defining a sentinel in this way allows mypy to correctly handle the
+ # type of a dictionary lookup and subsequent type narrowing.
+ UNSET_SENTINEL = object()
+
+
# Helper definition for the types that we might return. We do this to avoid
# copying data between types (which can be expensive for many rooms).
RoomsForUserType = Union[RoomsForUserStateReset, RoomsForUser, RoomsForUserSlidingSync]
@@ -119,12 +124,6 @@ class SlidingSyncInterestedRooms:
dm_room_ids: AbstractSet[str]
-class Sentinel(enum.Enum):
- # defining a sentinel in this way allows mypy to correctly handle the
- # type of a dictionary lookup and subsequent type narrowing.
- UNSET_SENTINEL = object()
-
-
def filter_membership_for_sync(
*,
user_id: str,
@@ -221,6 +220,9 @@ class SlidingSyncRoomLists:
# include rooms that are outside the list ranges.
all_rooms: Set[str] = set()
+ # Note: this won't include rooms the user has left themselves. We add back
+ # `newly_left` rooms below. This is more efficient than fetching all rooms and
+ # then filtering out the old left rooms.
room_membership_for_user_map = await self.store.get_sliding_sync_rooms_for_user(
user_id
)
@@ -262,36 +264,11 @@ class SlidingSyncRoomLists:
event_id=change.event_id,
event_pos=change.event_pos,
room_version_id=change.room_version_id,
- # We keep the current state of the room though
+ # We keep the state of the room though
has_known_state=existing_room.has_known_state,
room_type=existing_room.room_type,
is_encrypted=existing_room.is_encrypted,
)
- else:
- # This can happen if we get "state reset" out of the room
- # after the `to_token`. In other words, there is no membership
- # for the room after the `to_token` but we see membership in
- # the token range.
-
- # Get the state at the time. Note that room type never changes,
- # so we can just get current room type
- room_type = await self.store.get_room_type(room_id)
- is_encrypted = await self.get_is_encrypted_for_room_at_token(
- room_id, to_token.room_key
- )
-
- # Add back rooms that the user was state-reset out of after `to_token`
- room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
- room_id=room_id,
- sender=change.sender,
- membership=change.membership,
- event_id=change.event_id,
- event_pos=change.event_pos,
- room_version_id=change.room_version_id,
- has_known_state=True,
- room_type=room_type,
- is_encrypted=is_encrypted,
- )
(
newly_joined_room_ids,
@@ -301,44 +278,88 @@ class SlidingSyncRoomLists:
)
dm_room_ids = await self._get_dm_rooms_for_user(user_id)
- # Handle state resets in the from -> to token range.
- state_reset_rooms = (
+ # Add back `newly_left` rooms (rooms left in the from -> to token range).
+ #
+ # We do this because `get_sliding_sync_rooms_for_user(...)` doesn't include
+ # rooms that the user left themselves as it's more efficient to add them back
+ # here than to fetch all rooms and then filter out the old left rooms. The user
+ # only leaves a room once in a blue moon so this barely needs to run.
+ #
+ missing_newly_left_rooms = (
newly_left_room_map.keys() - room_membership_for_user_map.keys()
)
- if state_reset_rooms:
+ if missing_newly_left_rooms:
+ # TODO: It would be nice to avoid these copies
room_membership_for_user_map = dict(room_membership_for_user_map)
- for room_id in (
- newly_left_room_map.keys() - room_membership_for_user_map.keys()
- ):
- # Get the state at the time. Note that room type never changes,
- # so we can just get current room type
- room_type = await self.store.get_room_type(room_id)
- is_encrypted = await self.get_is_encrypted_for_room_at_token(
- room_id, newly_left_room_map[room_id].to_room_stream_token()
- )
+ for room_id in missing_newly_left_rooms:
+ newly_left_room_for_user = newly_left_room_map[room_id]
+ # This should be a given
+ assert newly_left_room_for_user.membership == Membership.LEAVE
- room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
- room_id=room_id,
- sender=None,
- membership=Membership.LEAVE,
- event_id=None,
- event_pos=newly_left_room_map[room_id],
- room_version_id=await self.store.get_room_version_id(room_id),
- has_known_state=True,
- room_type=room_type,
- is_encrypted=is_encrypted,
+ # Add back `newly_left` rooms
+ #
+ # Check for membership and state in the Sliding Sync tables as it's just
+ # another membership
+ newly_left_room_for_user_sliding_sync = (
+ await self.store.get_sliding_sync_room_for_user(user_id, room_id)
)
+ # If the membership exists, it's just a normal user left the room on
+ # their own
+ if newly_left_room_for_user_sliding_sync is not None:
+ room_membership_for_user_map[room_id] = (
+ newly_left_room_for_user_sliding_sync
+ )
+
+ change = changes.get(room_id)
+ if change is not None:
+ # Update room membership events to the point in time of the `to_token`
+ room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
+ room_id=room_id,
+ sender=change.sender,
+ membership=change.membership,
+ event_id=change.event_id,
+ event_pos=change.event_pos,
+ room_version_id=change.room_version_id,
+ # We keep the state of the room though
+ has_known_state=newly_left_room_for_user_sliding_sync.has_known_state,
+ room_type=newly_left_room_for_user_sliding_sync.room_type,
+ is_encrypted=newly_left_room_for_user_sliding_sync.is_encrypted,
+ )
+
+ # If we are `newly_left` from the room but can't find any membership,
+ # then we have been "state reset" out of the room
+ else:
+ # Get the state at the time. We can't read from the Sliding Sync
+ # tables because the user has no membership in the room according to
+ # the state (thanks to the state reset).
+ #
+ # Note: `room_type` never changes, so we can just get current room
+ # type
+ room_type = await self.store.get_room_type(room_id)
+ has_known_state = room_type is not ROOM_UNKNOWN_SENTINEL
+ if isinstance(room_type, StateSentinel):
+ room_type = None
+
+ # Get the encryption status at the time of the token
+ is_encrypted = await self.get_is_encrypted_for_room_at_token(
+ room_id,
+ newly_left_room_for_user.event_pos.to_room_stream_token(),
+ )
+
+ room_membership_for_user_map[room_id] = RoomsForUserSlidingSync(
+ room_id=room_id,
+ sender=newly_left_room_for_user.sender,
+ membership=newly_left_room_for_user.membership,
+ event_id=newly_left_room_for_user.event_id,
+ event_pos=newly_left_room_for_user.event_pos,
+ room_version_id=newly_left_room_for_user.room_version_id,
+ has_known_state=has_known_state,
+ room_type=room_type,
+ is_encrypted=is_encrypted,
+ )
if sync_config.lists:
- sync_room_map = {
- room_id: room_membership_for_user
- for room_id, room_membership_for_user in room_membership_for_user_map.items()
- if filter_membership_for_sync(
- user_id=user_id,
- room_membership_for_user=room_membership_for_user,
- newly_left=room_id in newly_left_room_map,
- )
- }
+ sync_room_map = room_membership_for_user_map
with start_active_span("assemble_sliding_window_lists"):
for list_key, list_config in sync_config.lists.items():
# Apply filters
@@ -347,6 +368,7 @@ class SlidingSyncRoomLists:
filtered_sync_room_map = await self.filter_rooms_using_tables(
user_id,
sync_room_map,
+ previous_connection_state,
list_config.filters,
to_token,
dm_room_ids,
@@ -446,6 +468,9 @@ class SlidingSyncRoomLists:
if sync_config.room_subscriptions:
with start_active_span("assemble_room_subscriptions"):
+ # TODO: It would be nice to avoid these copies
+ room_membership_for_user_map = dict(room_membership_for_user_map)
+
# Find which rooms are partially stated and may need to be filtered out
# depending on the `required_state` requested (see below).
partial_state_rooms = await self.store.get_partial_rooms()
@@ -454,10 +479,20 @@ class SlidingSyncRoomLists:
room_id,
room_subscription,
) in sync_config.room_subscriptions.items():
- if room_id not in room_membership_for_user_map:
+ # Check if we have a membership for the room, but didn't pull it out
+ # above. This could be e.g. a leave that we don't pull out by
+ # default.
+ current_room_entry = (
+ await self.store.get_sliding_sync_room_for_user(
+ user_id, room_id
+ )
+ )
+ if not current_room_entry:
# TODO: Handle rooms the user isn't in.
continue
+ room_membership_for_user_map[room_id] = current_room_entry
+
all_rooms.add(room_id)
# Take the superset of the `RoomSyncConfig` for each room.
@@ -471,8 +506,6 @@ class SlidingSyncRoomLists:
if room_id in partial_state_rooms:
continue
- all_rooms.add(room_id)
-
# Update our `relevant_room_map` with the room we're going to display
# and need to fetch more info about.
existing_room_sync_config = relevant_room_map.get(room_id)
@@ -487,7 +520,7 @@ class SlidingSyncRoomLists:
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
- relevant_rooms_to_send_map = await self._filter_relevant_room_to_send(
+ relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
previous_connection_state, from_token, relevant_room_map
)
@@ -544,6 +577,7 @@ class SlidingSyncRoomLists:
filtered_sync_room_map = await self.filter_rooms(
sync_config.user,
sync_room_map,
+ previous_connection_state,
list_config.filters,
to_token,
dm_room_ids,
@@ -674,7 +708,7 @@ class SlidingSyncRoomLists:
# Filtered subset of `relevant_room_map` for rooms that may have updates
# (in the event stream)
- relevant_rooms_to_send_map = await self._filter_relevant_room_to_send(
+ relevant_rooms_to_send_map = await self._filter_relevant_rooms_to_send(
previous_connection_state, from_token, relevant_room_map
)
@@ -689,7 +723,7 @@ class SlidingSyncRoomLists:
dm_room_ids=dm_room_ids,
)
- async def _filter_relevant_room_to_send(
+ async def _filter_relevant_rooms_to_send(
self,
previous_connection_state: PerConnectionState,
from_token: Optional[StreamToken],
@@ -974,8 +1008,17 @@ class SlidingSyncRoomLists:
)
]
- # If the user has never joined any rooms before, we can just return an empty list
- if not room_for_user_list:
+ (
+ newly_joined_room_ids,
+ newly_left_room_map,
+ ) = await self._get_newly_joined_and_left_rooms(
+ user_id, to_token=to_token, from_token=from_token
+ )
+
+ # If the user has never joined any rooms before, we can just return an empty
+ # list. We also have to check the `newly_left_room_map` in case someone was
+ # state reset out of all of the rooms they were in.
+ if not room_for_user_list and not newly_left_room_map:
return {}, set(), set()
# Since we fetched the users room list at some point in time after the
@@ -993,30 +1036,22 @@ class SlidingSyncRoomLists:
else:
rooms_for_user[room_id] = change_room_for_user
- (
- newly_joined_room_ids,
- newly_left_room_ids,
- ) = await self._get_newly_joined_and_left_rooms(
- user_id, to_token=to_token, from_token=from_token
- )
-
# Ensure we have entries for rooms that the user has been "state reset"
# out of. These are rooms appear in the `newly_left_rooms` map but
# aren't in the `rooms_for_user` map.
- for room_id, left_event_pos in newly_left_room_ids.items():
+ for room_id, newly_left_room_for_user in newly_left_room_map.items():
+ # If we already know about the room, it's not a state reset
if room_id in rooms_for_user:
continue
- rooms_for_user[room_id] = RoomsForUserStateReset(
- room_id=room_id,
- event_id=None,
- event_pos=left_event_pos,
- membership=Membership.LEAVE,
- sender=None,
- room_version_id=await self.store.get_room_version_id(room_id),
- )
+ # This should be true if it's a state reset
+ assert newly_left_room_for_user.membership is Membership.LEAVE
+ assert newly_left_room_for_user.event_id is None
+ assert newly_left_room_for_user.sender is None
+
+ rooms_for_user[room_id] = newly_left_room_for_user
- return rooms_for_user, newly_joined_room_ids, set(newly_left_room_ids)
+ return rooms_for_user, newly_joined_room_ids, set(newly_left_room_map)
@trace
async def _get_newly_joined_and_left_rooms(
@@ -1024,7 +1059,7 @@ class SlidingSyncRoomLists:
user_id: str,
to_token: StreamToken,
from_token: Optional[StreamToken],
- ) -> Tuple[AbstractSet[str], Mapping[str, PersistedEventPosition]]:
+ ) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]:
"""Fetch the sets of rooms that the user newly joined or left in the
given token range.
@@ -1033,11 +1068,18 @@ class SlidingSyncRoomLists:
"current memberships" of the user.
Returns:
- A 2-tuple of newly joined room IDs and a map of newly left room
- IDs to the event position the leave happened at.
+ A 2-tuple of newly joined room IDs and a map of newly_left room
+ IDs to the `RoomsForUserStateReset` entry.
+
+ We're using `RoomsForUserStateReset` but that doesn't necessarily mean the
+ user was state reset of the rooms. It's just that the `event_id`/`sender`
+ are optional and we can't tell the difference between the server leaving the
+ room when the user was the last person participating in the room and left or
+ was state reset out of the room. To actually check for a state reset, you
+ need to check if a membership still exists in the room.
"""
newly_joined_room_ids: Set[str] = set()
- newly_left_room_map: Dict[str, PersistedEventPosition] = {}
+ newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}
# We need to figure out the
#
@@ -1108,8 +1150,13 @@ class SlidingSyncRoomLists:
# 1) Figure out newly_left rooms (> `from_token` and <= `to_token`).
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
# 1) Mark this room as `newly_left`
- newly_left_room_map[room_id] = (
- last_membership_change_in_from_to_range.event_pos
+ newly_left_room_map[room_id] = RoomsForUserStateReset(
+ room_id=room_id,
+ sender=last_membership_change_in_from_to_range.sender,
+ membership=Membership.LEAVE,
+ event_id=last_membership_change_in_from_to_range.event_id,
+ event_pos=last_membership_change_in_from_to_range.event_pos,
+ room_version_id=await self.store.get_room_version_id(room_id),
)
# 2) Figure out `newly_joined`
@@ -1553,6 +1600,7 @@ class SlidingSyncRoomLists:
self,
user: UserID,
sync_room_map: Dict[str, RoomsForUserType],
+ previous_connection_state: PerConnectionState,
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
dm_room_ids: AbstractSet[str],
@@ -1738,14 +1786,33 @@ class SlidingSyncRoomLists:
)
}
+ # Keep rooms if the user has been state reset out of it but we previously sent
+ # down the connection before. We want to make sure that we send these down to
+ # the client regardless of filters so they find out about the state reset.
+ #
+ # We don't always have access to the state in a room after being state reset if
+ # no one else locally on the server is participating in the room so we patch
+ # these back in manually.
+ state_reset_out_of_room_id_set = {
+ room_id
+ for room_id in sync_room_map.keys()
+ if sync_room_map[room_id].event_id is None
+ and previous_connection_state.rooms.have_sent_room(room_id).status
+ != HaveSentRoomFlag.NEVER
+ }
+
# Assemble a new sync room map but only with the `filtered_room_id_set`
- return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
+ return {
+ room_id: sync_room_map[room_id]
+ for room_id in filtered_room_id_set | state_reset_out_of_room_id_set
+ }
@trace
async def filter_rooms_using_tables(
self,
user_id: str,
sync_room_map: Mapping[str, RoomsForUserSlidingSync],
+ previous_connection_state: PerConnectionState,
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
dm_room_ids: AbstractSet[str],
@@ -1887,8 +1954,26 @@ class SlidingSyncRoomLists:
)
}
+ # Keep rooms if the user has been state reset out of it but we previously sent
+ # down the connection before. We want to make sure that we send these down to
+ # the client regardless of filters so they find out about the state reset.
+ #
+ # We don't always have access to the state in a room after being state reset if
+ # no one else locally on the server is participating in the room so we patch
+ # these back in manually.
+ state_reset_out_of_room_id_set = {
+ room_id
+ for room_id in sync_room_map.keys()
+ if sync_room_map[room_id].event_id is None
+ and previous_connection_state.rooms.have_sent_room(room_id).status
+ != HaveSentRoomFlag.NEVER
+ }
+
# Assemble a new sync room map but only with the `filtered_room_id_set`
- return {room_id: sync_room_map[room_id] for room_id in filtered_room_id_set}
+ return {
+ room_id: sync_room_map[room_id]
+ for room_id in filtered_room_id_set | state_reset_out_of_room_id_set
+ }
@trace
async def sort_rooms(
|