diff --git a/changelog.d/17187.feature b/changelog.d/17187.feature
new file mode 100644
index 0000000000..50383cb4a4
--- /dev/null
+++ b/changelog.d/17187.feature
@@ -0,0 +1 @@
+Add initial implementation of an experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 0a9123c56b..542e4faaa1 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -50,7 +50,7 @@ class Membership:
KNOCK: Final = "knock"
LEAVE: Final = "leave"
BAN: Final = "ban"
- LIST: Final = (INVITE, JOIN, KNOCK, LEAVE, BAN)
+ LIST: Final = {INVITE, JOIN, KNOCK, LEAVE, BAN}
class PresenceState:
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
new file mode 100644
index 0000000000..34ae21ba50
--- /dev/null
+++ b/synapse/handlers/sliding_sync.py
@@ -0,0 +1,610 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+import logging
+from enum import Enum
+from typing import TYPE_CHECKING, AbstractSet, Dict, Final, List, Optional, Tuple
+
+import attr
+from immutabledict import immutabledict
+
+from synapse._pydantic_compat import HAS_PYDANTIC_V2
+
+if TYPE_CHECKING or HAS_PYDANTIC_V2:
+ from pydantic.v1 import Extra
+else:
+ from pydantic import Extra
+
+from synapse.api.constants import Membership
+from synapse.events import EventBase
+from synapse.rest.client.models import SlidingSyncBody
+from synapse.types import JsonMapping, Requester, RoomStreamToken, StreamToken, UserID
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
+ """
+ Returns True if the membership event should be included in the sync response,
+ otherwise False.
+
+    Args:
+ membership: The membership state of the user in the room.
+ user_id: The user ID that the membership applies to
+ sender: The person who sent the membership event
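+
+    Example (a sketch with hypothetical Matrix IDs)::
+
+        >>> # A join is always relevant
+        >>> filter_membership_for_sync(membership="join", user_id="@alice:test", sender="@alice:test")
+        True
+        >>> # A kick (leave sent by someone else) is still relevant to the user
+        >>> filter_membership_for_sync(membership="leave", user_id="@alice:test", sender="@bob:test")
+        True
+        >>> # A voluntary leave is filtered out
+        >>> filter_membership_for_sync(membership="leave", user_id="@alice:test", sender="@alice:test")
+        False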
+ """
+
+    # Everything except `Membership.LEAVE` because we want everything that's *still*
+    # relevant to the user. There are a few more things to include in the sync response
+    # (newly_left) but those are handled separately.
+    #
+    # This logic includes kicks (leave events where the sender is not the same user)
+    # and can be read as "anything that isn't a self-leave; a leave with a different
+    # sender (a kick) is kept".
+    return membership != Membership.LEAVE or sender != user_id
+
+
+class SlidingSyncConfig(SlidingSyncBody):
+ """
+    Inherit from `SlidingSyncBody` since we need all of the same fields, plus a few
+    extra fields that we need in the handler.
+ """
+
+ user: UserID
+ device_id: Optional[str]
+
+ # Pydantic config
+ class Config:
+ # By default, ignore fields that we don't recognise.
+ extra = Extra.ignore
+ # By default, don't allow fields to be reassigned after parsing.
+ allow_mutation = False
+ # Allow custom types like `UserID` to be used in the model
+ arbitrary_types_allowed = True
+
+
+class OperationType(Enum):
+ """
+ Represents the operation types in a Sliding Sync window.
+
+ Attributes:
+        SYNC: Sets a range of entries. Clients SHOULD discard what they previously knew about
+ entries in this range.
+ INSERT: Sets a single entry. If the position is not empty then clients MUST move
+ entries to the left or the right depending on where the closest empty space is.
+ DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
+ places.
+ INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
+ offline support, but they should be treated as empty when additional operations
+ which concern indexes in the range arrive from the server.
+ """
+
+ SYNC: Final = "SYNC"
+ INSERT: Final = "INSERT"
+ DELETE: Final = "DELETE"
+ INVALIDATE: Final = "INVALIDATE"
+
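+# Illustrative sketch (shapes as described in MSC3575; not used by the code in
+# this file): moving a room up a list is typically expressed as a DELETE of its
+# old index followed by an INSERT at its new index:
+#
+#   {"op": "DELETE", "index": 5}
+#   {"op": "INSERT", "index": 0, "room_id": "!foo:bar"}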
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class SlidingSyncResult:
+ """
+ The Sliding Sync result to be serialized to JSON for a response.
+
+ Attributes:
+ next_pos: The next position token in the sliding window to request (next_batch).
+ lists: Sliding window API. A map of list key to list results.
+        rooms: Room subscription API. A map of room ID to room results.
+ extensions: Extensions API. A map of extension key to extension results.
+ """
+
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class RoomResult:
+ """
+ Attributes:
+ name: Room name or calculated room name.
+ avatar: Room avatar
+ heroes: List of stripped membership events (containing `user_id` and optionally
+ `avatar_url` and `displayname`) for the users used to calculate the room name.
+ initial: Flag which is set when this is the first time the server is sending this
+ data on this connection. Clients can use this flag to replace or update
+ their local state. When there is an update, servers MUST omit this flag
+ entirely and NOT send "initial":false as this is wasteful on bandwidth. The
+ absence of this flag means 'false'.
+ required_state: The current state of the room
+            timeline: Latest events in the room. The last event is the most recent.
+ is_dm: Flag to specify whether the room is a direct-message room (most likely
+ between two people).
+ invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
+ in sync v2, absent on joined/left rooms
+ prev_batch: A token that can be passed as a start parameter to the
+ `/rooms/<room_id>/messages` API to retrieve earlier messages.
+            limited: True if there are more events than fit between the given position and now.
+ Sync again to get more.
+ joined_count: The number of users with membership of join, including the client's
+                own user ID. (same as sync v2 `m.joined_member_count`)
+ invited_count: The number of users with membership of invite. (same as sync v2
+ `m.invited_member_count`)
+ notification_count: The total number of unread notifications for this room. (same
+ as sync v2)
+ highlight_count: The number of unread notifications for this room with the highlight
+ flag set. (same as sync v2)
+ num_live: The number of timeline events which have just occurred and are not historical.
+ The last N events are 'live' and should be treated as such. This is mostly
+ useful to determine whether a given @mention event should make a noise or not.
+ Clients cannot rely solely on the absence of `initial: true` to determine live
+ events because if a room not in the sliding window bumps into the window because
+ of an @mention it will have `initial: true` yet contain a single live event
+ (with potentially other old events in the timeline).
+ """
+
+ name: str
+ avatar: Optional[str]
+ heroes: Optional[List[EventBase]]
+ initial: bool
+ required_state: List[EventBase]
+ timeline: List[EventBase]
+ is_dm: bool
+ invite_state: List[EventBase]
+ prev_batch: StreamToken
+ limited: bool
+ joined_count: int
+ invited_count: int
+ notification_count: int
+ highlight_count: int
+ num_live: int
+
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class SlidingWindowList:
+ """
+ Attributes:
+ count: The total number of entries in the list. Always present if this list
+ is.
+ ops: The sliding list operations to perform.
+ """
+
+ @attr.s(slots=True, frozen=True, auto_attribs=True)
+ class Operation:
+ """
+ Attributes:
+ op: The operation type to perform.
+ range: Which index positions are affected by this operation. These are
+ both inclusive.
+                    room_ids: Which room IDs are affected by this operation. These IDs match
+                        up to the positions in the `range`; e.g. for a range of `[0, 9]`, the
+                        last room ID in this list corresponds to index 9. The room data is
+                        held in a separate object.
+ """
+
+ op: OperationType
+ range: Tuple[int, int]
+ room_ids: List[str]
+
+ count: int
+ ops: List[Operation]
+
+ next_pos: StreamToken
+ lists: Dict[str, SlidingWindowList]
+ rooms: Dict[str, RoomResult]
+ extensions: JsonMapping
+
+ def __bool__(self) -> bool:
+ """Make the result appear empty if there are no updates. This is used
+ to tell if the notifier needs to wait for more events when polling for
+ events.
+ """
+ return bool(self.lists or self.rooms or self.extensions)
+
+ @staticmethod
+ def empty(next_pos: StreamToken) -> "SlidingSyncResult":
+ "Return a new empty result"
+ return SlidingSyncResult(
+ next_pos=next_pos,
+ lists={},
+ rooms={},
+ extensions={},
+ )
+
+
+class SlidingSyncHandler:
+ def __init__(self, hs: "HomeServer"):
+ self.clock = hs.get_clock()
+ self.store = hs.get_datastores().main
+ self.auth_blocking = hs.get_auth_blocking()
+ self.notifier = hs.get_notifier()
+ self.event_sources = hs.get_event_sources()
+ self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
+
+ async def wait_for_sync_for_user(
+ self,
+ requester: Requester,
+ sync_config: SlidingSyncConfig,
+ from_token: Optional[StreamToken] = None,
+ timeout_ms: int = 0,
+ ) -> SlidingSyncResult:
+ """Get the sync for a client if we have new data for it now. Otherwise
+ wait for new data to arrive on the server. If the timeout expires, then
+ return an empty sync result.
+ """
+ # If the user is not part of the mau group, then check that limits have
+        # not been exceeded (if the user is not part of the group by this point, it's
+        # almost certain that auth_blocking will occur)
+ await self.auth_blocking.check_auth_blocking(requester=requester)
+
+ # TODO: If the To-Device extension is enabled and we have a `from_token`, delete
+ # any to-device messages before that token (since we now know that the device
+ # has received them). (see sync v2 for how to do this)
+
+ # If we're working with a user-provided token, we need to make sure to wait for
+ # this worker to catch up with the token so we don't skip past any incoming
+        # events or future events if the user is nefariously modifying the token by
+        # hand.
+ if from_token is not None:
+ # We need to make sure this worker has caught up with the token. If
+ # this returns false, it means we timed out waiting, and we should
+ # just return an empty response.
+ before_wait_ts = self.clock.time_msec()
+ if not await self.notifier.wait_for_stream_token(from_token):
+ logger.warning(
+ "Timed out waiting for worker to catch up. Returning empty response"
+ )
+ return SlidingSyncResult.empty(from_token)
+
+ # If we've spent significant time waiting to catch up, take it off
+ # the timeout.
+ after_wait_ts = self.clock.time_msec()
+ if after_wait_ts - before_wait_ts > 1_000:
+ timeout_ms -= after_wait_ts - before_wait_ts
+ timeout_ms = max(timeout_ms, 0)
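+                # For example (hypothetical numbers): a request with
+                # `timeout_ms=30_000` that spent 2_000 ms catching up will wait
+                # at most 28_000 ms for new events below.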
+
+ # We're going to respond immediately if the timeout is 0 or if this is an
+ # initial sync (without a `from_token`) so we can avoid calling
+ # `notifier.wait_for_events()`.
+ if timeout_ms == 0 or from_token is None:
+ now_token = self.event_sources.get_current_token()
+ result = await self.current_sync_for_user(
+ sync_config,
+ from_token=from_token,
+ to_token=now_token,
+ )
+ else:
+ # Otherwise, we wait for something to happen and report it to the user.
+ async def current_sync_callback(
+ before_token: StreamToken, after_token: StreamToken
+ ) -> SlidingSyncResult:
+ return await self.current_sync_for_user(
+ sync_config,
+ from_token=from_token,
+ to_token=after_token,
+ )
+
+ result = await self.notifier.wait_for_events(
+ sync_config.user.to_string(),
+ timeout_ms,
+ current_sync_callback,
+ from_token=from_token,
+ )
+
+ return result
+
+ async def current_sync_for_user(
+ self,
+ sync_config: SlidingSyncConfig,
+ to_token: StreamToken,
+ from_token: Optional[StreamToken] = None,
+ ) -> SlidingSyncResult:
+ """
+ Generates the response body of a Sliding Sync result, represented as a
+ `SlidingSyncResult`.
+ """
+ user_id = sync_config.user.to_string()
+ app_service = self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ # We no longer support AS users using /sync directly.
+ # See https://github.com/matrix-org/matrix-doc/issues/1144
+ raise NotImplementedError()
+
+ # Get all of the room IDs that the user should be able to see in the sync
+ # response
+ room_id_set = await self.get_sync_room_ids_for_user(
+ sync_config.user,
+ from_token=from_token,
+ to_token=to_token,
+ )
+
+ # Assemble sliding window lists
+ lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
+ if sync_config.lists:
+ for list_key, list_config in sync_config.lists.items():
+ # TODO: Apply filters
+ #
+ # TODO: Exclude partially stated rooms unless the `required_state` has
+ # `["m.room.member", "$LAZY"]`
+ filtered_room_ids = room_id_set
+ # TODO: Apply sorts
+ sorted_room_ids = sorted(filtered_room_ids)
+
+ ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
+ if list_config.ranges:
+ for range in list_config.ranges:
+ ops.append(
+ SlidingSyncResult.SlidingWindowList.Operation(
+ op=OperationType.SYNC,
+ range=range,
+                            # Both sides of the range are inclusive, hence the `+ 1`
+                            room_ids=sorted_room_ids[range[0] : range[1] + 1],
+ )
+ )
+
+ lists[list_key] = SlidingSyncResult.SlidingWindowList(
+ count=len(sorted_room_ids),
+ ops=ops,
+ )
+
+ return SlidingSyncResult(
+ next_pos=to_token,
+ lists=lists,
+ # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
+ rooms={},
+ extensions={},
+ )
+
+ async def get_sync_room_ids_for_user(
+ self,
+ user: UserID,
+ to_token: StreamToken,
+ from_token: Optional[StreamToken] = None,
+ ) -> AbstractSet[str]:
+ """
+ Fetch room IDs that should be listed for this user in the sync response (the
+ full room list that will be filtered, sorted, and sliced).
+
+ We're looking for rooms where the user has the following state in the token
+ range (> `from_token` and <= `to_token`):
+
+ - `invite`, `join`, `knock`, `ban` membership events
+ - Kicks (`leave` membership events where `sender` is different from the
+ `user_id`/`state_key`)
+ - `newly_left` (rooms that were left during the given token range)
+ - In order for bans/kicks to not show up in sync, you need to `/forget` those
+ rooms. This doesn't modify the event itself though and only adds the
+          `forgotten` flag to the `room_memberships` table in Synapse. There currently
+          isn't a way to tell when a room was forgotten, so we can't factor it into
+          the from/to range.
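+
+        Args:
+            user: The user to fetch rooms for.
+            to_token: The token to fetch rooms up to (inclusive).
+            from_token: The point in the stream to sync from (exclusive), if any.
+
+        Returns:
+            A set of room IDs to consider for the sync response.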
+ """
+ user_id = user.to_string()
+
+        # First grab a current snapshot of the rooms for the user
+ # (also handles forgotten rooms)
+ room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
+ user_id=user_id,
+ # We want to fetch any kind of membership (joined and left rooms) in order
+ # to get the `event_pos` of the latest room membership event for the
+ # user.
+ #
+ # We will filter out the rooms that don't belong below (see
+ # `filter_membership_for_sync`)
+ membership_list=Membership.LIST,
+ excluded_rooms=self.rooms_to_exclude_globally,
+ )
+
+        # If the user has never joined any rooms before, we can just return an empty set
+ if not room_for_user_list:
+ return set()
+
+        # Our working set of rooms that can show up in the sync response
+ sync_room_id_set = {
+ room_for_user.room_id
+ for room_for_user in room_for_user_list
+ if filter_membership_for_sync(
+ membership=room_for_user.membership,
+ user_id=user_id,
+ sender=room_for_user.sender,
+ )
+ }
+
+ # Get the `RoomStreamToken` that represents the spot we queried up to when we got
+ # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`.
+ #
+ # First, we need to get the max stream_ordering of each event persister instance
+ # that we queried events from.
+ instance_to_max_stream_ordering_map: Dict[str, int] = {}
+ for room_for_user in room_for_user_list:
+ instance_name = room_for_user.event_pos.instance_name
+ stream_ordering = room_for_user.event_pos.stream
+
+ current_instance_max_stream_ordering = (
+ instance_to_max_stream_ordering_map.get(instance_name)
+ )
+ if (
+ current_instance_max_stream_ordering is None
+ or stream_ordering > current_instance_max_stream_ordering
+ ):
+ instance_to_max_stream_ordering_map[instance_name] = stream_ordering
+
+ # Then assemble the `RoomStreamToken`
+ membership_snapshot_token = RoomStreamToken(
+ # Minimum position in the `instance_map`
+ stream=min(instance_to_max_stream_ordering_map.values()),
+ instance_map=immutabledict(instance_to_max_stream_ordering_map),
+ )
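+        # For example (hypothetical values): if the snapshot saw up to stream
+        # position 7 on "writer1" and 12 on "writer2", the token is
+        # `RoomStreamToken(stream=7, instance_map={"writer1": 7, "writer2": 12})`,
+        # i.e. "everything up to 7 on any writer, and additionally up to 12 on
+        # writer2".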
+
+ # If our `to_token` is already the same or ahead of the latest room membership
+ # for the user, we can just straight-up return the room list (nothing has
+ # changed)
+ if membership_snapshot_token.is_before_or_eq(to_token.room_key):
+ return sync_room_id_set
+
+        # Since we fetched the user's room list at some point in time after the from/to
+ # tokens, we need to revert/rewind some membership changes to match the point in
+ # time of the `to_token`. In particular, we need to make these fixups:
+ #
+        # - 1a) Add back rooms that the user left after the `to_token`
+        # - 1b) Remove rooms that the user joined after the `to_token`
+ # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
+ #
+ # Below, we're doing two separate lookups for membership changes. We could
+ # request everything for both fixups in one range, [`from_token.room_key`,
+ # `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
+ # comparison without `instance_name` (which is flawed). We could refactor
+        # `event.internal_metadata` to include `instance_name` but it might turn out to
+        # be a little difficult and a bigger, broader Synapse change than we want to make.
+
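+        # Worked example (hypothetical): the user left room A just after the
+        # `to_token`. The snapshot above (taken "now") excludes A, but the
+        # membership that applies *at* the `to_token` is still "join", so fixup
+        # 1a adds A back. Conversely, a room joined only after the `to_token`
+        # appears in the snapshot but doesn't belong in this response, so fixup
+        # 1b removes it.
+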
+ # 1) -----------------------------------------------------
+
+ # 1) Fetch membership changes that fall in the range from `to_token` up to
+ # `membership_snapshot_token`
+ membership_change_events_after_to_token = (
+ await self.store.get_membership_changes_for_user(
+ user_id,
+ from_key=to_token.room_key,
+ to_key=membership_snapshot_token,
+ excluded_rooms=self.rooms_to_exclude_globally,
+ )
+ )
+
+ # 1) Assemble a list of the last membership events in some given ranges. Someone
+ # could have left and joined multiple times during the given range but we only
+        # care about the end result so we grab the last one.
+ last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
+ # We also need the first membership event after the `to_token` so we can step
+ # backward to the previous membership that would apply to the from/to range.
+ first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
+ for event in membership_change_events_after_to_token:
+ last_membership_change_by_room_id_after_to_token[event.room_id] = event
+ # Only set if we haven't already set it
+ first_membership_change_by_room_id_after_to_token.setdefault(
+ event.room_id, event
+ )
+
+ # 1) Fixup
+ for (
+ last_membership_change_after_to_token
+ ) in last_membership_change_by_room_id_after_to_token.values():
+ room_id = last_membership_change_after_to_token.room_id
+
+ # We want to find the first membership change after the `to_token` then step
+ # backward to know the membership in the from/to range.
+ first_membership_change_after_to_token = (
+ first_membership_change_by_room_id_after_to_token.get(room_id)
+ )
+ assert first_membership_change_after_to_token is not None, (
+ "If there was a `last_membership_change_after_to_token` that we're iterating over, "
+ + "then there should be corresponding a first change. For example, even if there "
+ + "is only one event after the `to_token`, the first and last event will be same event. "
+ + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
+ + "/`first_membership_change_by_room_id_after_to_token` dicts above."
+ )
+ # TODO: Instead of reading from `unsigned`, refactor this to use the
+ # `current_state_delta_stream` table in the future. Probably a new
+ # `get_membership_changes_for_user()` function that uses
+ # `current_state_delta_stream` with a join to `room_memberships`. This would
+ # help in state reset scenarios since `prev_content` is looking at the
+ # current branch vs the current room state. This is all just data given to
+ # the client so no real harm to data integrity, but we'd like to be nice to
+ # the client. Since the `current_state_delta_stream` table is new, it
+ # doesn't have all events in it. Since this is Sliding Sync, if we ever need
+ # to, we can signal the client to throw all of their state away by sending
+ # "operation: RESET".
+ prev_content = first_membership_change_after_to_token.unsigned.get(
+ "prev_content", {}
+ )
+ prev_membership = prev_content.get("membership", None)
+ prev_sender = first_membership_change_after_to_token.unsigned.get(
+ "prev_sender", None
+ )
+
+ # Check if the previous membership (membership that applies to the from/to
+ # range) should be included in our `sync_room_id_set`
+ should_prev_membership_be_included = (
+ prev_membership is not None
+ and prev_sender is not None
+ and filter_membership_for_sync(
+ membership=prev_membership,
+ user_id=user_id,
+ sender=prev_sender,
+ )
+ )
+
+ # Check if the last membership (membership that applies to our snapshot) was
+ # already included in our `sync_room_id_set`
+ was_last_membership_already_included = filter_membership_for_sync(
+ membership=last_membership_change_after_to_token.membership,
+ user_id=user_id,
+ sender=last_membership_change_after_to_token.sender,
+ )
+
+ # 1a) Add back rooms that the user left after the `to_token`
+ #
+ # For example, if the last membership event after the `to_token` is a leave
+ # event, then the room was excluded from `sync_room_id_set` when we first
+            # crafted it above. We should add these rooms back as long as the user was
+            # also part of the room before the `to_token`.
+ if (
+ not was_last_membership_already_included
+ and should_prev_membership_be_included
+ ):
+ sync_room_id_set.add(room_id)
+ # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
+ #
+ # For example, if the last membership event after the `to_token` is a "join"
+            # event, then the room was included in `sync_room_id_set` when we first crafted
+ # it above. We should remove these rooms as long as the user also wasn't
+ # part of the room before the `to_token`.
+ elif (
+ was_last_membership_already_included
+ and not should_prev_membership_be_included
+ ):
+ sync_room_id_set.discard(room_id)
+
+ # 2) -----------------------------------------------------
+        # We fix up newly_left rooms after the first fixup because the first fixup can
+        # remove rooms that the user left during the range; the code below adds those
+        # newly_left rooms back.
+
+ # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
+ membership_change_events_in_from_to_range = []
+ if from_token:
+ membership_change_events_in_from_to_range = (
+ await self.store.get_membership_changes_for_user(
+ user_id,
+ from_key=from_token.room_key,
+ to_key=to_token.room_key,
+ excluded_rooms=self.rooms_to_exclude_globally,
+ )
+ )
+
+ # 2) Assemble a list of the last membership events in some given ranges. Someone
+ # could have left and joined multiple times during the given range but we only
+        # care about the end result so we grab the last one.
+ last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
+ for event in membership_change_events_in_from_to_range:
+ last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
+
+ # 2) Fixup
+ for (
+ last_membership_change_in_from_to_range
+ ) in last_membership_change_by_room_id_in_from_to_range.values():
+ room_id = last_membership_change_in_from_to_range.room_id
+
+ # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
+ # include newly_left rooms because the last event that the user should see
+ # is their own leave event
+ if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
+ sync_room_id_set.add(room_id)
+
+ return sync_room_id_set
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 9d37e2a86f..39964726c5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -2002,7 +2002,7 @@ class SyncHandler:
"""
user_id = sync_config.user.to_string()
- # Note: we get the users room list *before* we get the current token, this
+    # Note: we get the user's room list *before* we get the `now_token`; this
# avoids checking back in history if rooms are joined after the token is fetched.
token_before_rooms = self.event_sources.get_current_token()
mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
@@ -2014,10 +2014,10 @@ class SyncHandler:
now_token = self.event_sources.get_current_token()
log_kv({"now_token": now_token})
- # Since we fetched the users room list before the token, there's a small window
- # during which membership events may have been persisted, so we fetch these now
- # and modify the joined room list for any changes between the get_rooms_for_user
- # call and the get_current_token call.
+    # Since we fetched the user's room list before calculating the `now_token` (see
+ # above), there's a small window during which membership events may have been
+ # persisted, so we fetch these now and modify the joined room list for any
+ # changes between the get_rooms_for_user call and the get_current_token call.
membership_change_events = []
if since_token:
membership_change_events = await self.store.get_membership_changes_for_user(
@@ -2027,16 +2027,19 @@ class SyncHandler:
self.rooms_to_exclude_globally,
)
- mem_last_change_by_room_id: Dict[str, EventBase] = {}
+ last_membership_change_by_room_id: Dict[str, EventBase] = {}
for event in membership_change_events:
- mem_last_change_by_room_id[event.room_id] = event
+ last_membership_change_by_room_id[event.room_id] = event
# For the latest membership event in each room found, add/remove the room ID
# from the joined room list accordingly. In this case we only care if the
# latest change is JOIN.
- for room_id, event in mem_last_change_by_room_id.items():
+ for room_id, event in last_membership_change_by_room_id.items():
assert event.internal_metadata.stream_ordering
+ # As a shortcut, skip any events that happened before we got our
+ # `get_rooms_for_user()` snapshot (any changes are already represented
+ # in that list).
if (
event.internal_metadata.stream_ordering
< token_before_rooms.room_key.stream
diff --git a/synapse/rest/client/models.py b/synapse/rest/client/models.py
index fc1aed2889..5433ed91ef 100644
--- a/synapse/rest/client/models.py
+++ b/synapse/rest/client/models.py
@@ -18,14 +18,30 @@
# [This file includes modifications made by New Vector Limited]
#
#
-from typing import TYPE_CHECKING, Dict, Optional
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
from synapse._pydantic_compat import HAS_PYDANTIC_V2
if TYPE_CHECKING or HAS_PYDANTIC_V2:
- from pydantic.v1 import Extra, StrictInt, StrictStr, constr, validator
+ from pydantic.v1 import (
+ Extra,
+ StrictBool,
+ StrictInt,
+ StrictStr,
+ conint,
+ constr,
+ validator,
+ )
else:
- from pydantic import Extra, StrictInt, StrictStr, constr, validator
+ from pydantic import (
+ Extra,
+ StrictBool,
+ StrictInt,
+ StrictStr,
+ conint,
+ constr,
+ validator,
+ )
from synapse.rest.models import RequestBodyModel
from synapse.util.threepids import validate_email
@@ -97,3 +113,172 @@ else:
class MsisdnRequestTokenBody(ThreepidRequestTokenBody):
country: ISO3116_1_Alpha_2
phone_number: StrictStr
+
+
+class SlidingSyncBody(RequestBodyModel):
+ """
+ Sliding Sync API request body.
+
+ Attributes:
+ lists: Sliding window API. A map of list key to list information
+ (:class:`SlidingSyncList`). Max lists: 100. The list keys should be
+ arbitrary strings which the client is using to refer to the list. Keep this
+ small as it needs to be sent a lot. Max length: 64 bytes.
+ room_subscriptions: Room subscription API. A map of room ID to room subscription
+ information. Used to subscribe to a specific room. Sometimes clients know
+            exactly which room they want to get information about, e.g. by following a
+ permalink or by refreshing a webapp currently viewing a specific room. The
+ sliding window API alone is insufficient for this use case because there's
+ no way to say "please track this room explicitly".
+ extensions: Extensions API. A map of extension key to extension config.
+ """
+
+ class CommonRoomParameters(RequestBodyModel):
+ """
+ Common parameters shared between the sliding window and room subscription APIs.
+
+ Attributes:
+ required_state: Required state for each room returned. An array of event
+ type and state key tuples. Elements in this array are ORd together to
+ produce the final set of state events to return. One unique exception is
+ when you request all state events via `["*", "*"]`. When used, all state
+ events are returned by default, and additional entries FILTER OUT the
+ returned set of state events. These additional entries cannot use `*`
+ themselves. For example, `["*", "*"], ["m.room.member",
+ "@alice:example.com"]` will *exclude* every `m.room.member` event
+ *except* for `@alice:example.com`, and include every other state event.
+ In addition, `["*", "*"], ["m.space.child", "*"]` is an error, the
+ `m.space.child` filter is not required as it would have been returned
+ anyway.
+ timeline_limit: The maximum number of timeline events to return per response.
+ (Max 1000 messages)
+ include_old_rooms: Determines if `predecessor` rooms are included in the
+ `rooms` response. The user MUST be joined to old rooms for them to show up
+ in the response.
+ """
+
+ class IncludeOldRooms(RequestBodyModel):
+ timeline_limit: StrictInt
+ required_state: List[Tuple[StrictStr, StrictStr]]
+
+ required_state: List[Tuple[StrictStr, StrictStr]]
+ # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
+ if TYPE_CHECKING:
+ timeline_limit: int
+ else:
+ timeline_limit: conint(le=1000, strict=True) # type: ignore[valid-type]
+ include_old_rooms: Optional[IncludeOldRooms] = None
+
+ class SlidingSyncList(CommonRoomParameters):
+ """
+ Attributes:
+ ranges: Sliding window ranges. If this field is missing, no sliding window
+ is used and all rooms are returned in this list. Integers are
+ *inclusive*.
+ sort: How the list should be sorted on the server. The first value is
+ applied first, then tiebreaks are performed with each subsequent sort
+ listed.
+
+ FIXME: Furthermore, it's not currently defined how servers should behave
+ if they encounter a filter or sort operation they do not recognise. If
+ the server rejects the request with an HTTP 400 then that will break
+ backwards compatibility with new clients vs old servers. However, the
+ client would be otherwise unaware that only some of the sort/filter
+ operations have taken effect. We may need to include a "warnings"
+ section to indicate which sort/filter operations are unrecognised,
+ allowing for some form of graceful degradation of service.
+ -- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions
+
+ slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with
+ sliding windows). When true, the `ranges` and `sort` fields are ignored.
+ required_state: Required state for each room returned. An array of event
+ type and state key tuples. Elements in this array are ORd together to
+ produce the final set of state events to return.
+
+ One unique exception is when you request all state events via `["*",
+ "*"]`. When used, all state events are returned by default, and
+ additional entries FILTER OUT the returned set of state events. These
+ additional entries cannot use `*` themselves. For example, `["*", "*"],
+ ["m.room.member", "@alice:example.com"]` will *exclude* every
+ `m.room.member` event *except* for `@alice:example.com`, and include
+ every other state event. In addition, `["*", "*"], ["m.space.child",
+ "*"]` is an error, the `m.space.child` filter is not required as it
+ would have been returned anyway.
+
+ Room members can be lazily-loaded by using the special `$LAZY` state key
+ (`["m.room.member", "$LAZY"]`). Typically, when you view a room, you
+ want to retrieve all state events except for m.room.member events which
+ you want to lazily load. To get this behaviour, clients can send the
+ following::
+
+ {
+ "required_state": [
+ // activate lazy loading
+ ["m.room.member", "$LAZY"],
+                        // request all state events _except_ for m.room.member
+                        // events which are lazily loaded
+ ["*", "*"]
+ ]
+ }
+
+ timeline_limit: The maximum number of timeline events to return per response.
+ include_old_rooms: Determines if `predecessor` rooms are included in the
+ `rooms` response. The user MUST be joined to old rooms for them to show up
+ in the response.
+ include_heroes: Return a stripped variant of membership events (containing
+ `user_id` and optionally `avatar_url` and `displayname`) for the users used
+ to calculate the room name.
+ filters: Filters to apply to the list before sorting.
+ bump_event_types: Allowlist of event types which should be considered recent activity
+ when sorting `by_recency`. By omitting event types from this field,
+ clients can ensure that uninteresting events (e.g. a profile rename) do
+                not cause a room to jump to the top of its list(s). An empty or omitted
+                `bump_event_types` has no effect: all events in a room will be
+ considered recent activity.
+ """
+
+ class Filters(RequestBodyModel):
+ is_dm: Optional[StrictBool] = None
+ spaces: Optional[List[StrictStr]] = None
+ is_encrypted: Optional[StrictBool] = None
+ is_invite: Optional[StrictBool] = None
+ room_types: Optional[List[Union[StrictStr, None]]] = None
+ not_room_types: Optional[List[StrictStr]] = None
+ room_name_like: Optional[StrictStr] = None
+ tags: Optional[List[StrictStr]] = None
+ not_tags: Optional[List[StrictStr]] = None
+
+ # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
+ if TYPE_CHECKING:
+ ranges: Optional[List[Tuple[int, int]]] = None
+ else:
+ ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None # type: ignore[valid-type]
+ sort: Optional[List[StrictStr]] = None
+ slow_get_all_rooms: Optional[StrictBool] = False
+ include_heroes: Optional[StrictBool] = False
+ filters: Optional[Filters] = None
+ bump_event_types: Optional[List[StrictStr]] = None
+
+ class RoomSubscription(CommonRoomParameters):
+ pass
+
+ class Extension(RequestBodyModel):
+ enabled: Optional[StrictBool] = False
+ lists: Optional[List[StrictStr]] = None
+ rooms: Optional[List[StrictStr]] = None
+
+ # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
+ if TYPE_CHECKING:
+ lists: Optional[Dict[str, SlidingSyncList]] = None
+ else:
+ lists: Optional[Dict[constr(max_length=64, strict=True), SlidingSyncList]] = None # type: ignore[valid-type]
+ room_subscriptions: Optional[Dict[StrictStr, RoomSubscription]] = None
+ extensions: Optional[Dict[StrictStr, Extension]] = None
+
+ @validator("lists")
+ def lists_length_check(
+ cls, value: Optional[Dict[str, SlidingSyncList]]
+ ) -> Optional[Dict[str, SlidingSyncList]]:
+ if value is not None:
+ assert len(value) <= 100, f"Max lists: 100 but saw {len(value)}"
+ return value
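+
+
+# A minimal usage sketch (illustrative only; nothing in this module runs it):
+# pydantic v1's `parse_obj` validates a raw request body into a `SlidingSyncBody`,
+# which is essentially what `parse_and_validate_json_object_from_request` does
+# under the hood:
+#
+#   body = SlidingSyncBody.parse_obj(
+#       {
+#           "lists": {
+#               "foo-list": {
+#                   "ranges": [[0, 99]],
+#                   "required_state": [["m.room.join_rules", ""]],
+#                   "timeline_limit": 10,
+#               }
+#           }
+#       }
+#   )
+#   assert body.lists is not None and "foo-list" in body.lists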
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index fb4d44211e..61fdf71a27 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -292,6 +292,9 @@ class RoomStateEventRestServlet(RestServlet):
try:
if event_type == EventTypes.Member:
membership = content.get("membership", None)
+ if not isinstance(membership, str):
+ raise SynapseError(400, "Invalid membership (must be a string)")
+
event_id, _ = await self.room_member_handler.update_membership(
requester,
target=UserID.from_string(state_key),
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 27ea943e31..385b102b3d 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -33,6 +33,7 @@ from synapse.events.utils import (
format_event_raw,
)
from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.sliding_sync import SlidingSyncConfig, SlidingSyncResult
from synapse.handlers.sync import (
ArchivedSyncResult,
InvitedSyncResult,
@@ -43,9 +44,16 @@ from synapse.handlers.sync import (
SyncVersion,
)
from synapse.http.server import HttpServer
-from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
+from synapse.http.servlet import (
+ RestServlet,
+ parse_and_validate_json_object_from_request,
+ parse_boolean,
+ parse_integer,
+ parse_string,
+)
from synapse.http.site import SynapseRequest
from synapse.logging.opentracing import trace_with_opname
+from synapse.rest.client.models import SlidingSyncBody
from synapse.types import JsonDict, Requester, StreamToken
from synapse.util import json_decoder
from synapse.util.caches.lrucache import LruCache
@@ -735,8 +743,228 @@ class SlidingSyncE2eeRestServlet(RestServlet):
return 200, response
+class SlidingSyncRestServlet(RestServlet):
+ """
+    API endpoint for MSC3575 Sliding Sync `/sync`. Allows clients to request a
+ subset (sliding window) of rooms, state, and timeline events (just what they need)
+ in order to bootstrap quickly and subscribe to only what the client cares about.
+ Because the client can specify what it cares about, we can respond quickly and skip
+ all of the work we would normally have to do with a sync v2 response.
+
+ Request query parameters:
+ timeout: How long to wait for new events in milliseconds.
+ pos: Stream position token when asking for incremental deltas.
+
+ Request body::
+ {
+ // Sliding Window API
+ "lists": {
+ "foo-list": {
+ "ranges": [ [0, 99] ],
+ "sort": [ "by_notification_level", "by_recency", "by_name" ],
+ "required_state": [
+ ["m.room.join_rules", ""],
+ ["m.room.history_visibility", ""],
+ ["m.space.child", "*"]
+ ],
+ "timeline_limit": 10,
+ "filters": {
+ "is_dm": true
+ },
+ "bump_event_types": [ "m.room.message", "m.room.encrypted" ],
+ }
+ },
+ // Room Subscriptions API
+ "room_subscriptions": {
+ "!sub1:bar": {
+ "required_state": [ ["*","*"] ],
+ "timeline_limit": 10,
+ "include_old_rooms": {
+ "timeline_limit": 1,
+ "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
+ }
+ }
+ },
+ // Extensions API
+ "extensions": {}
+ }
+
+ Response JSON::
+ {
+ "next_pos": "s58_224_0_13_10_1_1_16_0_1",
+ "lists": {
+ "foo-list": {
+ "count": 1337,
+ "ops": [{
+ "op": "SYNC",
+ "range": [0, 99],
+ "room_ids": [
+ "!foo:bar",
+ // ... 99 more room IDs
+ ]
+ }]
+ }
+ },
+ // Aggregated rooms from lists and room subscriptions
+ "rooms": {
+ // Room from room subscription
+ "!sub1:bar": {
+ "name": "Alice and Bob",
+ "avatar": "mxc://...",
+ "initial": true,
+ "required_state": [
+ {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
+ {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+ {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+ {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}}
+ ],
+ "timeline": [
+ {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
+ {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+ {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+ {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
+ ],
+ "prev_batch": "t111_222_333",
+ "joined_count": 41,
+ "invited_count": 1,
+ "notification_count": 1,
+ "highlight_count": 0
+ },
+ // rooms from list
+ "!foo:bar": {
+ "name": "The calculated room name",
+ "avatar": "mxc://...",
+ "initial": true,
+ "required_state": [
+ {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+ {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+ {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!foo:example.com", "content":{"via":["example.com"]}},
+ {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!bar:example.com", "content":{"via":["example.com"]}},
+ {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!baz:example.com", "content":{"via":["example.com"]}}
+ ],
+ "timeline": [
+ {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"C"}},
+ {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"D"}},
+ ],
+ "prev_batch": "t111_222_333",
+ "joined_count": 4,
+ "invited_count": 0,
+ "notification_count": 54,
+ "highlight_count": 3
+ },
+ // ... 99 more items
+ },
+ "extensions": {}
+ }
+ """
+
+ PATTERNS = client_patterns(
+ "/org.matrix.msc3575/sync$", releases=[], v1=False, unstable=True
+ )
+
+ def __init__(self, hs: "HomeServer"):
+ super().__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastores().main
+ self.filtering = hs.get_filtering()
+ self.sliding_sync_handler = hs.get_sliding_sync_handler()
+
+ # TODO: Update this to `on_GET` once we figure out how we want to handle params
+ async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+ requester = await self.auth.get_user_by_req(request, allow_guest=True)
+ user = requester.user
+ device_id = requester.device_id
+
+ timeout = parse_integer(request, "timeout", default=0)
+ # Position in the stream
+ from_token_string = parse_string(request, "pos")
+
+ from_token = None
+ if from_token_string is not None:
+ from_token = await StreamToken.from_string(self.store, from_token_string)
+
+ # TODO: We currently don't know whether we're going to use sticky params or
+ # maybe some filters like sync v2 where they are built up once and referenced
+ # by filter ID. For now, we will just prototype with always passing everything
+ # in.
+ body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
+ logger.info("Sliding sync request: %r", body)
+
+ sync_config = SlidingSyncConfig(
+ user=user,
+ device_id=device_id,
+ # FIXME: Currently, we're just manually copying the fields from the
+            # `SlidingSyncBody` into the config. How can we guarantee that we don't
+            # forget any in the future? I would like something more structured like
+ # `copy_attributes(from=body, to=config)`
+ lists=body.lists,
+ room_subscriptions=body.room_subscriptions,
+ extensions=body.extensions,
+ )
+
+ sliding_sync_results = await self.sliding_sync_handler.wait_for_sync_for_user(
+ requester,
+ sync_config,
+ from_token,
+ timeout,
+ )
+
+ # The client may have disconnected by now; don't bother to serialize the
+ # response if so.
+ if request._disconnected:
+ logger.info("Client has disconnected; not serializing response.")
+ return 200, {}
+
+ response_content = await self.encode_response(sliding_sync_results)
+
+ return 200, response_content
+
+ # TODO: Is there a better way to encode things?
+ async def encode_response(
+ self,
+ sliding_sync_result: SlidingSyncResult,
+ ) -> JsonDict:
+ response: JsonDict = defaultdict(dict)
+
+ response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store)
+ serialized_lists = self.encode_lists(sliding_sync_result.lists)
+ if serialized_lists:
+ response["lists"] = serialized_lists
+ response["rooms"] = {} # TODO: sliding_sync_result.rooms
+ response["extensions"] = {} # TODO: sliding_sync_result.extensions
+
+ return response
+
+ def encode_lists(
+ self, lists: Dict[str, SlidingSyncResult.SlidingWindowList]
+ ) -> JsonDict:
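+        """
+        Serialize the sliding window lists into the response shape, e.g.
+        (hypothetical values)::
+
+            {
+                "foo-list": {
+                    "count": 1337,
+                    "ops": [
+                        {"op": "SYNC", "range": [0, 99], "room_ids": ["!foo:bar"]}
+                    ]
+                }
+            }
+        """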
+ def encode_operation(
+ operation: SlidingSyncResult.SlidingWindowList.Operation,
+ ) -> JsonDict:
+ return {
+ "op": operation.op.value,
+ "range": operation.range,
+ "room_ids": operation.room_ids,
+ }
+
+ serialized_lists = {}
+ for list_key, list_result in lists.items():
+ serialized_lists[list_key] = {
+ "count": list_result.count,
+ "ops": [encode_operation(op) for op in list_result.ops],
+ }
+
+ return serialized_lists
+
+
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)
if hs.config.experimental.msc3575_enabled:
+ SlidingSyncRestServlet(hs).register(http_server)
SlidingSyncE2eeRestServlet(hs).register(http_server)
diff --git a/synapse/server.py b/synapse/server.py
index 95e319d2e6..ae927c3904 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -109,6 +109,7 @@ from synapse.handlers.room_summary import RoomSummaryHandler
from synapse.handlers.search import SearchHandler
from synapse.handlers.send_email import SendEmailHandler
from synapse.handlers.set_password import SetPasswordHandler
+from synapse.handlers.sliding_sync import SlidingSyncHandler
from synapse.handlers.sso import SsoHandler
from synapse.handlers.stats import StatsHandler
from synapse.handlers.sync import SyncHandler
@@ -554,6 +555,9 @@ class HomeServer(metaclass=abc.ABCMeta):
def get_sync_handler(self) -> SyncHandler:
return SyncHandler(self)
+ def get_sliding_sync_handler(self) -> SlidingSyncHandler:
+ return SlidingSyncHandler(self)
+
@cache_in_self
def get_room_list_handler(self) -> RoomListHandler:
return RoomListHandler(self)
diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py
new file mode 100644
index 0000000000..5c27474b96
--- /dev/null
+++ b/tests/handlers/test_sliding_sync.py
@@ -0,0 +1,1118 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+import logging
+from unittest.mock import patch
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.room_versions import RoomVersions
+from synapse.rest import admin
+from synapse.rest.client import knock, login, room
+from synapse.server import HomeServer
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
+from synapse.types import JsonDict, UserID
+from synapse.util import Clock
+
+from tests.replication._base import BaseMultiWorkerStreamTestCase
+from tests.unittest import HomeserverTestCase
+
+logger = logging.getLogger(__name__)
+
+
+class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
+ """
+ Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it returns
+    the correct set of room IDs.
+ """
+
+ servlets = [
+ admin.register_servlets,
+ knock.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ ]
+
+ def default_config(self) -> JsonDict:
+ config = super().default_config()
+ # Enable sliding sync
+ config["experimental_features"] = {"msc3575_enabled": True}
+ return config
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+ self.store = self.hs.get_datastores().main
+ self.event_sources = hs.get_event_sources()
+
+ def test_no_rooms(self) -> None:
+ """
+ Test when the user has never joined any rooms before
+ """
+ user1_id = self.register_user("user1", "pass")
+ # user1_tok = self.login(user1_id, "pass")
+
+ now_token = self.event_sources.get_current_token()
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=now_token,
+ to_token=now_token,
+ )
+ )
+
+ self.assertEqual(room_id_results, set())
+
+ def test_get_newly_joined_room(self) -> None:
+ """
+        Test that rooms that the user has newly_joined show up. newly_joined means the
+        join happened after the `from_token` and at or before the `to_token`.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ before_room_token = self.event_sources.get_current_token()
+
+ room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+ after_room_token = self.event_sources.get_current_token()
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room_token,
+ to_token=after_room_token,
+ )
+ )
+
+ self.assertEqual(room_id_results, {room_id})
+
+ def test_get_already_joined_room(self) -> None:
+ """
+ Test that rooms that the user is already joined show up.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+ after_room_token = self.event_sources.get_current_token()
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room_token,
+ to_token=after_room_token,
+ )
+ )
+
+ self.assertEqual(room_id_results, {room_id})
+
+ def test_get_invited_banned_knocked_room(self) -> None:
+ """
+ Test that rooms that the user is invited to, banned from, and knocked on show
+ up.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room_token = self.event_sources.get_current_token()
+
+ # Setup the invited room (user2 invites user1 to the room)
+ invited_room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.invite(invited_room_id, targ=user1_id, tok=user2_tok)
+
+ # Setup the ban room (user2 bans user1 from the room)
+ ban_room_id = self.helper.create_room_as(
+ user2_id, tok=user2_tok, is_public=True
+ )
+ self.helper.join(ban_room_id, user1_id, tok=user1_tok)
+ self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok)
+
+ # Setup the knock room (user1 knocks on the room)
+ knock_room_id = self.helper.create_room_as(
+ user2_id, tok=user2_tok, room_version=RoomVersions.V7.identifier
+ )
+ self.helper.send_state(
+ knock_room_id,
+ EventTypes.JoinRules,
+ {"join_rule": JoinRules.KNOCK},
+ tok=user2_tok,
+ )
+ # User1 knocks on the room
+ channel = self.make_request(
+ "POST",
+ "/_matrix/client/r0/knock/%s" % (knock_room_id,),
+ b"{}",
+ user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.result)
+
+ after_room_token = self.event_sources.get_current_token()
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room_token,
+ to_token=after_room_token,
+ )
+ )
+
+ # Ensure that the invited, ban, and knock rooms show up
+ self.assertEqual(
+ room_id_results,
+ {
+ invited_room_id,
+ ban_room_id,
+ knock_room_id,
+ },
+ )
+
+ def test_get_kicked_room(self) -> None:
+ """
+ Test that a room that the user was kicked from still shows up. When the user
+ comes back to their client, they should see that they were kicked.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # Setup the kick room (user2 kicks user1 from the room)
+ kick_room_id = self.helper.create_room_as(
+ user2_id, tok=user2_tok, is_public=True
+ )
+ self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+ # Kick user1 from the room
+ self.helper.change_membership(
+ room=kick_room_id,
+ src=user2_id,
+ targ=user1_id,
+ tok=user2_tok,
+ membership=Membership.LEAVE,
+ extra_data={
+ "reason": "Bad manners",
+ },
+ )
+
+ after_kick_token = self.event_sources.get_current_token()
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_kick_token,
+ to_token=after_kick_token,
+ )
+ )
+
+ # The kicked room should show up
+ self.assertEqual(room_id_results, {kick_room_id})
+
+ def test_forgotten_rooms(self) -> None:
+ """
+ Forgotten rooms do not show up even if we forget after the from/to range.
+
+ Ideally, we would be able to track when the `/forget` happens and apply it
+ accordingly in the token range but the forgotten flag is only an extra bool in
+ the `room_memberships` table.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # Setup a normal room that we leave. This won't show up in the sync response
+ # because we left it before our token but is good to check anyway.
+ leave_room_id = self.helper.create_room_as(
+ user2_id, tok=user2_tok, is_public=True
+ )
+ self.helper.join(leave_room_id, user1_id, tok=user1_tok)
+ self.helper.leave(leave_room_id, user1_id, tok=user1_tok)
+
+ # Setup the ban room (user2 bans user1 from the room)
+ ban_room_id = self.helper.create_room_as(
+ user2_id, tok=user2_tok, is_public=True
+ )
+ self.helper.join(ban_room_id, user1_id, tok=user1_tok)
+ self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok)
+
+ # Setup the kick room (user2 kicks user1 from the room)
+ kick_room_id = self.helper.create_room_as(
+ user2_id, tok=user2_tok, is_public=True
+ )
+ self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+ # Kick user1 from the room
+ self.helper.change_membership(
+ room=kick_room_id,
+ src=user2_id,
+ targ=user1_id,
+ tok=user2_tok,
+ membership=Membership.LEAVE,
+ extra_data={
+ "reason": "Bad manners",
+ },
+ )
+
+ before_room_forgets = self.event_sources.get_current_token()
+
+ # Forget the room after we already have our tokens. This doesn't change
+ # the membership event itself but will mark it internally in Synapse
+ channel = self.make_request(
+ "POST",
+ f"/_matrix/client/r0/rooms/{leave_room_id}/forget",
+ content={},
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.result)
+ channel = self.make_request(
+ "POST",
+ f"/_matrix/client/r0/rooms/{ban_room_id}/forget",
+ content={},
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.result)
+ channel = self.make_request(
+ "POST",
+ f"/_matrix/client/r0/rooms/{kick_room_id}/forget",
+ content={},
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.result)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room_forgets,
+ to_token=before_room_forgets,
+ )
+ )
+
+ # We shouldn't see the room because it was forgotten
+ self.assertEqual(room_id_results, set())
+
+ def test_only_newly_left_rooms_show_up(self) -> None:
+ """
+ Test that newly_left rooms still show up in the sync response but rooms that
+ were left before the `from_token` don't show up. See condition "2)" comments in
+ the `get_sync_room_ids_for_user` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ # Leave before we calculate the `from_token`
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Leave during the from_token/to_token range (newly_left)
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+
+ after_room2_token = self.event_sources.get_current_token()
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room1_token,
+ to_token=after_room2_token,
+ )
+ )
+
+ # Only the newly_left room should show up
+ self.assertEqual(room_id_results, {room_id2})
+
+ def test_no_joins_after_to_token(self) -> None:
+ """
+ Rooms we join after the `to_token` should *not* show up. See condition "1b)"
+ comments in the `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # A room join after our `to_token` shouldn't show up
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ _ = room_id2
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_join_during_range_and_left_room_after_to_token(self) -> None:
+ """
+ Room still shows up if we left the room after the `to_token` but were joined
+ during the from_token/to_token range. See condition "1a)" comments in the
+ `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Leave the room after we already have our tokens
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # We should still see the room because we were joined during the
+ # from_token/to_token time period.
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_join_before_range_and_left_room_after_to_token(self) -> None:
+ """
+ Room still shows up if we left the room after the `to_token` but were joined
+ before the `from_token`. See condition "1a)" comments in the
+ `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Leave the room after we already have our tokens
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # We should still see the room because we were joined before the `from_token`
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_kicked_before_range_and_left_after_to_token(self) -> None:
+ """
+ Room still shows up if we left the room after the `to_token` but were kicked
+ before the `from_token`. See condition "1a)" comments in the
+ `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # Set up the kick room (user2 kicks user1 from the room)
+ kick_room_id = self.helper.create_room_as(
+ user2_id, tok=user2_tok, is_public=True
+ )
+ self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+ # Kick user1 from the room
+ self.helper.change_membership(
+ room=kick_room_id,
+ src=user2_id,
+ targ=user1_id,
+ tok=user2_tok,
+ membership=Membership.LEAVE,
+ extra_data={
+ "reason": "Bad manners",
+ },
+ )
+
+ after_kick_token = self.event_sources.get_current_token()
+
+ # Leave the room after we already have our tokens
+ #
+ # We have to join before we can leave (leave -> leave isn't a valid transition
+ # or at least it doesn't work in Synapse, 403 forbidden)
+ self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+ self.helper.leave(kick_room_id, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_kick_token,
+ to_token=after_kick_token,
+ )
+ )
+
+ # We should still see the room because we were kicked before the `from_token`
+ self.assertEqual(room_id_results, {kick_room_id})
+
+ def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None:
+ """
+ Newly left room should show up. But we're also testing that joining and leaving
+ after the `to_token` doesn't mess with the results. See condition "2)" and "1a)"
+ comments in the `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ # Join and leave the room during the from/to range
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Join and leave the room after we already have our tokens
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room should still show up because it's newly_left during the from/to range
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_newly_left_during_range_and_join_after_to_token(self) -> None:
+ """
+ Newly left room should show up. But we're also testing that joining after the
+ `to_token` doesn't mess with the results. See condition "2)" and "1b)" comments
+ in the `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ # Join and leave the room during the from/to range
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Join the room after we already have our tokens
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room should still show up because it's newly_left during the from/to range
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_no_from_token(self) -> None:
+ """
+ Test that if we don't provide a `from_token`, we get all the rooms that we're
+ joined to up to the `to_token`.
+
+ Providing `from_token` only really has the effect that it adds `newly_left`
+ rooms to the response.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+ # Join room1
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ # Join and leave room2 before the `to_token`
+ self.helper.join(room_id2, user1_id, tok=user1_tok)
+ self.helper.leave(room_id2, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Join room2 after we already have our tokens
+ self.helper.join(room_id2, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=None,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Only rooms we were joined to before the `to_token` should show up
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_from_token_ahead_of_to_token(self) -> None:
+ """
+ Test when the provided `from_token` comes after the `to_token`. We should
+ basically expect the same result as having no `from_token`.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+ # Join room1 before the `to_token`
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ # Join and leave room2 before the `to_token`
+ self.helper.join(room_id2, user1_id, tok=user1_tok)
+ self.helper.leave(room_id2, user1_id, tok=user1_tok)
+
+ # Note: These are purposely swapped. The `from_token` should come after
+ # the `to_token` in this test
+ to_token = self.event_sources.get_current_token()
+
+ # Join room2 after the `to_token`
+ self.helper.join(room_id2, user1_id, tok=user1_tok)
+
+ # --------
+
+ # Join room3 after the `to_token`
+ self.helper.join(room_id3, user1_id, tok=user1_tok)
+
+ # Join and leave room4 after the `to_token`
+ self.helper.join(room_id4, user1_id, tok=user1_tok)
+ self.helper.leave(room_id4, user1_id, tok=user1_tok)
+
+ # Note: These are purposely swapped. The `from_token` should come after the
+ # `to_token` in this test
+ from_token = self.event_sources.get_current_token()
+
+ # Join room4 after we already have our tokens
+ self.helper.join(room_id4, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=from_token,
+ to_token=to_token,
+ )
+ )
+
+ # Only rooms we were joined to before the `to_token` should show up
+ #
+ # There won't be any newly_left rooms because the `from_token` is ahead of the
+ # `to_token` and that range will give no membership changes to check.
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_leave_before_range_and_join_leave_after_to_token(self) -> None:
+ """
+ Old left room shouldn't show up. But we're also testing that joining and leaving
+ after the `to_token` doesn't mess with the results. See condition "1a)" comments
+ in the `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ # Join and leave the room before the from/to range
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Join and leave the room after we already have our tokens
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room shouldn't show up because it was left before the `from_token`
+ self.assertEqual(room_id_results, set())
+
+ def test_leave_before_range_and_join_after_to_token(self) -> None:
+ """
+ Old left room shouldn't show up. But we're also testing that joining after the
+ `to_token` doesn't mess with the results. See condition "1b)" comments in the
+ `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ # Join and leave the room before the from/to range
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Join the room after we already have our tokens
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room shouldn't show up because it was left before the `from_token`
+ self.assertEqual(room_id_results, set())
+
+ def test_join_leave_multiple_times_during_range_and_after_to_token(
+ self,
+ ) -> None:
+ """
+ Joining and leaving multiple times shouldn't prevent rooms from showing up. It
+ just matters that we were joined or newly_left during the from/to range. But
+ we're also testing that joining and leaving after the `to_token` doesn't mess
+ with the results.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ # Join, leave, join back to the room during the from/to range
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Leave and Join the room multiple times after we already have our tokens
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room should show up because it was newly_left and joined during the from/to range
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_join_leave_multiple_times_before_range_and_after_to_token(
+ self,
+ ) -> None:
+ """
+ Joining and leaving multiple times before the from/to range shouldn't prevent
+ rooms from showing up. It just matters that we were joined or newly_left during
+ the from/to range. But we're also testing that joining and leaving after the
+ `to_token` doesn't mess with the results.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ # Join, leave, join back to the room before the from/to range
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Leave and Join the room multiple times after we already have our tokens
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room should show up because we were joined before the from/to range
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_invite_before_range_and_join_leave_after_to_token(
+ self,
+ ) -> None:
+ """
+ Make it look like we joined after the token range, but we were invited before
+ the from/to range, so the room should still show up. See condition "1a)"
+ comments in the `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+ # Invited to the room before the token
+ self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Join and leave the room after we already have our tokens
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room should show up because we were invited before the from/to range
+ self.assertEqual(room_id_results, {room_id1})
+
+ def test_multiple_rooms_are_not_confused(
+ self,
+ ) -> None:
+ """
+ Test that multiple rooms are not confused as we fix up the list. This test
+ stems from a real-world bug in the code where I was accidentally using
+ `event.room_id` in one of the fix-up loops, but the `event` being referenced
+ was actually from a different loop.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+ # Invited and left the room before the token
+ self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ # Invited to room2
+ self.helper.invite(room_id2, src=user2_id, targ=user1_id, tok=user2_tok)
+
+ before_room3_token = self.event_sources.get_current_token()
+
+ # Invited and left room3 during the from/to range
+ room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ self.helper.invite(room_id3, src=user2_id, targ=user1_id, tok=user2_tok)
+ self.helper.leave(room_id3, user1_id, tok=user1_tok)
+
+ after_room3_token = self.event_sources.get_current_token()
+
+ # Join and leave the room after we already have our tokens
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ # Leave room2
+ self.helper.leave(room_id2, user1_id, tok=user1_tok)
+ # Leave room3
+ self.helper.leave(room_id3, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room3_token,
+ to_token=after_room3_token,
+ )
+ )
+
+ self.assertEqual(
+ room_id_results,
+ {
+ # `room_id1` shouldn't show up because we left before the from/to range
+ #
+ # `room_id2` should show up because we were invited before the from/to range
+ room_id2,
+ # `room_id3` should show up because it was newly_left during the from/to range
+ room_id3,
+ },
+ )
+
+
+class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
+ """
+ Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it works with
+ sharded event stream_writers enabled
+ """
+
+ servlets = [
+ admin.register_servlets_for_client_rest_resource,
+ room.register_servlets,
+ login.register_servlets,
+ ]
+
+ def default_config(self) -> dict:
+ config = super().default_config()
+ # Enable sliding sync
+ config["experimental_features"] = {"msc3575_enabled": True}
+
+ # Enable sharded event stream_writers
+ config["stream_writers"] = {"events": ["worker1", "worker2", "worker3"]}
+ config["instance_map"] = {
+ "main": {"host": "testserv", "port": 8765},
+ "worker1": {"host": "testserv", "port": 1001},
+ "worker2": {"host": "testserv", "port": 1002},
+ "worker3": {"host": "testserv", "port": 1003},
+ }
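+ # (`stream_writers.events` lists the workers allowed to persist events, and
+ # `instance_map` tells Synapse how to reach each worker for replication.)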
+ return config
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+ self.store = self.hs.get_datastores().main
+ self.event_sources = hs.get_event_sources()
+
+ def _create_room(self, room_id: str, user_id: str, tok: str) -> None:
+ """
+ Create a room with a specific room_id. We use this so that we have a
+ consistent room_id across test runs that hashes to the same value and is
+ sharded to a known worker in the tests.
+ """
+
+ # We control the room ID generation by patching out the
+ # `_generate_room_id` method
+ with patch(
+ "synapse.handlers.room.RoomCreationHandler._generate_room_id"
+ ) as mock:
+ mock.side_effect = lambda: room_id
+ self.helper.create_room_as(user_id, tok=tok)
+
+ def test_sharded_event_persisters(self) -> None:
+ """
+ This test should catch bugs that would come from flawed stream position
+ (`stream_ordering`) comparisons or making `RoomStreamToken`s naively. To
+ compare event positions properly, you need to consider both the `instance_name`
+ and `stream_ordering` together.
+
+ The test creates three event persister workers and a room sharded to each
+ worker. On worker2, we make the event stream position get stuck so that it
+ lags behind the other workers, and we start getting `RoomStreamToken`s that
+ have an `instance_map` component (i.e. `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`).
+
+ We then send some events to advance the stream positions of worker1 and worker3
+ but worker2 is lagging behind because it's stuck. We are specifically testing
+ that `get_sync_room_ids_for_user(from_token=xxx, to_token=xxx)` should work
+ correctly in these adverse conditions.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ self.make_worker_hs(
+ "synapse.app.generic_worker",
+ {"worker_name": "worker1"},
+ )
+
+ worker_hs2 = self.make_worker_hs(
+ "synapse.app.generic_worker",
+ {"worker_name": "worker2"},
+ )
+
+ self.make_worker_hs(
+ "synapse.app.generic_worker",
+ {"worker_name": "worker3"},
+ )
+
+ # Specially crafted room IDs that get persisted on different workers.
+ #
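+ # (The stream writer for a room's events is picked by hashing the room ID
+ # across the configured writers, so IDs like these were presumably found by
+ # trial until each hashed to a different worker.)
+ #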
+ # Sharded to worker1
+ room_id1 = "!fooo:test"
+ # Sharded to worker2
+ room_id2 = "!bar:test"
+ # Sharded to worker3
+ room_id3 = "!quux:test"
+
+ # Create rooms on the different workers.
+ self._create_room(room_id1, user2_id, user2_tok)
+ self._create_room(room_id2, user2_id, user2_tok)
+ self._create_room(room_id3, user2_id, user2_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok)
+ # Leave room2
+ self.helper.leave(room_id2, user1_id, tok=user1_tok)
+ join_response3 = self.helper.join(room_id3, user1_id, tok=user1_tok)
+ # Leave room3
+ self.helper.leave(room_id3, user1_id, tok=user1_tok)
+
+ # Ensure that the events were sharded to different workers.
+ pos1 = self.get_success(
+ self.store.get_position_for_event(join_response1["event_id"])
+ )
+ self.assertEqual(pos1.instance_name, "worker1")
+ pos2 = self.get_success(
+ self.store.get_position_for_event(join_response2["event_id"])
+ )
+ self.assertEqual(pos2.instance_name, "worker2")
+ pos3 = self.get_success(
+ self.store.get_position_for_event(join_response3["event_id"])
+ )
+ self.assertEqual(pos3.instance_name, "worker3")
+
+ before_stuck_activity_token = self.event_sources.get_current_token()
+
+ # We now gut-wrench into the events stream `MultiWriterIdGenerator` on worker2 to
+ # mimic it getting stuck persisting an event. This ensures that when we send an
+ # event on worker1/worker3, we end up in a state where worker2's event stream
+ # position lags behind worker1/worker3's, resulting in a `RoomStreamToken` with a
+ # non-empty `instance_map` component.
+ #
+ # Worker2's event stream position will not advance until we call `__aexit__`
+ # below.
+ worker_store2 = worker_hs2.get_datastores().main
+ assert isinstance(worker_store2._stream_id_gen, MultiWriterIdGenerator)
+ actx = worker_store2._stream_id_gen.get_next()
+ self.get_success(actx.__aenter__())
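+ # (`get_next()` returns an async context manager that reserves the next stream
+ # ID; the writer's advertised position only advances when the context exits, so
+ # holding it open here keeps worker2 "stuck".)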
+
+ # For room_id1/worker1: leave and join the room to advance the stream position
+ # and generate membership changes.
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ # For room_id2/worker2 (which is currently stuck): join the room.
+ join_on_worker2_response = self.helper.join(room_id2, user1_id, tok=user1_tok)
+ # For room_id3/worker3: leave and join the room to advance the stream position
+ # and generate membership changes.
+ self.helper.leave(room_id3, user1_id, tok=user1_tok)
+ join_on_worker3_response = self.helper.join(room_id3, user1_id, tok=user1_tok)
+
+ # Get a token while things are stuck after our activity
+ stuck_activity_token = self.event_sources.get_current_token()
+ logger.info("stuck_activity_token %s", stuck_activity_token)
+ # Let's make sure we're working with a token that has an `instance_map`
+ self.assertNotEqual(len(stuck_activity_token.room_key.instance_map), 0)
+
+ # Just double check that the join event on worker2 (that is stuck) happened
+ # after the position recorded for worker2 in the token but before the max
+ # position in the token. This is crucial for the behavior we're trying to test.
+ join_on_worker2_pos = self.get_success(
+ self.store.get_position_for_event(join_on_worker2_response["event_id"])
+ )
+ logger.info("join_on_worker2_pos %s", join_on_worker2_pos)
+ # Ensure the join technically came after our token
+ self.assertGreater(
+ join_on_worker2_pos.stream,
+ stuck_activity_token.room_key.get_stream_pos_for_instance("worker2"),
+ )
+ # But less than the max stream position of some other worker
+ self.assertLess(
+ join_on_worker2_pos.stream,
+ # max
+ stuck_activity_token.room_key.get_max_stream_pos(),
+ )
+
+ # Just double check that the join event on worker3 happened after the min stream
+ # value in the token but still within the position recorded for worker3. This is
+ # crucial for the behavior we're trying to test.
+ join_on_worker3_pos = self.get_success(
+ self.store.get_position_for_event(join_on_worker3_response["event_id"])
+ )
+ logger.info("join_on_worker3_pos %s", join_on_worker3_pos)
+ # Ensure the join came after the min but still encapsulated by the token
+ self.assertGreaterEqual(
+ join_on_worker3_pos.stream,
+ # min
+ stuck_activity_token.room_key.stream,
+ )
+ self.assertLessEqual(
+ join_on_worker3_pos.stream,
+ stuck_activity_token.room_key.get_stream_pos_for_instance("worker3"),
+ )
+
+ # We finish faking the event persist we started above and advance worker2's
+ # event stream position (unstick worker2).
+ self.get_success(actx.__aexit__(None, None, None))
+
+ # The function under test
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_stuck_activity_token,
+ to_token=stuck_activity_token,
+ )
+ )
+
+ self.assertEqual(
+ room_id_results,
+ {
+ room_id1,
+ # room_id2 shouldn't show up because we left before the from/to range
+ # and the join event during the range happened while worker2 was stuck.
+ # This means that from the perspective of the master, where the
+ # `stuck_activity_token` is generated, the stream position for worker2
+ # wasn't advanced to the join yet. Looking at the `instance_map`, the
+ # join technically comes after `stuck_activity_token`.
+ #
+ # room_id2,
+ room_id3,
+ },
+ )
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index daeb1d3ddd..a20a3fb40d 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -34,7 +34,7 @@ from synapse.api.constants import (
)
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import JsonDict, RoomStreamToken, StreamKeyType
from synapse.util import Clock
from tests import unittest
@@ -1204,3 +1204,135 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["join"])
self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"])
+
+
+class SlidingSyncTestCase(unittest.HomeserverTestCase):
+ """
+ Tests regarding MSC3575 Sliding Sync `/sync` endpoint.
+ """
+
+ servlets = [
+ synapse.rest.admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ sync.register_servlets,
+ devices.register_servlets,
+ ]
+
+ def default_config(self) -> JsonDict:
+ config = super().default_config()
+ # Enable sliding sync
+ config["experimental_features"] = {"msc3575_enabled": True}
+ return config
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
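+ # The experimental sliding sync endpoint lives under the unstable prefix
+ # while MSC3575 is still a proposal.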
+ self.sync_endpoint = "/_matrix/client/unstable/org.matrix.msc3575/sync"
+ self.store = hs.get_datastores().main
+ self.event_sources = hs.get_event_sources()
+
+ def test_sync_list(self) -> None:
+ """
+ Test that room IDs show up in the Sliding Sync lists
+ """
+ alice_user_id = self.register_user("alice", "correcthorse")
+ alice_access_token = self.login(alice_user_id, "correcthorse")
+
+ room_id = self.helper.create_room_as(
+ alice_user_id, tok=alice_access_token, is_public=True
+ )
+
+ # Make the Sliding Sync request
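+ # (Per MSC3575: `ranges` requests a window of the sorted room list with
+ # inclusive bounds, and `timeline_limit` caps the number of timeline events
+ # returned per room.)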
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint,
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 99]],
+ "sort": ["by_notification_level", "by_recency", "by_name"],
+ "required_state": [
+ ["m.room.join_rules", ""],
+ ["m.room.history_visibility", ""],
+ ["m.space.child", "*"],
+ ],
+ "timeline_limit": 1,
+ }
+ }
+ },
+ access_token=alice_access_token,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # Make sure it has the foo-list we requested
+ self.assertListEqual(
+ list(channel.json_body["lists"].keys()),
+ ["foo-list"],
+ channel.json_body["lists"].keys(),
+ )
+
+ # Make sure the list includes the room we are joined to
+ self.assertListEqual(
+ list(channel.json_body["lists"]["foo-list"]["ops"]),
+ [
+ {
+ "op": "SYNC",
+ "range": [0, 99],
+ "room_ids": [room_id],
+ }
+ ],
+ channel.json_body["lists"]["foo-list"],
+ )
+
+ def test_wait_for_sync_token(self) -> None:
+ """
+ Test that the worker will wait until it catches up to the given token
+ """
+ alice_user_id = self.register_user("alice", "correcthorse")
+ alice_access_token = self.login(alice_user_id, "correcthorse")
+
+ # Create a future token that will cause us to wait. Since we never send a new
+ # event to reach that future stream_ordering, the worker will wait until the
+ # full timeout.
+ current_token = self.event_sources.get_current_token()
+ future_position_token = current_token.copy_and_replace(
+ StreamKeyType.ROOM,
+ RoomStreamToken(stream=current_token.room_key.stream + 1),
+ )
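+ # (Only the room stream key is bumped, by one, so the token is strictly ahead
+ # of anything that has been persisted and the notifier is forced to wait.)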
+
+ future_position_token_serialized = self.get_success(
+ future_position_token.to_string(self.store)
+ )
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint + f"?pos={future_position_token_serialized}",
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 99]],
+ "sort": ["by_notification_level", "by_recency", "by_name"],
+ "required_state": [
+ ["m.room.join_rules", ""],
+ ["m.room.history_visibility", ""],
+ ["m.space.child", "*"],
+ ],
+ "timeline_limit": 1,
+ }
+ }
+ },
+ access_token=alice_access_token,
+ await_result=False,
+ )
+ # Block for 10 seconds to make `notifier.wait_for_stream_token(from_token)`
+ # time out
+ with self.assertRaises(TimedOutException):
+ channel.await_result(timeout_ms=9900)
+ channel.await_result(timeout_ms=200)
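+ # (The second `await_result` succeeds because the notifier's ~10s wait has
+ # timed out by then, so the handler returns with nothing new.)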
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # We expect the `next_pos` in the result to be the same token we requested
+ # with, because we weren't able to find anything new yet.
+ self.assertEqual(
+ channel.json_body["next_pos"], future_position_token_serialized
+ )
diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py
index 7362bde7ab..f0ba40a1f1 100644
--- a/tests/rest/client/utils.py
+++ b/tests/rest/client/utils.py
@@ -330,9 +330,12 @@ class RestHelper:
data,
)
- assert channel.code == expect_code, "Expected: %d, got: %d, resp: %r" % (
+ assert (
+ channel.code == expect_code
+ ), "Expected: %d, got: %d, PUT %s -> resp: %r" % (
expect_code,
channel.code,
+ path,
channel.result["body"],
)
|