diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 847a638bba..8622ef8472 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,22 +18,28 @@
#
#
import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+import attr
from immutabledict import immutabledict
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
-from synapse.storage.roommember import RoomsForUser
+from synapse.events.utils import strip_event
+from synapse.handlers.relations import BundledAggregations
+from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.types import (
+ JsonDict,
PersistedEventPosition,
Requester,
RoomStreamToken,
+ StreamKeyType,
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.state import StateFilter
+from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -41,28 +47,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
- """
- Quick helper to convert an event to a `RoomsForUser` object.
- """
- # These fields should be present for all persisted events
- assert event.internal_metadata.stream_ordering is not None
- assert event.internal_metadata.instance_name is not None
-
- return RoomsForUser(
- room_id=event.room_id,
- sender=event.sender,
- membership=event.membership,
- event_id=event.event_id,
- event_pos=PersistedEventPosition(
- event.internal_metadata.instance_name,
- event.internal_metadata.stream_ordering,
- ),
- room_version_id=event.room_version.identifier,
- )
-
-
-def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
+def filter_membership_for_sync(
+ *, membership: str, user_id: str, sender: Optional[str]
+) -> bool:
"""
Returns True if the membership event should be included in the sync response,
otherwise False.
@@ -79,7 +66,54 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) ->
#
# This logic includes kicks (leave events where the sender is not the same user) and
# can be read as "anything that isn't a leave or a leave with a different sender".
- return membership != Membership.LEAVE or sender != user_id
+ #
+ # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
+ # happened that removed the user from the room, or the user was the last person
+ # locally to leave the room which caused the server to leave the room. In both
+ # cases, we can just remove the rooms since they are no longer relevant to the user.
+ # They could still be added back later if they are `newly_left`.
+ return membership != Membership.LEAVE or sender not in (user_id, None)
+
+
+# We can't freeze this class because we want to update it in place with the
+# de-duplicated data.
+@attr.s(slots=True, auto_attribs=True)
+class RoomSyncConfig:
+ """
+ Holds the config for what data we should fetch for a room in the sync response.
+
+ Attributes:
+ timeline_limit: The maximum number of events to return in the timeline.
+ required_state: The set of state events requested for the room. The
+ values are close to `StateKey` but actually use a syntax where you can
+ provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part
+ of the tuple (type, state_key).
+ """
+
+ timeline_limit: int
+ required_state: Set[Tuple[str, str]]
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+ """
+ Attributes:
+ event_id: The event ID of the membership event
+ event_pos: The stream position of the membership event
+ membership: The membership state of the user in the room
+ sender: The person who sent the membership event
+ newly_joined: Whether the user newly joined the room during the given token
+ range
+ """
+
+ event_id: Optional[str]
+ event_pos: PersistedEventPosition
+ membership: str
+ sender: Optional[str]
+ newly_joined: bool
+
+ def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+ return attr.evolve(self, **kwds)
class SlidingSyncHandler:
@@ -90,6 +124,7 @@ class SlidingSyncHandler:
self.auth_blocking = hs.get_auth_blocking()
self.notifier = hs.get_notifier()
self.event_sources = hs.get_event_sources()
+ self.relations_handler = hs.get_relations_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
async def wait_for_sync_for_user(
@@ -201,6 +236,7 @@ class SlidingSyncHandler:
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
+ relevant_room_map: Dict[str, RoomSyncConfig] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
# response
@@ -225,29 +261,67 @@ class SlidingSyncHandler:
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
for range in list_config.ranges:
+ sliced_room_ids = [
+ room_id
+ # Both sides of range are inclusive
+ for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
+ ]
+
ops.append(
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
- room_ids=[
- room_id
- for room_id, _ in sorted_room_info[
- range[0] : range[1]
- ]
- ],
+ room_ids=sliced_room_ids,
)
)
+ # Take the superset of the `RoomSyncConfig` for each room
+ for room_id in sliced_room_ids:
+ if relevant_room_map.get(room_id) is not None:
+ # Take the highest timeline limit
+ if (
+ relevant_room_map[room_id].timeline_limit
+ < list_config.timeline_limit
+ ):
+ relevant_room_map[room_id].timeline_limit = (
+ list_config.timeline_limit
+ )
+
+ # Union the required state
+ relevant_room_map[room_id].required_state.update(
+ list_config.required_state
+ )
+ else:
+ relevant_room_map[room_id] = RoomSyncConfig(
+ timeline_limit=list_config.timeline_limit,
+ required_state=set(list_config.required_state),
+ )
+
lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_info),
ops=ops,
)
+ # TODO: if (sync_config.room_subscriptions):
+
+ # Fetch room data
+ rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
+ for room_id, room_sync_config in relevant_room_map.items():
+ room_sync_result = await self.get_room_sync_data(
+ user=sync_config.user,
+ room_id=room_id,
+ room_sync_config=room_sync_config,
+ rooms_membership_for_user_at_to_token=sync_room_map[room_id],
+ from_token=from_token,
+ to_token=to_token,
+ )
+
+ rooms[room_id] = room_sync_result
+
return SlidingSyncResult(
next_pos=to_token,
lists=lists,
- # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
- rooms={},
+ rooms=rooms,
extensions={},
)
@@ -256,7 +330,7 @@ class SlidingSyncHandler:
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken] = None,
- ) -> Dict[str, RoomsForUser]:
+ ) -> Dict[str, _RoomMembershipForUser]:
"""
Fetch room IDs that should be listed for this user in the sync response (the
full room list that will be filtered, sorted, and sliced).
@@ -305,13 +379,17 @@ class SlidingSyncHandler:
# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
- room_for_user.room_id: room_for_user
- for room_for_user in room_for_user_list
- if filter_membership_for_sync(
+ # Note: The `room_for_user` we're assigning here will need to be fixed up
+ # (below) because they are potentially from the current snapshot time
+ # instead from the time of the `to_token`.
+ room_for_user.room_id: _RoomMembershipForUser(
+ event_id=room_for_user.event_id,
+ event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
- user_id=user_id,
sender=room_for_user.sender,
+ newly_joined=False,
)
+ for room_for_user in room_for_user_list
}
# Get the `RoomStreamToken` that represents the spot we queried up to when we got
@@ -346,14 +424,9 @@ class SlidingSyncHandler:
#
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
+ # - 1c) Update room membership events to the point in time of the `to_token`
# - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
- #
- # Below, we're doing two separate lookups for membership changes. We could
- # request everything for both fixups in one range, [`from_token.room_key`,
- # `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
- # comparison without `instance_name` (which is flawed). We could refactor
- # `event.internal_metadata` to include `instance_name` but it might turn out a
- # little difficult and a bigger, broader Synapse change than we want to make.
+ # - 3) Figure out which rooms are `newly_joined`
# 1) -----------------------------------------------------
@@ -363,159 +436,198 @@ class SlidingSyncHandler:
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we don't need to do any "2)" fix-ups and can just straight-up
# use the room list from the snapshot as a base (nothing has changed)
- membership_change_events_after_to_token = []
+ current_state_delta_membership_changes_after_to_token = []
if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
- membership_change_events_after_to_token = (
- await self.store.get_membership_changes_for_user(
+ current_state_delta_membership_changes_after_to_token = (
+ await self.store.get_current_state_delta_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
- excluded_rooms=self.rooms_to_exclude_globally,
+ excluded_room_ids=self.rooms_to_exclude_globally,
)
)
- # 1) Assemble a list of the last membership events in some given ranges. Someone
- # could have left and joined multiple times during the given range but we only
- # care about end-result so we grab the last one.
- last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
- # We also need the first membership event after the `to_token` so we can step
- # backward to the previous membership that would apply to the from/to range.
- first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
- for event in membership_change_events_after_to_token:
- last_membership_change_by_room_id_after_to_token[event.room_id] = event
+ # 1) Assemble a list of the first membership event after the `to_token` so we can
+ # step backward to the previous membership that would apply to the from/to
+ # range.
+ first_membership_change_by_room_id_after_to_token: Dict[
+ str, CurrentStateDeltaMembership
+ ] = {}
+ for membership_change in current_state_delta_membership_changes_after_to_token:
# Only set if we haven't already set it
first_membership_change_by_room_id_after_to_token.setdefault(
- event.room_id, event
+ membership_change.room_id, membership_change
)
# 1) Fixup
+ #
+ # Since we fetched a snapshot of the users room list at some point in time after
+ # the from/to tokens, we need to revert/rewind some membership changes to match
+ # the point in time of the `to_token`.
for (
- last_membership_change_after_to_token
- ) in last_membership_change_by_room_id_after_to_token.values():
- room_id = last_membership_change_after_to_token.room_id
-
- # We want to find the first membership change after the `to_token` then step
- # backward to know the membership in the from/to range.
- first_membership_change_after_to_token = (
- first_membership_change_by_room_id_after_to_token.get(room_id)
- )
- assert first_membership_change_after_to_token is not None, (
- "If there was a `last_membership_change_after_to_token` that we're iterating over, "
- + "then there should be corresponding a first change. For example, even if there "
- + "is only one event after the `to_token`, the first and last event will be same event. "
- + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
- + "/`first_membership_change_by_room_id_after_to_token` dicts above."
- )
- # TODO: Instead of reading from `unsigned`, refactor this to use the
- # `current_state_delta_stream` table in the future. Probably a new
- # `get_membership_changes_for_user()` function that uses
- # `current_state_delta_stream` with a join to `room_memberships`. This would
- # help in state reset scenarios since `prev_content` is looking at the
- # current branch vs the current room state. This is all just data given to
- # the client so no real harm to data integrity, but we'd like to be nice to
- # the client. Since the `current_state_delta_stream` table is new, it
- # doesn't have all events in it. Since this is Sliding Sync, if we ever need
- # to, we can signal the client to throw all of their state away by sending
- # "operation: RESET".
- prev_content = first_membership_change_after_to_token.unsigned.get(
- "prev_content", {}
- )
- prev_membership = prev_content.get("membership", None)
- prev_sender = first_membership_change_after_to_token.unsigned.get(
- "prev_sender", None
- )
-
- # Check if the previous membership (membership that applies to the from/to
- # range) should be included in our `sync_room_id_set`
- should_prev_membership_be_included = (
- prev_membership is not None
- and prev_sender is not None
- and filter_membership_for_sync(
- membership=prev_membership,
- user_id=user_id,
- sender=prev_sender,
- )
- )
-
- # Check if the last membership (membership that applies to our snapshot) was
- # already included in our `sync_room_id_set`
- was_last_membership_already_included = filter_membership_for_sync(
- membership=last_membership_change_after_to_token.membership,
+ room_id,
+ first_membership_change_after_to_token,
+ ) in first_membership_change_by_room_id_after_to_token.items():
+ # 1a) Remove rooms that the user joined after the `to_token`
+ if first_membership_change_after_to_token.prev_event_id is None:
+ sync_room_id_set.pop(room_id, None)
+ # 1b) 1c) From the first membership event after the `to_token`, step backward to the
+ # previous membership that would apply to the from/to range.
+ else:
+ # We don't expect these fields to be `None` if we have a `prev_event_id`
+ # but we're being defensive since it's possible that the prev event was
+ # culled from the database.
+ if (
+ first_membership_change_after_to_token.prev_event_pos is not None
+ and first_membership_change_after_to_token.prev_membership
+ is not None
+ ):
+ sync_room_id_set[room_id] = _RoomMembershipForUser(
+ event_id=first_membership_change_after_to_token.prev_event_id,
+ event_pos=first_membership_change_after_to_token.prev_event_pos,
+ membership=first_membership_change_after_to_token.prev_membership,
+ sender=first_membership_change_after_to_token.prev_sender,
+ newly_joined=False,
+ )
+ else:
+ # If we can't find the previous membership event, we shouldn't
+ # include the room in the sync response since we can't determine the
+ # exact membership state and shouldn't rely on the current snapshot.
+ sync_room_id_set.pop(room_id, None)
+
+ # Filter the rooms that that we have updated room membership events to the point
+ # in time of the `to_token` (from the "1)" fixups)
+ filtered_sync_room_id_set = {
+ room_id: room_membership_for_user
+ for room_id, room_membership_for_user in sync_room_id_set.items()
+ if filter_membership_for_sync(
+ membership=room_membership_for_user.membership,
user_id=user_id,
- sender=last_membership_change_after_to_token.sender,
+ sender=room_membership_for_user.sender,
)
-
- # 1a) Add back rooms that the user left after the `to_token`
- #
- # For example, if the last membership event after the `to_token` is a leave
- # event, then the room was excluded from `sync_room_id_set` when we first
- # crafted it above. We should add these rooms back as long as the user also
- # was part of the room before the `to_token`.
- if (
- not was_last_membership_already_included
- and should_prev_membership_be_included
- ):
- sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
- last_membership_change_after_to_token
- )
- # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
- #
- # For example, if the last membership event after the `to_token` is a "join"
- # event, then the room was included `sync_room_id_set` when we first crafted
- # it above. We should remove these rooms as long as the user also wasn't
- # part of the room before the `to_token`.
- elif (
- was_last_membership_already_included
- and not should_prev_membership_be_included
- ):
- del sync_room_id_set[room_id]
+ }
# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
- # some left rooms that we can figure out our newly_left in the following code
+ # some left rooms that we can figure out are newly_left in the following code
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
- membership_change_events_in_from_to_range = []
+ current_state_delta_membership_changes_in_from_to_range = []
if from_token:
- membership_change_events_in_from_to_range = (
- await self.store.get_membership_changes_for_user(
+ current_state_delta_membership_changes_in_from_to_range = (
+ await self.store.get_current_state_delta_membership_changes_for_user(
user_id,
from_key=from_token.room_key,
to_key=to_token.room_key,
- excluded_rooms=self.rooms_to_exclude_globally,
+ excluded_room_ids=self.rooms_to_exclude_globally,
)
)
# 2) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
- last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
- for event in membership_change_events_in_from_to_range:
- last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
+ last_membership_change_by_room_id_in_from_to_range: Dict[
+ str, CurrentStateDeltaMembership
+ ] = {}
+ # We also want to assemble a list of the first membership events during the token
+ # range so we can step backward to the previous membership that would apply to
+ # before the token range to see if we have `newly_joined` the room.
+ first_membership_change_by_room_id_in_from_to_range: Dict[
+ str, CurrentStateDeltaMembership
+ ] = {}
+ # Keep track if the room has a non-join event in the token range so we can later
+ # tell if it was a `newly_joined` room. If the last membership event in the
+ # token range is a join and there is also some non-join in the range, we know
+ # they `newly_joined`.
+ has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {}
+ for (
+ membership_change
+ ) in current_state_delta_membership_changes_in_from_to_range:
+ room_id = membership_change.room_id
+
+ last_membership_change_by_room_id_in_from_to_range[room_id] = (
+ membership_change
+ )
+ # Only set if we haven't already set it
+ first_membership_change_by_room_id_in_from_to_range.setdefault(
+ room_id, membership_change
+ )
+
+ if membership_change.membership != Membership.JOIN:
+ has_non_join_event_by_room_id_in_from_to_range[room_id] = True
# 2) Fixup
+ #
+ # 3) We also want to assemble a list of possibly newly joined rooms. Someone
+ # could have left and joined multiple times during the given range but we only
+ # care about whether they are joined at the end of the token range so we are
+ # working with the last membership even in the token range.
+ possibly_newly_joined_room_ids = set()
for (
last_membership_change_in_from_to_range
) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id
+ # 3)
+ if last_membership_change_in_from_to_range.membership == Membership.JOIN:
+ possibly_newly_joined_room_ids.add(room_id)
+
# 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
- sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
- last_membership_change_in_from_to_range
+ filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
+ event_id=last_membership_change_in_from_to_range.event_id,
+ event_pos=last_membership_change_in_from_to_range.event_pos,
+ membership=last_membership_change_in_from_to_range.membership,
+ sender=last_membership_change_in_from_to_range.sender,
+ newly_joined=False,
)
- return sync_room_id_set
+ # 3) Figure out `newly_joined`
+ for room_id in possibly_newly_joined_room_ids:
+ has_non_join_in_from_to_range = (
+ has_non_join_event_by_room_id_in_from_to_range.get(room_id, False)
+ )
+ # If the last membership event in the token range is a join and there is
+ # also some non-join in the range, we know they `newly_joined`.
+ if has_non_join_in_from_to_range:
+ # We found a `newly_joined` room (we left and joined within the token range)
+ filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ room_id
+ ].copy_and_replace(newly_joined=True)
+ else:
+ prev_event_id = first_membership_change_by_room_id_in_from_to_range[
+ room_id
+ ].prev_event_id
+ prev_membership = first_membership_change_by_room_id_in_from_to_range[
+ room_id
+ ].prev_membership
+
+ if prev_event_id is None:
+ # We found a `newly_joined` room (we are joining the room for the
+ # first time within the token range)
+ filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ room_id
+ ].copy_and_replace(newly_joined=True)
+ # Last resort, we need to step back to the previous membership event
+ # just before the token range to see if we're joined then or not.
+ elif prev_membership != Membership.JOIN:
+ # We found a `newly_joined` room (we left before the token range
+ # and joined within the token range)
+ filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ room_id
+ ].copy_and_replace(newly_joined=True)
+
+ return filtered_sync_room_id_set
async def filter_rooms(
self,
user: UserID,
- sync_room_map: Dict[str, RoomsForUser],
+ sync_room_map: Dict[str, _RoomMembershipForUser],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
- ) -> Dict[str, RoomsForUser]:
+ ) -> Dict[str, _RoomMembershipForUser]:
"""
Filter rooms based on the sync request.
@@ -629,9 +741,9 @@ class SlidingSyncHandler:
async def sort_rooms(
self,
- sync_room_map: Dict[str, RoomsForUser],
+ sync_room_map: Dict[str, _RoomMembershipForUser],
to_token: StreamToken,
- ) -> List[Tuple[str, RoomsForUser]]:
+ ) -> List[Tuple[str, _RoomMembershipForUser]]:
"""
Sort by `stream_ordering` of the last event that the user should see in the
room. `stream_ordering` is unique so we get a stable sort.
@@ -678,3 +790,229 @@ class SlidingSyncHandler:
# We want descending order
reverse=True,
)
+
+ async def get_room_sync_data(
+ self,
+ user: UserID,
+ room_id: str,
+ room_sync_config: RoomSyncConfig,
+ rooms_membership_for_user_at_to_token: _RoomMembershipForUser,
+ from_token: Optional[StreamToken],
+ to_token: StreamToken,
+ ) -> SlidingSyncResult.RoomResult:
+ """
+ Fetch room data for the sync response.
+
+ We fetch data according to the token range (> `from_token` and <= `to_token`).
+
+ Args:
+ user: User to fetch data for
+ room_id: The room ID to fetch data for
+ room_sync_config: Config for what data we should fetch for a room in the
+ sync response.
+ rooms_membership_for_user_at_to_token: Membership information for the user
+ in the room at the time of `to_token`.
+ from_token: The point in the stream to sync from.
+ to_token: The point in the stream to sync up to.
+ """
+
+ # Assemble the list of timeline events
+ #
+ # It would be nice to make the `rooms` response more uniform regardless of
+ # membership. Currently, we have to make all of these optional because
+ # `invite`/`knock` rooms only have `stripped_state`. See
+ # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
+ timeline_events: Optional[List[EventBase]] = None
+ bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
+ limited: Optional[bool] = None
+ prev_batch_token: Optional[StreamToken] = None
+ num_live: Optional[int] = None
+ if (
+ room_sync_config.timeline_limit > 0
+ # No timeline for invite/knock rooms (just `stripped_state`)
+ and rooms_membership_for_user_at_to_token.membership
+ not in (Membership.INVITE, Membership.KNOCK)
+ ):
+ limited = False
+ # We want to start off using the `to_token` (vs `from_token`) because we look
+ # backwards from the `to_token` up to the `timeline_limit` and we might not
+ # reach the `from_token` before we hit the limit. We will update the room stream
+ # position once we've fetched the events to point to the earliest event fetched.
+ prev_batch_token = to_token
+
+ # We're going to paginate backwards from the `to_token`
+ from_bound = to_token.room_key
+ # People shouldn't see past their leave/ban event
+ if rooms_membership_for_user_at_to_token.membership in (
+ Membership.LEAVE,
+ Membership.BAN,
+ ):
+ from_bound = (
+ rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token()
+ )
+
+ # Determine whether we should limit the timeline to the token range.
+ #
+ # We should return historical messages (before token range) in the
+ # following cases because we want clients to be able to show a basic
+ # screen of information:
+ # - Initial sync (because no `from_token` to limit us anyway)
+ # - When users `newly_joined`
+ # - TODO: For an incremental sync where we haven't sent it down this
+ # connection before
+ to_bound = (
+ from_token.room_key
+ if from_token is not None
+ and not rooms_membership_for_user_at_to_token.newly_joined
+ else None
+ )
+
+ timeline_events, new_room_key = await self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=from_bound,
+ to_key=to_bound,
+ direction=Direction.BACKWARDS,
+ # We add one so we can determine if there are enough events to saturate
+ # the limit or not (see `limited`)
+ limit=room_sync_config.timeline_limit + 1,
+ event_filter=None,
+ )
+
+ # We want to return the events in ascending order (the last event is the
+ # most recent).
+ timeline_events.reverse()
+
+ # Determine our `limited` status based on the timeline. We do this before
+ # filtering the events so we can accurately determine if there is more to
+ # paginate even if we filter out some/all events.
+ if len(timeline_events) > room_sync_config.timeline_limit:
+ limited = True
+ # Get rid of that extra "+ 1" event because we only used it to determine
+ # if we hit the limit or not
+ timeline_events = timeline_events[-room_sync_config.timeline_limit :]
+ assert timeline_events[0].internal_metadata.stream_ordering
+ new_room_key = RoomStreamToken(
+ stream=timeline_events[0].internal_metadata.stream_ordering - 1
+ )
+
+ # Make sure we don't expose any events that the client shouldn't see
+ timeline_events = await filter_events_for_client(
+ self.storage_controllers,
+ user.to_string(),
+ timeline_events,
+ is_peeking=rooms_membership_for_user_at_to_token.membership
+ != Membership.JOIN,
+ filter_send_to_client=True,
+ )
+ # TODO: Filter out `EventTypes.CallInvite` in public rooms,
+ # see https://github.com/element-hq/synapse/issues/17359
+
+ # TODO: Handle timeline gaps (`get_timeline_gaps()`)
+
+ # Determine how many "live" events we have (events within the given token range).
+ #
+ # This is mostly useful to determine whether a given @mention event should
+ # make a noise or not. Clients cannot rely solely on the absence of
+ # `initial: true` to determine live events because if a room not in the
+ # sliding window bumps into the window because of an @mention it will have
+ # `initial: true` yet contain a single live event (with potentially other
+ # old events in the timeline)
+ num_live = 0
+ if from_token is not None:
+ for timeline_event in reversed(timeline_events):
+ # This fields should be present for all persisted events
+ assert timeline_event.internal_metadata.stream_ordering is not None
+ assert timeline_event.internal_metadata.instance_name is not None
+
+ persisted_position = PersistedEventPosition(
+ instance_name=timeline_event.internal_metadata.instance_name,
+ stream=timeline_event.internal_metadata.stream_ordering,
+ )
+ if persisted_position.persisted_after(from_token.room_key):
+ num_live += 1
+ else:
+ # Since we're iterating over the timeline events in
+ # reverse-chronological order, we can break once we hit an event
+ # that's not live. In the future, we could potentially optimize
+ # this more with a binary search (bisect).
+ break
+
+ # If the timeline is `limited=True`, the client does not have all events
+ # necessary to calculate aggregations themselves.
+ if limited:
+ bundled_aggregations = (
+ await self.relations_handler.get_bundled_aggregations(
+ timeline_events, user.to_string()
+ )
+ )
+
+ # Update the `prev_batch_token` to point to the position that allows us to
+ # keep paginating backwards from the oldest event we return in the timeline.
+ prev_batch_token = prev_batch_token.copy_and_replace(
+ StreamKeyType.ROOM, new_room_key
+ )
+
+ # Figure out any stripped state events for invite/knocks. This allows the
+ # potential joiner to identify the room.
+ stripped_state: List[JsonDict] = []
+ if rooms_membership_for_user_at_to_token.membership in (
+ Membership.INVITE,
+ Membership.KNOCK,
+ ):
+ # This should never happen. If someone is invited/knocked on room, then
+ # there should be an event for it.
+ assert rooms_membership_for_user_at_to_token.event_id is not None
+
+ invite_or_knock_event = await self.store.get_event(
+ rooms_membership_for_user_at_to_token.event_id
+ )
+
+ stripped_state = []
+ if invite_or_knock_event.membership == Membership.INVITE:
+ stripped_state.extend(
+ invite_or_knock_event.unsigned.get("invite_room_state", [])
+ )
+ elif invite_or_knock_event.membership == Membership.KNOCK:
+ stripped_state.extend(
+ invite_or_knock_event.unsigned.get("knock_room_state", [])
+ )
+
+ stripped_state.append(strip_event(invite_or_knock_event))
+
+ # TODO: Handle state resets. For example, if we see
+ # `rooms_membership_for_user_at_to_token.membership = Membership.LEAVE` but
+ # `required_state` doesn't include it, we should indicate to the client that a
+ # state reset happened. Perhaps we should indicate this by setting `initial:
+ # True` and empty `required_state`.
+
+ return SlidingSyncResult.RoomResult(
+ # TODO: Dummy value
+ name=None,
+ # TODO: Dummy value
+ avatar=None,
+ # TODO: Dummy value
+ heroes=None,
+ # TODO: Since we can't determine whether we've already sent a room down this
+ # Sliding Sync connection before (we plan to add this optimization in the
+ # future), we're always returning the requested room state instead of
+ # updates.
+ initial=True,
+ # TODO: Dummy value
+ required_state=[],
+ timeline_events=timeline_events,
+ bundled_aggregations=bundled_aggregations,
+ # TODO: Dummy value
+ is_dm=False,
+ stripped_state=stripped_state,
+ prev_batch=prev_batch_token,
+ limited=limited,
+ # TODO: Dummy values
+ joined_count=0,
+ invited_count=0,
+ # TODO: These are just dummy values. We could potentially just remove these
+ # since notifications can only really be done correctly on the client anyway
+ # (encrypted rooms).
+ notification_count=0,
+ highlight_count=0,
+ num_live=num_live,
+ )
|