diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8e6c2fb860..a23a6b9dd9 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -19,7 +19,7 @@
#
import logging
from itertools import chain
-from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple
import attr
from immutabledict import immutabledict
@@ -28,7 +28,9 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe
from synapse.events import EventBase
from synapse.events.utils import strip_event
from synapse.handlers.relations import BundledAggregations
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.roommember import MemberSummary
from synapse.types import (
JsonDict,
PersistedEventPosition,
@@ -51,6 +53,7 @@ logger = logging.getLogger(__name__)
# The event types that clients should consider as new activity.
DEFAULT_BUMP_EVENT_TYPES = {
+ EventTypes.Create,
EventTypes.Message,
EventTypes.Encrypted,
EventTypes.Sticker,
@@ -60,32 +63,79 @@ DEFAULT_BUMP_EVENT_TYPES = {
}
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+ """
+ Attributes:
+ room_id: The room ID of the membership event
+ event_id: The event ID of the membership event
+ event_pos: The stream position of the membership event
+ membership: The membership state of the user in the room
+ sender: The person who sent the membership event
+ newly_joined: Whether the user newly joined the room during the given token
+ range and is still joined to the room at the end of this range.
+ newly_left: Whether the user newly left (or kicked) the room during the given
+ token range and is still "leave" at the end of this range.
+ is_dm: Whether this user considers this room as a direct-message (DM) room
+ """
+
+ room_id: str
+ # Optional because state resets can affect room membership without a corresponding event.
+ event_id: Optional[str]
+ # Even during a state reset which removes the user from the room, we expect this to
+ # be set because `current_state_delta_stream` will note the position that the reset
+ # happened.
+ event_pos: PersistedEventPosition
+ # Even during a state reset which removes the user from the room, we expect this to
+ # be set to `LEAVE` because we can make that assumption based on the situaton (see
+ # `get_current_state_delta_membership_changes_for_user(...)`)
+ membership: str
+ # Optional because state resets can affect room membership without a corresponding event.
+ sender: Optional[str]
+ newly_joined: bool
+ newly_left: bool
+ is_dm: bool
+
+ def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+ return attr.evolve(self, **kwds)
+
+
def filter_membership_for_sync(
- *, membership: str, user_id: str, sender: Optional[str]
+ *, user_id: str, room_membership_for_user: _RoomMembershipForUser
) -> bool:
"""
Returns True if the membership event should be included in the sync response,
otherwise False.
Attributes:
- membership: The membership state of the user in the room.
user_id: The user ID that the membership applies to
- sender: The person who sent the membership event
+ room_membership_for_user: Membership information for the user in the room
"""
- # Everything except `Membership.LEAVE` because we want everything that's *still*
- # relevant to the user. There are few more things to include in the sync response
- # (newly_left) but those are handled separately.
+ membership = room_membership_for_user.membership
+ sender = room_membership_for_user.sender
+ newly_left = room_membership_for_user.newly_left
+
+ # We want to allow everything except rooms the user has left unless `newly_left`
+ # because we want everything that's *still* relevant to the user. We include
+ # `newly_left` rooms because the last event that the user should see is their own
+ # leave event.
#
- # This logic includes kicks (leave events where the sender is not the same user) and
- # can be read as "anything that isn't a leave or a leave with a different sender".
+ # A leave != kick. This logic includes kicks (leave events where the sender is not
+ # the same user).
#
- # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
- # happened that removed the user from the room, or the user was the last person
- # locally to leave the room which caused the server to leave the room. In both
- # cases, we can just remove the rooms since they are no longer relevant to the user.
- # They could still be added back later if they are `newly_left`.
- return membership != Membership.LEAVE or sender not in (user_id, None)
+ # When `sender=None`, it means that a state reset happened that removed the user
+ # from the room without a corresponding leave event. We can just remove the rooms
+ # since they are no longer relevant to the user but will still appear if they are
+ # `newly_left`.
+ return (
+ # Anything except leave events
+ membership != Membership.LEAVE
+ # Unless...
+ or newly_left
+ # Allow kicks
+ or (membership == Membership.LEAVE and sender not in (user_id, None))
+ )
# We can't freeze this class because we want to update it in place with the
@@ -279,29 +329,6 @@ class StateValues:
LAZY: Final = "$LAZY"
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _RoomMembershipForUser:
- """
- Attributes:
- event_id: The event ID of the membership event
- event_pos: The stream position of the membership event
- membership: The membership state of the user in the room
- sender: The person who sent the membership event
- newly_joined: Whether the user newly joined the room during the given token
- range
- """
-
- room_id: str
- event_id: Optional[str]
- event_pos: PersistedEventPosition
- membership: str
- sender: Optional[str]
- newly_joined: bool
-
- def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
- return attr.evolve(self, **kwds)
-
-
class SlidingSyncHandler:
def __init__(self, hs: "HomeServer"):
self.clock = hs.get_clock()
@@ -420,18 +447,31 @@ class SlidingSyncHandler:
# See https://github.com/matrix-org/matrix-doc/issues/1144
raise NotImplementedError()
+ # Get all of the room IDs that the user should be able to see in the sync
+ # response
+ has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
+ has_room_subscriptions = (
+ sync_config.room_subscriptions is not None
+ and len(sync_config.room_subscriptions) > 0
+ )
+ if has_lists or has_room_subscriptions:
+ room_membership_for_user_map = (
+ await self.get_room_membership_for_user_at_to_token(
+ user=sync_config.user,
+ to_token=to_token,
+ from_token=from_token,
+ )
+ )
+
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
# Keep track of the rooms that we're going to display and need to fetch more
# info about
relevant_room_map: Dict[str, RoomSyncConfig] = {}
- if sync_config.lists:
- # Get all of the room IDs that the user should be able to see in the sync
- # response
- sync_room_map = await self.get_sync_room_ids_for_user(
- sync_config.user,
- from_token=from_token,
- to_token=to_token,
+ if has_lists and sync_config.lists is not None:
+ sync_room_map = await self.filter_rooms_relevant_for_sync(
+ user=sync_config.user,
+ room_membership_for_user_map=room_membership_for_user_map,
)
for list_key, list_config in sync_config.lists.items():
@@ -520,7 +560,35 @@ class SlidingSyncHandler:
ops=ops,
)
- # TODO: if (sync_config.room_subscriptions):
+ # Handle room subscriptions
+ if has_room_subscriptions and sync_config.room_subscriptions is not None:
+ for room_id, room_subscription in sync_config.room_subscriptions.items():
+ room_membership_for_user_at_to_token = (
+ await self.check_room_subscription_allowed_for_user(
+ room_id=room_id,
+ room_membership_for_user_map=room_membership_for_user_map,
+ to_token=to_token,
+ )
+ )
+
+ # Skip this room if the user isn't allowed to see it
+ if not room_membership_for_user_at_to_token:
+ continue
+
+ room_membership_for_user_map[room_id] = (
+ room_membership_for_user_at_to_token
+ )
+
+ # Take the superset of the `RoomSyncConfig` for each room.
+ #
+ # Update our `relevant_room_map` with the room we're going to display
+ # and need to fetch more info about.
+ room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
+ existing_room_sync_config = relevant_room_map.get(room_id)
+ if existing_room_sync_config is not None:
+ existing_room_sync_config.combine_room_sync_config(room_sync_config)
+ else:
+ relevant_room_map[room_id] = room_sync_config
# Fetch room data
rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -529,7 +597,9 @@ class SlidingSyncHandler:
user=sync_config.user,
room_id=room_id,
room_sync_config=room_sync_config,
- room_membership_for_user_at_to_token=sync_room_map[room_id],
+ room_membership_for_user_at_to_token=room_membership_for_user_map[
+ room_id
+ ],
from_token=from_token,
to_token=to_token,
)
@@ -547,28 +617,23 @@ class SlidingSyncHandler:
extensions=extensions,
)
- async def get_sync_room_ids_for_user(
+ async def get_room_membership_for_user_at_to_token(
self,
user: UserID,
to_token: StreamToken,
- from_token: Optional[StreamToken] = None,
+ from_token: Optional[StreamToken],
) -> Dict[str, _RoomMembershipForUser]:
"""
- Fetch room IDs that should be listed for this user in the sync response (the
- full room list that will be filtered, sorted, and sliced).
+ Fetch room IDs that the user has had membership in (the full room list including
+ long-lost left rooms that will be filtered, sorted, and sliced).
- We're looking for rooms where the user has the following state in the token
- range (> `from_token` and <= `to_token`):
+ We're looking for rooms where the user has had any sort of membership in the
+ token range (> `from_token` and <= `to_token`)
- - `invite`, `join`, `knock`, `ban` membership events
- - Kicks (`leave` membership events where `sender` is different from the
- `user_id`/`state_key`)
- - `newly_left` (rooms that were left during the given token range)
- - In order for bans/kicks to not show up in sync, you need to `/forget` those
- rooms. This doesn't modify the event itself though and only adds the
- `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
- to tell when a room was forgotten at the moment so we can't factor it into the
- from/to range.
+ In order for bans/kicks to not show up, you need to `/forget` those rooms. This
+ doesn't modify the event itself though and only adds the `forgotten` flag to the
+ `room_memberships` table in Synapse. There isn't a way to tell when a room was
+ forgotten at the moment so we can't factor it into the token range.
Args:
user: User to fetch rooms for
@@ -576,8 +641,8 @@ class SlidingSyncHandler:
from_token: The point in the stream to sync from.
Returns:
- A dictionary of room IDs that should be listed in the sync response along
- with membership information in that room at the time of `to_token`.
+ A dictionary of room IDs that the user has had membership in along with
+ membership information in that room at the time of `to_token`.
"""
user_id = user.to_string()
@@ -588,9 +653,6 @@ class SlidingSyncHandler:
# We want to fetch any kind of membership (joined and left rooms) in order
# to get the `event_pos` of the latest room membership event for the
# user.
- #
- # We will filter out the rooms that don't belong below (see
- # `filter_membership_for_sync`)
membership_list=Membership.LIST,
excluded_rooms=self.rooms_to_exclude_globally,
)
@@ -610,7 +672,10 @@ class SlidingSyncHandler:
event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
sender=room_for_user.sender,
+ # We will update these fields below to be accurate
newly_joined=False,
+ newly_left=False,
+ is_dm=False,
)
for room_for_user in room_for_user_list
}
@@ -635,10 +700,17 @@ class SlidingSyncHandler:
instance_to_max_stream_ordering_map[instance_name] = stream_ordering
# Then assemble the `RoomStreamToken`
+ min_stream_pos = min(instance_to_max_stream_ordering_map.values())
membership_snapshot_token = RoomStreamToken(
# Minimum position in the `instance_map`
- stream=min(instance_to_max_stream_ordering_map.values()),
- instance_map=immutabledict(instance_to_max_stream_ordering_map),
+ stream=min_stream_pos,
+ instance_map=immutabledict(
+ {
+ instance_name: stream_pos
+ for instance_name, stream_pos in instance_to_max_stream_ordering_map.items()
+ if stream_pos > min_stream_pos
+ }
+ ),
)
# Since we fetched the users room list at some point in time after the from/to
@@ -648,10 +720,9 @@ class SlidingSyncHandler:
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
# - 1c) Update room membership events to the point in time of the `to_token`
- # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
- # - 3) Figure out which rooms are `newly_joined`
-
- # 1) -----------------------------------------------------
+ # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
+ # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
+ # - 4) Figure out which rooms are DM's
# 1) Fetch membership changes that fall in the range from `to_token` up to
# `membership_snapshot_token`
@@ -711,7 +782,10 @@ class SlidingSyncHandler:
event_pos=first_membership_change_after_to_token.prev_event_pos,
membership=first_membership_change_after_to_token.prev_membership,
sender=first_membership_change_after_to_token.prev_sender,
+ # We will update these fields below to be accurate
newly_joined=False,
+ newly_left=False,
+ is_dm=False,
)
else:
# If we can't find the previous membership event, we shouldn't
@@ -719,22 +793,6 @@ class SlidingSyncHandler:
# exact membership state and shouldn't rely on the current snapshot.
sync_room_id_set.pop(room_id, None)
- # Filter the rooms that that we have updated room membership events to the point
- # in time of the `to_token` (from the "1)" fixups)
- filtered_sync_room_id_set = {
- room_id: room_membership_for_user
- for room_id, room_membership_for_user in sync_room_id_set.items()
- if filter_membership_for_sync(
- membership=room_membership_for_user.membership,
- user_id=user_id,
- sender=room_membership_for_user.sender,
- )
- }
-
- # 2) -----------------------------------------------------
- # We fix-up newly_left rooms after the first fixup because it may have removed
- # some left rooms that we can figure out are newly_left in the following code
-
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
current_state_delta_membership_changes_in_from_to_range = []
if from_token:
@@ -796,18 +854,40 @@ class SlidingSyncHandler:
if last_membership_change_in_from_to_range.membership == Membership.JOIN:
possibly_newly_joined_room_ids.add(room_id)
- # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
- # include newly_left rooms because the last event that the user should see
- # is their own leave event
+ # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
- filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
- room_id=room_id,
- event_id=last_membership_change_in_from_to_range.event_id,
- event_pos=last_membership_change_in_from_to_range.event_pos,
- membership=last_membership_change_in_from_to_range.membership,
- sender=last_membership_change_in_from_to_range.sender,
- newly_joined=False,
- )
+ # 2) Mark this room as `newly_left`
+
+ # If we're seeing a membership change here, we should expect to already
+ # have it in our snapshot but if a state reset happens, it wouldn't have
+ # shown up in our snapshot but appear as a change here.
+ existing_sync_entry = sync_room_id_set.get(room_id)
+ if existing_sync_entry is not None:
+ # Normal expected case
+ sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
+ newly_left=True
+ )
+ else:
+ # State reset!
+ logger.warn(
+ "State reset detected for room_id %s with %s who is no longer in the room",
+ room_id,
+ user_id,
+ )
+ # Even though a state reset happened which removed the person from
+ # the room, we still add it the list so the user knows they left the
+ # room. Downstream code can check for a state reset by looking for
+ # `event_id=None and membership is not None`.
+ sync_room_id_set[room_id] = _RoomMembershipForUser(
+ room_id=room_id,
+ event_id=last_membership_change_in_from_to_range.event_id,
+ event_pos=last_membership_change_in_from_to_range.event_pos,
+ membership=last_membership_change_in_from_to_range.membership,
+ sender=last_membership_change_in_from_to_range.sender,
+ newly_joined=False,
+ newly_left=True,
+ is_dm=False,
+ )
# 3) Figure out `newly_joined`
for room_id in possibly_newly_joined_room_ids:
@@ -818,9 +898,9 @@ class SlidingSyncHandler:
# also some non-join in the range, we know they `newly_joined`.
if has_non_join_in_from_to_range:
# We found a `newly_joined` room (we left and joined within the token range)
- filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
- room_id
- ].copy_and_replace(newly_joined=True)
+ sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+ newly_joined=True
+ )
else:
prev_event_id = first_membership_change_by_room_id_in_from_to_range[
room_id
@@ -832,7 +912,7 @@ class SlidingSyncHandler:
if prev_event_id is None:
# We found a `newly_joined` room (we are joining the room for the
# first time within the token range)
- filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ sync_room_id_set[room_id] = sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
# Last resort, we need to step back to the previous membership event
@@ -840,11 +920,150 @@ class SlidingSyncHandler:
elif prev_membership != Membership.JOIN:
# We found a `newly_joined` room (we left before the token range
# and joined within the token range)
- filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ sync_room_id_set[room_id] = sync_room_id_set[
room_id
].copy_and_replace(newly_joined=True)
- return filtered_sync_room_id_set
+ # 4) Figure out which rooms the user considers to be direct-message (DM) rooms
+ #
+ # We're using global account data (`m.direct`) instead of checking for
+ # `is_direct` on membership events because that property only appears for
+ # the invitee membership event (doesn't show up for the inviter).
+ #
+ # We're unable to take `to_token` into account for global account data since
+ # we only keep track of the latest account data for the user.
+ dm_map = await self.store.get_global_account_data_by_type_for_user(
+ user_id, AccountDataTypes.DIRECT
+ )
+
+ # Flatten out the map. Account data is set by the client so it needs to be
+ # scrutinized.
+ dm_room_id_set = set()
+ if isinstance(dm_map, dict):
+ for room_ids in dm_map.values():
+ # Account data should be a list of room IDs. Ignore anything else
+ if isinstance(room_ids, list):
+ for room_id in room_ids:
+ if isinstance(room_id, str):
+ dm_room_id_set.add(room_id)
+
+ # 4) Fixup
+ for room_id in sync_room_id_set:
+ sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+ is_dm=room_id in dm_room_id_set
+ )
+
+ return sync_room_id_set
+
+ async def filter_rooms_relevant_for_sync(
+ self,
+ user: UserID,
+ room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+ ) -> Dict[str, _RoomMembershipForUser]:
+ """
+ Filter room IDs that should/can be listed for this user in the sync response (the
+ full room list that will be further filtered, sorted, and sliced).
+
+ We're looking for rooms where the user has the following state in the token
+ range (> `from_token` and <= `to_token`):
+
+ - `invite`, `join`, `knock`, `ban` membership events
+ - Kicks (`leave` membership events where `sender` is different from the
+ `user_id`/`state_key`)
+ - `newly_left` (rooms that were left during the given token range)
+ - In order for bans/kicks to not show up in sync, you need to `/forget` those
+ rooms. This doesn't modify the event itself though and only adds the
+ `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
+ to tell when a room was forgotten at the moment so we can't factor it into the
+ from/to range.
+
+ Args:
+ user: User that is syncing
+ room_membership_for_user_map: Room membership for the user
+
+ Returns:
+ A dictionary of room IDs that should be listed in the sync response along
+ with membership information in that room at the time of `to_token`.
+ """
+ user_id = user.to_string()
+
+ # Filter rooms to only what we're interested to sync with
+ filtered_sync_room_map = {
+ room_id: room_membership_for_user
+ for room_id, room_membership_for_user in room_membership_for_user_map.items()
+ if filter_membership_for_sync(
+ user_id=user_id,
+ room_membership_for_user=room_membership_for_user,
+ )
+ }
+
+ return filtered_sync_room_map
+
+ async def check_room_subscription_allowed_for_user(
+ self,
+ room_id: str,
+ room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+ to_token: StreamToken,
+ ) -> Optional[_RoomMembershipForUser]:
+ """
+ Check whether the user is allowed to see the room based on whether they have
+ ever had membership in the room or if the room is `world_readable`.
+
+ Similar to `check_user_in_room_or_world_readable(...)`
+
+ Args:
+ room_id: Room to check
+ room_membership_for_user_map: Room membership for the user at the time of
+ the `to_token` (<= `to_token`).
+ to_token: The token to fetch rooms up to.
+
+ Returns:
+ The room membership for the user if they are allowed to subscribe to the
+ room else `None`.
+ """
+
+ # We can first check if they are already allowed to see the room based
+ # on our previous work to assemble the `room_membership_for_user_map`.
+ #
+ # If they have had any membership in the room over time (up to the `to_token`),
+ # let them subscribe and see what they can.
+ existing_membership_for_user = room_membership_for_user_map.get(room_id)
+ if existing_membership_for_user is not None:
+ return existing_membership_for_user
+
+ # TODO: Handle `world_readable` rooms
+ return None
+
+ # If the room is `world_readable`, it doesn't matter whether they can join,
+ # everyone can see the room.
+ # not_in_room_membership_for_user = _RoomMembershipForUser(
+ # room_id=room_id,
+ # event_id=None,
+ # event_pos=None,
+ # membership=None,
+ # sender=None,
+ # newly_joined=False,
+ # newly_left=False,
+ # is_dm=False,
+ # )
+ # room_state = await self.get_current_state_at(
+ # room_id=room_id,
+ # room_membership_for_user_at_to_token=not_in_room_membership_for_user,
+ # state_filter=StateFilter.from_types(
+ # [(EventTypes.RoomHistoryVisibility, "")]
+ # ),
+ # to_token=to_token,
+ # )
+
+ # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, ""))
+ # if (
+ # visibility_event is not None
+ # and visibility_event.content.get("history_visibility")
+ # == HistoryVisibility.WORLD_READABLE
+ # ):
+ # return not_in_room_membership_for_user
+
+ # return None
async def filter_rooms(
self,
@@ -867,41 +1086,24 @@ class SlidingSyncHandler:
A filtered dictionary of room IDs along with membership information in the
room at the time of `to_token`.
"""
- user_id = user.to_string()
-
- # TODO: Apply filters
-
filtered_room_id_set = set(sync_room_map.keys())
# Filter for Direct-Message (DM) rooms
if filters.is_dm is not None:
- # We're using global account data (`m.direct`) instead of checking for
- # `is_direct` on membership events because that property only appears for
- # the invitee membership event (doesn't show up for the inviter). Account
- # data is set by the client so it needs to be scrutinized.
- #
- # We're unable to take `to_token` into account for global account data since
- # we only keep track of the latest account data for the user.
- dm_map = await self.store.get_global_account_data_by_type_for_user(
- user_id, AccountDataTypes.DIRECT
- )
-
- # Flatten out the map
- dm_room_id_set = set()
- if isinstance(dm_map, dict):
- for room_ids in dm_map.values():
- # Account data should be a list of room IDs. Ignore anything else
- if isinstance(room_ids, list):
- for room_id in room_ids:
- if isinstance(room_id, str):
- dm_room_id_set.add(room_id)
-
if filters.is_dm:
# Only DM rooms please
- filtered_room_id_set = filtered_room_id_set.intersection(dm_room_id_set)
+ filtered_room_id_set = {
+ room_id
+ for room_id in filtered_room_id_set
+ if sync_room_map[room_id].is_dm
+ }
else:
# Only non-DM rooms please
- filtered_room_id_set = filtered_room_id_set.difference(dm_room_id_set)
+ filtered_room_id_set = {
+ room_id
+ for room_id in filtered_room_id_set
+ if not sync_room_map[room_id].is_dm
+ }
if filters.spaces:
raise NotImplementedError()
@@ -1043,6 +1245,102 @@ class SlidingSyncHandler:
reverse=True,
)
+ async def get_current_state_ids_at(
+ self,
+ room_id: str,
+ room_membership_for_user_at_to_token: _RoomMembershipForUser,
+ state_filter: StateFilter,
+ to_token: StreamToken,
+ ) -> StateMap[str]:
+ """
+ Get current state IDs for the user in the room according to their membership. This
+ will be the current state at the time of their LEAVE/BAN, otherwise will be the
+ current state <= to_token.
+
+ Args:
+ room_id: The room ID to fetch data for
+ room_membership_for_user_at_token: Membership information for the user
+ in the room at the time of `to_token`.
+ to_token: The point in the stream to sync up to.
+ """
+ room_state_ids: StateMap[str]
+ # People shouldn't see past their leave/ban event
+ if room_membership_for_user_at_to_token.membership in (
+ Membership.LEAVE,
+ Membership.BAN,
+ ):
+ # TODO: `get_state_ids_at(...)` doesn't take into account the "current state"
+ room_state_ids = await self.storage_controllers.state.get_state_ids_at(
+ room_id,
+ stream_position=to_token.copy_and_replace(
+ StreamKeyType.ROOM,
+ room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
+ ),
+ state_filter=state_filter,
+ # Partially-stated rooms should have all state events except for
+ # remote membership events. Since we've already excluded
+ # partially-stated rooms unless `required_state` only has
+ # `["m.room.member", "$LAZY"]` for membership, we should be able to
+ # retrieve everything requested. When we're lazy-loading, if there
+ # are some remote senders in the timeline, we should also have their
+ # membership event because we had to auth that timeline event. Plus
+ # we don't want to block the whole sync waiting for this one room.
+ await_full_state=False,
+ )
+ # Otherwise, we can get the latest current state in the room
+ else:
+ room_state_ids = await self.storage_controllers.state.get_current_state_ids(
+ room_id,
+ state_filter,
+ # Partially-stated rooms should have all state events except for
+ # remote membership events. Since we've already excluded
+ # partially-stated rooms unless `required_state` only has
+ # `["m.room.member", "$LAZY"]` for membership, we should be able to
+ # retrieve everything requested. When we're lazy-loading, if there
+ # are some remote senders in the timeline, we should also have their
+ # membership event because we had to auth that timeline event. Plus
+ # we don't want to block the whole sync waiting for this one room.
+ await_full_state=False,
+ )
+ # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
+
+ return room_state_ids
+
+ async def get_current_state_at(
+ self,
+ room_id: str,
+ room_membership_for_user_at_to_token: _RoomMembershipForUser,
+ state_filter: StateFilter,
+ to_token: StreamToken,
+ ) -> StateMap[EventBase]:
+ """
+ Get current state for the user in the room according to their membership. This
+ will be the current state at the time of their LEAVE/BAN, otherwise will be the
+ current state <= to_token.
+
+ Args:
+ room_id: The room ID to fetch data for
+ room_membership_for_user_at_token: Membership information for the user
+ in the room at the time of `to_token`.
+ to_token: The point in the stream to sync up to.
+ """
+ room_state_ids = await self.get_current_state_ids_at(
+ room_id=room_id,
+ room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+ state_filter=state_filter,
+ to_token=to_token,
+ )
+
+ event_map = await self.store.get_events(list(room_state_ids.values()))
+
+ state_map = {}
+ for key, event_id in room_state_ids.items():
+ event = event_map.get(event_id)
+ if event:
+ state_map[key] = event
+
+ return state_map
+
async def get_room_sync_data(
self,
user: UserID,
@@ -1074,7 +1372,7 @@ class SlidingSyncHandler:
# membership. Currently, we have to make all of these optional because
# `invite`/`knock` rooms only have `stripped_state`. See
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
- timeline_events: Optional[List[EventBase]] = None
+ timeline_events: List[EventBase] = []
bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
limited: Optional[bool] = None
prev_batch_token: Optional[StreamToken] = None
@@ -1206,7 +1504,7 @@ class SlidingSyncHandler:
# Figure out any stripped state events for invite/knocks. This allows the
# potential joiner to identify the room.
- stripped_state: Optional[List[JsonDict]] = None
+ stripped_state: List[JsonDict] = []
if room_membership_for_user_at_to_token.membership in (
Membership.INVITE,
Membership.KNOCK,
@@ -1232,10 +1530,10 @@ class SlidingSyncHandler:
stripped_state.append(strip_event(invite_or_knock_event))
# TODO: Handle state resets. For example, if we see
- # `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
- # `required_state` doesn't include it, we should indicate to the client that a
- # state reset happened. Perhaps we should indicate this by setting `initial:
- # True` and empty `required_state`.
+ # `room_membership_for_user_at_to_token.event_id=None and
+ # room_membership_for_user_at_to_token.membership is not None`, we should
+ # indicate to the client that a state reset happened. Perhaps we should indicate
+ # this by setting `initial: True` and empty `required_state`.
# TODO: Since we can't determine whether we've already sent a room down this
# Sliding Sync connection before (we plan to add this optimization in the
@@ -1243,6 +1541,44 @@ class SlidingSyncHandler:
# updates.
initial = True
+ # Check whether the room has a name set
+ name_state_ids = await self.get_current_state_ids_at(
+ room_id=room_id,
+ room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+ state_filter=StateFilter.from_types([(EventTypes.Name, "")]),
+ to_token=to_token,
+ )
+ name_event_id = name_state_ids.get((EventTypes.Name, ""))
+
+ room_membership_summary: Mapping[str, MemberSummary]
+ empty_membership_summary = MemberSummary([], 0)
+ if room_membership_for_user_at_to_token.membership in (
+ Membership.LEAVE,
+ Membership.BAN,
+ ):
+ # TODO: Figure out how to get the membership summary for left/banned rooms
+ room_membership_summary = {}
+ else:
+ room_membership_summary = await self.store.get_room_summary(room_id)
+ # TODO: Reverse/rewind back to the `to_token`
+
+ # `heroes` are required if the room name is not set.
+ #
+ # Note: When you're the first one on your server to be invited to a new room
+ # over federation, we only have access to some stripped state in
+ # `event.unsigned.invite_room_state` which currently doesn't include `heroes`,
+ # see https://github.com/matrix-org/matrix-spec/issues/380. This means that
+ # clients won't be able to calculate the room name when necessary and just a
+ # pitfall we have to deal with until that spec issue is resolved.
+ hero_user_ids: List[str] = []
+ # TODO: Should we also check for `EventTypes.CanonicalAlias`
+ # (`m.room.canonical_alias`) as a fallback for the room name? see
+ # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+ if name_event_id is None:
+ hero_user_ids = extract_heroes_from_room_summary(
+ room_membership_summary, me=user.to_string()
+ )
+
# Fetch the `required_state` for the room
#
# No `required_state` for invite/knock rooms (just `stripped_state`)
@@ -1253,13 +1589,11 @@ class SlidingSyncHandler:
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
#
# Calculate the `StateFilter` based on the `required_state` for the room
- room_state: Optional[StateMap[EventBase]] = None
- required_room_state: Optional[StateMap[EventBase]] = None
+ required_state_filter = StateFilter.none()
if room_membership_for_user_at_to_token.membership not in (
Membership.INVITE,
Membership.KNOCK,
):
- required_state_filter = StateFilter.none()
# If we have a double wildcard ("*", "*") in the `required_state`, we need
# to fetch all state for the room
#
@@ -1325,86 +1659,65 @@ class SlidingSyncHandler:
required_state_filter = StateFilter.from_types(required_state_types)
- # We need this base set of info for the response so let's just fetch it along
- # with the `required_state` for the room
- META_ROOM_STATE = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")]
+ # We need this base set of info for the response so let's just fetch it along
+ # with the `required_state` for the room
+ meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [
+ (EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids
+ ]
+ state_filter = StateFilter.all()
+ if required_state_filter != StateFilter.all():
state_filter = StateFilter(
types=StateFilter.from_types(
- chain(META_ROOM_STATE, required_state_filter.to_types())
+ chain(meta_room_state, required_state_filter.to_types())
).types,
include_others=required_state_filter.include_others,
)
- # We can return all of the state that was requested if this was the first
- # time we've sent the room down this connection.
- if initial:
- # People shouldn't see past their leave/ban event
- if room_membership_for_user_at_to_token.membership in (
- Membership.LEAVE,
- Membership.BAN,
- ):
- room_state = await self.storage_controllers.state.get_state_at(
- room_id,
- stream_position=to_token.copy_and_replace(
- StreamKeyType.ROOM,
- room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
- ),
- state_filter=state_filter,
- # Partially-stated rooms should have all state events except for
- # remote membership events. Since we've already excluded
- # partially-stated rooms unless `required_state` only has
- # `["m.room.member", "$LAZY"]` for membership, we should be able to
- # retrieve everything requested. When we're lazy-loading, if there
- # are some remote senders in the timeline, we should also have their
- # membership event because we had to auth that timeline event. Plus
- # we don't want to block the whole sync waiting for this one room.
- await_full_state=False,
- )
- # Otherwise, we can get the latest current state in the room
- else:
- room_state = await self.storage_controllers.state.get_current_state(
- room_id,
- state_filter,
- # Partially-stated rooms should have all state events except for
- # remote membership events. Since we've already excluded
- # partially-stated rooms unless `required_state` only has
- # `["m.room.member", "$LAZY"]` for membership, we should be able to
- # retrieve everything requested. When we're lazy-loading, if there
- # are some remote senders in the timeline, we should also have their
- # membership event because we had to auth that timeline event. Plus
- # we don't want to block the whole sync waiting for this one room.
- await_full_state=False,
- )
- # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
- else:
- # TODO: Once we can figure out if we've sent a room down this connection before,
- # we can return updates instead of the full required state.
- raise NotImplementedError()
+ # We can return all of the state that was requested if this was the first
+ # time we've sent the room down this connection.
+ room_state: StateMap[EventBase] = {}
+ if initial:
+ room_state = await self.get_current_state_at(
+ room_id=room_id,
+ room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+ state_filter=state_filter,
+ to_token=to_token,
+ )
+ else:
+ # TODO: Once we can figure out if we've sent a room down this connection before,
+ # we can return updates instead of the full required state.
+ raise NotImplementedError()
- if required_state_filter != StateFilter.none():
- required_room_state = required_state_filter.filter_state(room_state)
+ required_room_state: StateMap[EventBase] = {}
+ if required_state_filter != StateFilter.none():
+ required_room_state = required_state_filter.filter_state(room_state)
# Find the room name and avatar from the state
room_name: Optional[str] = None
+ # TODO: Should we also check for `EventTypes.CanonicalAlias`
+ # (`m.room.canonical_alias`) as a fallback for the room name? see
+ # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+ name_event = room_state.get((EventTypes.Name, ""))
+ if name_event is not None:
+ room_name = name_event.content.get("name")
+
room_avatar: Optional[str] = None
- if room_state is not None:
- name_event = room_state.get((EventTypes.Name, ""))
- if name_event is not None:
- room_name = name_event.content.get("name")
-
- avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
- if avatar_event is not None:
- room_avatar = avatar_event.content.get("url")
- elif stripped_state is not None:
- for event in stripped_state:
- if event["type"] == EventTypes.Name:
- room_name = event.get("content", {}).get("name")
- elif event["type"] == EventTypes.RoomAvatar:
- room_avatar = event.get("content", {}).get("url")
-
- # Found everything so we can stop looking
- if room_name is not None and room_avatar is not None:
- break
+ avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
+ if avatar_event is not None:
+ room_avatar = avatar_event.content.get("url")
+
+ # Assemble heroes: extract the info from the state we just fetched
+ heroes: List[SlidingSyncResult.RoomResult.StrippedHero] = []
+ for hero_user_id in hero_user_ids:
+ member_event = room_state.get((EventTypes.Member, hero_user_id))
+ if member_event is not None:
+ heroes.append(
+ SlidingSyncResult.RoomResult.StrippedHero(
+ user_id=hero_user_id,
+ display_name=member_event.content.get("displayname"),
+ avatar_url=member_event.content.get("avatar_url"),
+ )
+ )
# Figure out the last bump event in the room
last_bump_event_result = (
@@ -1423,14 +1736,10 @@ class SlidingSyncHandler:
return SlidingSyncResult.RoomResult(
name=room_name,
avatar=room_avatar,
- # TODO: Dummy value
- heroes=None,
- # TODO: Dummy value
- is_dm=False,
+ heroes=heroes,
+ is_dm=room_membership_for_user_at_to_token.is_dm,
initial=initial,
- required_state=(
- list(required_room_state.values()) if required_room_state else None
- ),
+ required_state=list(required_room_state.values()),
timeline_events=timeline_events,
bundled_aggregations=bundled_aggregations,
stripped_state=stripped_state,
@@ -1438,9 +1747,12 @@ class SlidingSyncHandler:
limited=limited,
num_live=num_live,
bump_stamp=bump_stamp,
- # TODO: Dummy values
- joined_count=0,
- invited_count=0,
+ joined_count=room_membership_summary.get(
+ Membership.JOIN, empty_membership_summary
+ ).count,
+ invited_count=room_membership_summary.get(
+ Membership.INVITE, empty_membership_summary
+ ).count,
# TODO: These are just dummy values. We could potentially just remove these
# since notifications can only really be done correctly on the client anyway
# (encrypted rooms).
|