diff --git a/changelog.d/17320.feature b/changelog.d/17320.feature
new file mode 100644
index 0000000000..1e524f3eca
--- /dev/null
+++ b/changelog.d/17320.feature
@@ -0,0 +1 @@
+Add `rooms` data to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index b997d82d71..f937fd4698 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -836,3 +836,21 @@ def maybe_upsert_event_field(
del container[key]
return upsert_okay
+
+
+def strip_event(event: EventBase) -> JsonDict:
+ """
+ Used for "stripped state" events which provide a simplified view of the state of a
+ room intended to help a potential joiner identify the room (relevant when the user
+ is invited or knocked).
+
+ Stripped state events can only have the `sender`, `type`, `state_key` and `content`
+ properties present.
+ """
+
+ return {
+ "type": event.type,
+ "state_key": event.state_key,
+ "content": event.content,
+ "sender": event.sender,
+ }
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 847a638bba..8622ef8472 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,22 +18,28 @@
#
#
import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
+import attr
from immutabledict import immutabledict
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
from synapse.events import EventBase
-from synapse.storage.roommember import RoomsForUser
+from synapse.events.utils import strip_event
+from synapse.handlers.relations import BundledAggregations
+from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
from synapse.types import (
+ JsonDict,
PersistedEventPosition,
Requester,
RoomStreamToken,
+ StreamKeyType,
StreamToken,
UserID,
)
from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
from synapse.types.state import StateFilter
+from synapse.visibility import filter_events_for_client
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -41,28 +47,9 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
-def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
- """
- Quick helper to convert an event to a `RoomsForUser` object.
- """
- # These fields should be present for all persisted events
- assert event.internal_metadata.stream_ordering is not None
- assert event.internal_metadata.instance_name is not None
-
- return RoomsForUser(
- room_id=event.room_id,
- sender=event.sender,
- membership=event.membership,
- event_id=event.event_id,
- event_pos=PersistedEventPosition(
- event.internal_metadata.instance_name,
- event.internal_metadata.stream_ordering,
- ),
- room_version_id=event.room_version.identifier,
- )
-
-
-def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
+def filter_membership_for_sync(
+ *, membership: str, user_id: str, sender: Optional[str]
+) -> bool:
"""
Returns True if the membership event should be included in the sync response,
otherwise False.
@@ -79,7 +66,54 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) ->
#
# This logic includes kicks (leave events where the sender is not the same user) and
# can be read as "anything that isn't a leave or a leave with a different sender".
- return membership != Membership.LEAVE or sender != user_id
+ #
+ # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
+ # happened that removed the user from the room, or the user was the last person
+ # locally to leave the room which caused the server to leave the room. In both
+ # cases, we can just remove the rooms since they are no longer relevant to the user.
+ # They could still be added back later if they are `newly_left`.
+ return membership != Membership.LEAVE or sender not in (user_id, None)
+
+
+# We can't freeze this class because we want to update it in place with the
+# de-duplicated data.
+@attr.s(slots=True, auto_attribs=True)
+class RoomSyncConfig:
+ """
+ Holds the config for what data we should fetch for a room in the sync response.
+
+ Attributes:
+ timeline_limit: The maximum number of events to return in the timeline.
+ required_state: The set of state events requested for the room. The
+ values are close to `StateKey` but actually use a syntax where you can
+ provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part
+ of the tuple (type, state_key).
+ """
+
+ timeline_limit: int
+ required_state: Set[Tuple[str, str]]
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+ """
+ Attributes:
+ event_id: The event ID of the membership event
+ event_pos: The stream position of the membership event
+ membership: The membership state of the user in the room
+ sender: The person who sent the membership event
+ newly_joined: Whether the user newly joined the room during the given token
+ range
+ """
+
+ event_id: Optional[str]
+ event_pos: PersistedEventPosition
+ membership: str
+ sender: Optional[str]
+ newly_joined: bool
+
+ def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+ return attr.evolve(self, **kwds)
class SlidingSyncHandler:
@@ -90,6 +124,7 @@ class SlidingSyncHandler:
self.auth_blocking = hs.get_auth_blocking()
self.notifier = hs.get_notifier()
self.event_sources = hs.get_event_sources()
+ self.relations_handler = hs.get_relations_handler()
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
async def wait_for_sync_for_user(
@@ -201,6 +236,7 @@ class SlidingSyncHandler:
# Assemble sliding window lists
lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
+ relevant_room_map: Dict[str, RoomSyncConfig] = {}
if sync_config.lists:
# Get all of the room IDs that the user should be able to see in the sync
# response
@@ -225,29 +261,67 @@ class SlidingSyncHandler:
ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
if list_config.ranges:
for range in list_config.ranges:
+ sliced_room_ids = [
+ room_id
+ # Both sides of range are inclusive
+ for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
+ ]
+
ops.append(
SlidingSyncResult.SlidingWindowList.Operation(
op=OperationType.SYNC,
range=range,
- room_ids=[
- room_id
- for room_id, _ in sorted_room_info[
- range[0] : range[1]
- ]
- ],
+ room_ids=sliced_room_ids,
)
)
+ # Take the superset of the `RoomSyncConfig` for each room
+ for room_id in sliced_room_ids:
+ if relevant_room_map.get(room_id) is not None:
+ # Take the highest timeline limit
+ if (
+ relevant_room_map[room_id].timeline_limit
+ < list_config.timeline_limit
+ ):
+ relevant_room_map[room_id].timeline_limit = (
+ list_config.timeline_limit
+ )
+
+ # Union the required state
+ relevant_room_map[room_id].required_state.update(
+ list_config.required_state
+ )
+ else:
+ relevant_room_map[room_id] = RoomSyncConfig(
+ timeline_limit=list_config.timeline_limit,
+ required_state=set(list_config.required_state),
+ )
+
lists[list_key] = SlidingSyncResult.SlidingWindowList(
count=len(sorted_room_info),
ops=ops,
)
+ # TODO: if (sync_config.room_subscriptions):
+
+ # Fetch room data
+ rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
+ for room_id, room_sync_config in relevant_room_map.items():
+ room_sync_result = await self.get_room_sync_data(
+ user=sync_config.user,
+ room_id=room_id,
+ room_sync_config=room_sync_config,
+ rooms_membership_for_user_at_to_token=sync_room_map[room_id],
+ from_token=from_token,
+ to_token=to_token,
+ )
+
+ rooms[room_id] = room_sync_result
+
return SlidingSyncResult(
next_pos=to_token,
lists=lists,
- # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
- rooms={},
+ rooms=rooms,
extensions={},
)
@@ -256,7 +330,7 @@ class SlidingSyncHandler:
user: UserID,
to_token: StreamToken,
from_token: Optional[StreamToken] = None,
- ) -> Dict[str, RoomsForUser]:
+ ) -> Dict[str, _RoomMembershipForUser]:
"""
Fetch room IDs that should be listed for this user in the sync response (the
full room list that will be filtered, sorted, and sliced).
@@ -305,13 +379,17 @@ class SlidingSyncHandler:
# Our working list of rooms that can show up in the sync response
sync_room_id_set = {
- room_for_user.room_id: room_for_user
- for room_for_user in room_for_user_list
- if filter_membership_for_sync(
+ # Note: The `room_for_user` we're assigning here will need to be fixed up
+ # (below) because they are potentially from the current snapshot time
+ # instead from the time of the `to_token`.
+ room_for_user.room_id: _RoomMembershipForUser(
+ event_id=room_for_user.event_id,
+ event_pos=room_for_user.event_pos,
membership=room_for_user.membership,
- user_id=user_id,
sender=room_for_user.sender,
+ newly_joined=False,
)
+ for room_for_user in room_for_user_list
}
# Get the `RoomStreamToken` that represents the spot we queried up to when we got
@@ -346,14 +424,9 @@ class SlidingSyncHandler:
#
# - 1a) Remove rooms that the user joined after the `to_token`
# - 1b) Add back rooms that the user left after the `to_token`
+ # - 1c) Update room membership events to the point in time of the `to_token`
# - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
- #
- # Below, we're doing two separate lookups for membership changes. We could
- # request everything for both fixups in one range, [`from_token.room_key`,
- # `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
- # comparison without `instance_name` (which is flawed). We could refactor
- # `event.internal_metadata` to include `instance_name` but it might turn out a
- # little difficult and a bigger, broader Synapse change than we want to make.
+ # - 3) Figure out which rooms are `newly_joined`
# 1) -----------------------------------------------------
@@ -363,159 +436,198 @@ class SlidingSyncHandler:
# If our `to_token` is already the same or ahead of the latest room membership
# for the user, we don't need to do any "2)" fix-ups and can just straight-up
# use the room list from the snapshot as a base (nothing has changed)
- membership_change_events_after_to_token = []
+ current_state_delta_membership_changes_after_to_token = []
if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
- membership_change_events_after_to_token = (
- await self.store.get_membership_changes_for_user(
+ current_state_delta_membership_changes_after_to_token = (
+ await self.store.get_current_state_delta_membership_changes_for_user(
user_id,
from_key=to_token.room_key,
to_key=membership_snapshot_token,
- excluded_rooms=self.rooms_to_exclude_globally,
+ excluded_room_ids=self.rooms_to_exclude_globally,
)
)
- # 1) Assemble a list of the last membership events in some given ranges. Someone
- # could have left and joined multiple times during the given range but we only
- # care about end-result so we grab the last one.
- last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
- # We also need the first membership event after the `to_token` so we can step
- # backward to the previous membership that would apply to the from/to range.
- first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
- for event in membership_change_events_after_to_token:
- last_membership_change_by_room_id_after_to_token[event.room_id] = event
+ # 1) Assemble a list of the first membership event after the `to_token` so we can
+ # step backward to the previous membership that would apply to the from/to
+ # range.
+ first_membership_change_by_room_id_after_to_token: Dict[
+ str, CurrentStateDeltaMembership
+ ] = {}
+ for membership_change in current_state_delta_membership_changes_after_to_token:
# Only set if we haven't already set it
first_membership_change_by_room_id_after_to_token.setdefault(
- event.room_id, event
+ membership_change.room_id, membership_change
)
# 1) Fixup
+ #
+ # Since we fetched a snapshot of the users room list at some point in time after
+ # the from/to tokens, we need to revert/rewind some membership changes to match
+ # the point in time of the `to_token`.
for (
- last_membership_change_after_to_token
- ) in last_membership_change_by_room_id_after_to_token.values():
- room_id = last_membership_change_after_to_token.room_id
-
- # We want to find the first membership change after the `to_token` then step
- # backward to know the membership in the from/to range.
- first_membership_change_after_to_token = (
- first_membership_change_by_room_id_after_to_token.get(room_id)
- )
- assert first_membership_change_after_to_token is not None, (
- "If there was a `last_membership_change_after_to_token` that we're iterating over, "
- + "then there should be corresponding a first change. For example, even if there "
- + "is only one event after the `to_token`, the first and last event will be same event. "
- + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
- + "/`first_membership_change_by_room_id_after_to_token` dicts above."
- )
- # TODO: Instead of reading from `unsigned`, refactor this to use the
- # `current_state_delta_stream` table in the future. Probably a new
- # `get_membership_changes_for_user()` function that uses
- # `current_state_delta_stream` with a join to `room_memberships`. This would
- # help in state reset scenarios since `prev_content` is looking at the
- # current branch vs the current room state. This is all just data given to
- # the client so no real harm to data integrity, but we'd like to be nice to
- # the client. Since the `current_state_delta_stream` table is new, it
- # doesn't have all events in it. Since this is Sliding Sync, if we ever need
- # to, we can signal the client to throw all of their state away by sending
- # "operation: RESET".
- prev_content = first_membership_change_after_to_token.unsigned.get(
- "prev_content", {}
- )
- prev_membership = prev_content.get("membership", None)
- prev_sender = first_membership_change_after_to_token.unsigned.get(
- "prev_sender", None
- )
-
- # Check if the previous membership (membership that applies to the from/to
- # range) should be included in our `sync_room_id_set`
- should_prev_membership_be_included = (
- prev_membership is not None
- and prev_sender is not None
- and filter_membership_for_sync(
- membership=prev_membership,
- user_id=user_id,
- sender=prev_sender,
- )
- )
-
- # Check if the last membership (membership that applies to our snapshot) was
- # already included in our `sync_room_id_set`
- was_last_membership_already_included = filter_membership_for_sync(
- membership=last_membership_change_after_to_token.membership,
+ room_id,
+ first_membership_change_after_to_token,
+ ) in first_membership_change_by_room_id_after_to_token.items():
+ # 1a) Remove rooms that the user joined after the `to_token`
+ if first_membership_change_after_to_token.prev_event_id is None:
+ sync_room_id_set.pop(room_id, None)
+ # 1b) 1c) From the first membership event after the `to_token`, step backward to the
+ # previous membership that would apply to the from/to range.
+ else:
+ # We don't expect these fields to be `None` if we have a `prev_event_id`
+ # but we're being defensive since it's possible that the prev event was
+ # culled from the database.
+ if (
+ first_membership_change_after_to_token.prev_event_pos is not None
+ and first_membership_change_after_to_token.prev_membership
+ is not None
+ ):
+ sync_room_id_set[room_id] = _RoomMembershipForUser(
+ event_id=first_membership_change_after_to_token.prev_event_id,
+ event_pos=first_membership_change_after_to_token.prev_event_pos,
+ membership=first_membership_change_after_to_token.prev_membership,
+ sender=first_membership_change_after_to_token.prev_sender,
+ newly_joined=False,
+ )
+ else:
+ # If we can't find the previous membership event, we shouldn't
+ # include the room in the sync response since we can't determine the
+ # exact membership state and shouldn't rely on the current snapshot.
+ sync_room_id_set.pop(room_id, None)
+
+ # Filter the rooms that that we have updated room membership events to the point
+ # in time of the `to_token` (from the "1)" fixups)
+ filtered_sync_room_id_set = {
+ room_id: room_membership_for_user
+ for room_id, room_membership_for_user in sync_room_id_set.items()
+ if filter_membership_for_sync(
+ membership=room_membership_for_user.membership,
user_id=user_id,
- sender=last_membership_change_after_to_token.sender,
+ sender=room_membership_for_user.sender,
)
-
- # 1a) Add back rooms that the user left after the `to_token`
- #
- # For example, if the last membership event after the `to_token` is a leave
- # event, then the room was excluded from `sync_room_id_set` when we first
- # crafted it above. We should add these rooms back as long as the user also
- # was part of the room before the `to_token`.
- if (
- not was_last_membership_already_included
- and should_prev_membership_be_included
- ):
- sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
- last_membership_change_after_to_token
- )
- # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
- #
- # For example, if the last membership event after the `to_token` is a "join"
- # event, then the room was included `sync_room_id_set` when we first crafted
- # it above. We should remove these rooms as long as the user also wasn't
- # part of the room before the `to_token`.
- elif (
- was_last_membership_already_included
- and not should_prev_membership_be_included
- ):
- del sync_room_id_set[room_id]
+ }
# 2) -----------------------------------------------------
# We fix-up newly_left rooms after the first fixup because it may have removed
- # some left rooms that we can figure out our newly_left in the following code
+ # some left rooms that we can figure out are newly_left in the following code
# 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
- membership_change_events_in_from_to_range = []
+ current_state_delta_membership_changes_in_from_to_range = []
if from_token:
- membership_change_events_in_from_to_range = (
- await self.store.get_membership_changes_for_user(
+ current_state_delta_membership_changes_in_from_to_range = (
+ await self.store.get_current_state_delta_membership_changes_for_user(
user_id,
from_key=from_token.room_key,
to_key=to_token.room_key,
- excluded_rooms=self.rooms_to_exclude_globally,
+ excluded_room_ids=self.rooms_to_exclude_globally,
)
)
# 2) Assemble a list of the last membership events in some given ranges. Someone
# could have left and joined multiple times during the given range but we only
# care about end-result so we grab the last one.
- last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
- for event in membership_change_events_in_from_to_range:
- last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
+ last_membership_change_by_room_id_in_from_to_range: Dict[
+ str, CurrentStateDeltaMembership
+ ] = {}
+ # We also want to assemble a list of the first membership events during the token
+ # range so we can step backward to the previous membership that would apply to
+ # before the token range to see if we have `newly_joined` the room.
+ first_membership_change_by_room_id_in_from_to_range: Dict[
+ str, CurrentStateDeltaMembership
+ ] = {}
+ # Keep track if the room has a non-join event in the token range so we can later
+ # tell if it was a `newly_joined` room. If the last membership event in the
+ # token range is a join and there is also some non-join in the range, we know
+ # they `newly_joined`.
+ has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {}
+ for (
+ membership_change
+ ) in current_state_delta_membership_changes_in_from_to_range:
+ room_id = membership_change.room_id
+
+ last_membership_change_by_room_id_in_from_to_range[room_id] = (
+ membership_change
+ )
+ # Only set if we haven't already set it
+ first_membership_change_by_room_id_in_from_to_range.setdefault(
+ room_id, membership_change
+ )
+
+ if membership_change.membership != Membership.JOIN:
+ has_non_join_event_by_room_id_in_from_to_range[room_id] = True
# 2) Fixup
+ #
+ # 3) We also want to assemble a list of possibly newly joined rooms. Someone
+ # could have left and joined multiple times during the given range but we only
+ # care about whether they are joined at the end of the token range so we are
+ # working with the last membership even in the token range.
+ possibly_newly_joined_room_ids = set()
for (
last_membership_change_in_from_to_range
) in last_membership_change_by_room_id_in_from_to_range.values():
room_id = last_membership_change_in_from_to_range.room_id
+ # 3)
+ if last_membership_change_in_from_to_range.membership == Membership.JOIN:
+ possibly_newly_joined_room_ids.add(room_id)
+
# 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
# include newly_left rooms because the last event that the user should see
# is their own leave event
if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
- sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
- last_membership_change_in_from_to_range
+ filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
+ event_id=last_membership_change_in_from_to_range.event_id,
+ event_pos=last_membership_change_in_from_to_range.event_pos,
+ membership=last_membership_change_in_from_to_range.membership,
+ sender=last_membership_change_in_from_to_range.sender,
+ newly_joined=False,
)
- return sync_room_id_set
+ # 3) Figure out `newly_joined`
+ for room_id in possibly_newly_joined_room_ids:
+ has_non_join_in_from_to_range = (
+ has_non_join_event_by_room_id_in_from_to_range.get(room_id, False)
+ )
+ # If the last membership event in the token range is a join and there is
+ # also some non-join in the range, we know they `newly_joined`.
+ if has_non_join_in_from_to_range:
+ # We found a `newly_joined` room (we left and joined within the token range)
+ filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ room_id
+ ].copy_and_replace(newly_joined=True)
+ else:
+ prev_event_id = first_membership_change_by_room_id_in_from_to_range[
+ room_id
+ ].prev_event_id
+ prev_membership = first_membership_change_by_room_id_in_from_to_range[
+ room_id
+ ].prev_membership
+
+ if prev_event_id is None:
+ # We found a `newly_joined` room (we are joining the room for the
+ # first time within the token range)
+ filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ room_id
+ ].copy_and_replace(newly_joined=True)
+ # Last resort, we need to step back to the previous membership event
+ # just before the token range to see if we're joined then or not.
+ elif prev_membership != Membership.JOIN:
+ # We found a `newly_joined` room (we left before the token range
+ # and joined within the token range)
+ filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+ room_id
+ ].copy_and_replace(newly_joined=True)
+
+ return filtered_sync_room_id_set
async def filter_rooms(
self,
user: UserID,
- sync_room_map: Dict[str, RoomsForUser],
+ sync_room_map: Dict[str, _RoomMembershipForUser],
filters: SlidingSyncConfig.SlidingSyncList.Filters,
to_token: StreamToken,
- ) -> Dict[str, RoomsForUser]:
+ ) -> Dict[str, _RoomMembershipForUser]:
"""
Filter rooms based on the sync request.
@@ -629,9 +741,9 @@ class SlidingSyncHandler:
async def sort_rooms(
self,
- sync_room_map: Dict[str, RoomsForUser],
+ sync_room_map: Dict[str, _RoomMembershipForUser],
to_token: StreamToken,
- ) -> List[Tuple[str, RoomsForUser]]:
+ ) -> List[Tuple[str, _RoomMembershipForUser]]:
"""
Sort by `stream_ordering` of the last event that the user should see in the
room. `stream_ordering` is unique so we get a stable sort.
@@ -678,3 +790,229 @@ class SlidingSyncHandler:
# We want descending order
reverse=True,
)
+
+ async def get_room_sync_data(
+ self,
+ user: UserID,
+ room_id: str,
+ room_sync_config: RoomSyncConfig,
+ rooms_membership_for_user_at_to_token: _RoomMembershipForUser,
+ from_token: Optional[StreamToken],
+ to_token: StreamToken,
+ ) -> SlidingSyncResult.RoomResult:
+ """
+ Fetch room data for the sync response.
+
+ We fetch data according to the token range (> `from_token` and <= `to_token`).
+
+ Args:
+ user: User to fetch data for
+ room_id: The room ID to fetch data for
+ room_sync_config: Config for what data we should fetch for a room in the
+ sync response.
+ rooms_membership_for_user_at_to_token: Membership information for the user
+ in the room at the time of `to_token`.
+ from_token: The point in the stream to sync from.
+ to_token: The point in the stream to sync up to.
+ """
+
+ # Assemble the list of timeline events
+ #
+ # It would be nice to make the `rooms` response more uniform regardless of
+ # membership. Currently, we have to make all of these optional because
+ # `invite`/`knock` rooms only have `stripped_state`. See
+ # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
+ timeline_events: Optional[List[EventBase]] = None
+ bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
+ limited: Optional[bool] = None
+ prev_batch_token: Optional[StreamToken] = None
+ num_live: Optional[int] = None
+ if (
+ room_sync_config.timeline_limit > 0
+ # No timeline for invite/knock rooms (just `stripped_state`)
+ and rooms_membership_for_user_at_to_token.membership
+ not in (Membership.INVITE, Membership.KNOCK)
+ ):
+ limited = False
+ # We want to start off using the `to_token` (vs `from_token`) because we look
+ # backwards from the `to_token` up to the `timeline_limit` and we might not
+ # reach the `from_token` before we hit the limit. We will update the room stream
+ # position once we've fetched the events to point to the earliest event fetched.
+ prev_batch_token = to_token
+
+ # We're going to paginate backwards from the `to_token`
+ from_bound = to_token.room_key
+ # People shouldn't see past their leave/ban event
+ if rooms_membership_for_user_at_to_token.membership in (
+ Membership.LEAVE,
+ Membership.BAN,
+ ):
+ from_bound = (
+ rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token()
+ )
+
+ # Determine whether we should limit the timeline to the token range.
+ #
+ # We should return historical messages (before token range) in the
+ # following cases because we want clients to be able to show a basic
+ # screen of information:
+ # - Initial sync (because no `from_token` to limit us anyway)
+ # - When users `newly_joined`
+ # - TODO: For an incremental sync where we haven't sent it down this
+ # connection before
+ to_bound = (
+ from_token.room_key
+ if from_token is not None
+ and not rooms_membership_for_user_at_to_token.newly_joined
+ else None
+ )
+
+ timeline_events, new_room_key = await self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=from_bound,
+ to_key=to_bound,
+ direction=Direction.BACKWARDS,
+ # We add one so we can determine if there are enough events to saturate
+ # the limit or not (see `limited`)
+ limit=room_sync_config.timeline_limit + 1,
+ event_filter=None,
+ )
+
+ # We want to return the events in ascending order (the last event is the
+ # most recent).
+ timeline_events.reverse()
+
+ # Determine our `limited` status based on the timeline. We do this before
+ # filtering the events so we can accurately determine if there is more to
+ # paginate even if we filter out some/all events.
+ if len(timeline_events) > room_sync_config.timeline_limit:
+ limited = True
+ # Get rid of that extra "+ 1" event because we only used it to determine
+ # if we hit the limit or not
+ timeline_events = timeline_events[-room_sync_config.timeline_limit :]
+ assert timeline_events[0].internal_metadata.stream_ordering
+ new_room_key = RoomStreamToken(
+ stream=timeline_events[0].internal_metadata.stream_ordering - 1
+ )
+
+ # Make sure we don't expose any events that the client shouldn't see
+ timeline_events = await filter_events_for_client(
+ self.storage_controllers,
+ user.to_string(),
+ timeline_events,
+ is_peeking=rooms_membership_for_user_at_to_token.membership
+ != Membership.JOIN,
+ filter_send_to_client=True,
+ )
+ # TODO: Filter out `EventTypes.CallInvite` in public rooms,
+ # see https://github.com/element-hq/synapse/issues/17359
+
+ # TODO: Handle timeline gaps (`get_timeline_gaps()`)
+
+ # Determine how many "live" events we have (events within the given token range).
+ #
+ # This is mostly useful to determine whether a given @mention event should
+ # make a noise or not. Clients cannot rely solely on the absence of
+ # `initial: true` to determine live events because if a room not in the
+ # sliding window bumps into the window because of an @mention it will have
+ # `initial: true` yet contain a single live event (with potentially other
+ # old events in the timeline)
+ num_live = 0
+ if from_token is not None:
+ for timeline_event in reversed(timeline_events):
+ # This fields should be present for all persisted events
+ assert timeline_event.internal_metadata.stream_ordering is not None
+ assert timeline_event.internal_metadata.instance_name is not None
+
+ persisted_position = PersistedEventPosition(
+ instance_name=timeline_event.internal_metadata.instance_name,
+ stream=timeline_event.internal_metadata.stream_ordering,
+ )
+ if persisted_position.persisted_after(from_token.room_key):
+ num_live += 1
+ else:
+ # Since we're iterating over the timeline events in
+ # reverse-chronological order, we can break once we hit an event
+ # that's not live. In the future, we could potentially optimize
+ # this more with a binary search (bisect).
+ break
+
+ # If the timeline is `limited=True`, the client does not have all events
+ # necessary to calculate aggregations themselves.
+ if limited:
+ bundled_aggregations = (
+ await self.relations_handler.get_bundled_aggregations(
+ timeline_events, user.to_string()
+ )
+ )
+
+ # Update the `prev_batch_token` to point to the position that allows us to
+ # keep paginating backwards from the oldest event we return in the timeline.
+ prev_batch_token = prev_batch_token.copy_and_replace(
+ StreamKeyType.ROOM, new_room_key
+ )
+
+ # Figure out any stripped state events for invite/knocks. This allows the
+ # potential joiner to identify the room.
+ stripped_state: List[JsonDict] = []
+ if rooms_membership_for_user_at_to_token.membership in (
+ Membership.INVITE,
+ Membership.KNOCK,
+ ):
+ # This should never happen. If someone is invited/knocked on room, then
+ # there should be an event for it.
+ assert rooms_membership_for_user_at_to_token.event_id is not None
+
+ invite_or_knock_event = await self.store.get_event(
+ rooms_membership_for_user_at_to_token.event_id
+ )
+
+ stripped_state = []
+ if invite_or_knock_event.membership == Membership.INVITE:
+ stripped_state.extend(
+ invite_or_knock_event.unsigned.get("invite_room_state", [])
+ )
+ elif invite_or_knock_event.membership == Membership.KNOCK:
+ stripped_state.extend(
+ invite_or_knock_event.unsigned.get("knock_room_state", [])
+ )
+
+ stripped_state.append(strip_event(invite_or_knock_event))
+
+ # TODO: Handle state resets. For example, if we see
+ # `rooms_membership_for_user_at_to_token.membership = Membership.LEAVE` but
+ # `required_state` doesn't include it, we should indicate to the client that a
+ # state reset happened. Perhaps we should indicate this by setting `initial:
+ # True` and empty `required_state`.
+
+ return SlidingSyncResult.RoomResult(
+ # TODO: Dummy value
+ name=None,
+ # TODO: Dummy value
+ avatar=None,
+ # TODO: Dummy value
+ heroes=None,
+ # TODO: Since we can't determine whether we've already sent a room down this
+ # Sliding Sync connection before (we plan to add this optimization in the
+ # future), we're always returning the requested room state instead of
+ # updates.
+ initial=True,
+ # TODO: Dummy value
+ required_state=[],
+ timeline_events=timeline_events,
+ bundled_aggregations=bundled_aggregations,
+ # TODO: Dummy value
+ is_dm=False,
+ stripped_state=stripped_state,
+ prev_batch=prev_batch_token,
+ limited=limited,
+ # TODO: Dummy values
+ joined_count=0,
+ invited_count=0,
+ # TODO: These are just dummy values. We could potentially just remove these
+ # since notifications can only really be done correctly on the client anyway
+ # (encrypted rooms).
+ notification_count=0,
+ highlight_count=0,
+ num_live=num_live,
+ )
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index b5ab0d8534..1d955a2e89 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -761,7 +761,6 @@ class SlidingSyncRestServlet(RestServlet):
"lists": {
"foo-list": {
"ranges": [ [0, 99] ],
- "sort": [ "by_notification_level", "by_recency", "by_name" ],
"required_state": [
["m.room.join_rules", ""],
["m.room.history_visibility", ""],
@@ -771,7 +770,6 @@ class SlidingSyncRestServlet(RestServlet):
"filters": {
"is_dm": true
},
- "bump_event_types": [ "m.room.message", "m.room.encrypted" ],
}
},
// Room Subscriptions API
@@ -779,10 +777,6 @@ class SlidingSyncRestServlet(RestServlet):
"!sub1:bar": {
"required_state": [ ["*","*"] ],
"timeline_limit": 10,
- "include_old_rooms": {
- "timeline_limit": 1,
- "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
- }
}
},
// Extensions API
@@ -791,7 +785,7 @@ class SlidingSyncRestServlet(RestServlet):
Response JSON::
{
- "next_pos": "s58_224_0_13_10_1_1_16_0_1",
+ "pos": "s58_224_0_13_10_1_1_16_0_1",
"lists": {
"foo-list": {
"count": 1337,
@@ -830,7 +824,8 @@ class SlidingSyncRestServlet(RestServlet):
"joined_count": 41,
"invited_count": 1,
"notification_count": 1,
- "highlight_count": 0
+ "highlight_count": 0,
+ "num_live": 2"
},
// rooms from list
"!foo:bar": {
@@ -855,7 +850,8 @@ class SlidingSyncRestServlet(RestServlet):
"joined_count": 4,
"invited_count": 0,
"notification_count": 54,
- "highlight_count": 3
+ "highlight_count": 3,
+ "num_live": 1,
},
// ... 99 more items
},
@@ -871,10 +867,11 @@ class SlidingSyncRestServlet(RestServlet):
super().__init__()
self.auth = hs.get_auth()
self.store = hs.get_datastores().main
+ self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
self.sliding_sync_handler = hs.get_sliding_sync_handler()
+ self.event_serializer = hs.get_event_client_serializer()
- # TODO: Update this to `on_GET` once we figure out how we want to handle params
async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request, allow_guest=True)
user = requester.user
@@ -920,22 +917,25 @@ class SlidingSyncRestServlet(RestServlet):
logger.info("Client has disconnected; not serializing response.")
return 200, {}
- response_content = await self.encode_response(sliding_sync_results)
+ response_content = await self.encode_response(requester, sliding_sync_results)
return 200, response_content
# TODO: Is there a better way to encode things?
async def encode_response(
self,
+ requester: Requester,
sliding_sync_result: SlidingSyncResult,
) -> JsonDict:
response: JsonDict = defaultdict(dict)
- response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store)
+ response["pos"] = await sliding_sync_result.next_pos.to_string(self.store)
serialized_lists = self.encode_lists(sliding_sync_result.lists)
if serialized_lists:
response["lists"] = serialized_lists
- response["rooms"] = {} # TODO: sliding_sync_result.rooms
+ response["rooms"] = await self.encode_rooms(
+ requester, sliding_sync_result.rooms
+ )
response["extensions"] = {} # TODO: sliding_sync_result.extensions
return response
@@ -961,6 +961,92 @@ class SlidingSyncRestServlet(RestServlet):
return serialized_lists
+ async def encode_rooms(
+ self,
+ requester: Requester,
+ rooms: Dict[str, SlidingSyncResult.RoomResult],
+ ) -> JsonDict:
+ time_now = self.clock.time_msec()
+
+ serialize_options = SerializeEventConfig(
+ event_format=format_event_for_client_v2_without_room_id,
+ requester=requester,
+ )
+
+ serialized_rooms: Dict[str, JsonDict] = {}
+ for room_id, room_result in rooms.items():
+ serialized_rooms[room_id] = {
+ "joined_count": room_result.joined_count,
+ "invited_count": room_result.invited_count,
+ "notification_count": room_result.notification_count,
+ "highlight_count": room_result.highlight_count,
+ }
+
+ if room_result.name:
+ serialized_rooms[room_id]["name"] = room_result.name
+
+ if room_result.avatar:
+ serialized_rooms[room_id]["avatar"] = room_result.avatar
+
+ if room_result.heroes:
+ serialized_rooms[room_id]["heroes"] = room_result.heroes
+
+ # We should only include the `initial` key if it's `True` to save bandwidth.
+ # The absense of this flag means `False`.
+ if room_result.initial:
+ serialized_rooms[room_id]["initial"] = room_result.initial
+
+ # This will omitted for invite/knock rooms with `stripped_state`
+ if room_result.required_state is not None:
+ serialized_required_state = (
+ await self.event_serializer.serialize_events(
+ room_result.required_state,
+ time_now,
+ config=serialize_options,
+ )
+ )
+ serialized_rooms[room_id]["required_state"] = serialized_required_state
+
+ # This will omitted for invite/knock rooms with `stripped_state`
+ if room_result.timeline_events is not None:
+ serialized_timeline = await self.event_serializer.serialize_events(
+ room_result.timeline_events,
+ time_now,
+ config=serialize_options,
+ bundle_aggregations=room_result.bundled_aggregations,
+ )
+ serialized_rooms[room_id]["timeline"] = serialized_timeline
+
+ # This will omitted for invite/knock rooms with `stripped_state`
+ if room_result.limited is not None:
+ serialized_rooms[room_id]["limited"] = room_result.limited
+
+ # This will omitted for invite/knock rooms with `stripped_state`
+ if room_result.prev_batch is not None:
+ serialized_rooms[room_id]["prev_batch"] = (
+ await room_result.prev_batch.to_string(self.store)
+ )
+
+ # This will omitted for invite/knock rooms with `stripped_state`
+ if room_result.num_live is not None:
+ serialized_rooms[room_id]["num_live"] = room_result.num_live
+
+ # Field should be absent on non-DM rooms
+ if room_result.is_dm:
+ serialized_rooms[room_id]["is_dm"] = room_result.is_dm
+
+ # Stripped state only applies to invite/knock rooms
+ if room_result.stripped_state is not None:
+ # TODO: `knocked_state` but that isn't specced yet.
+ #
+ # TODO: Instead of adding `knocked_state`, it would be good to rename
+ # this to `stripped_state` so it can be shared between invite and knock
+ # rooms, see
+ # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919
+ serialized_rooms[room_id]["invite_state"] = room_result.stripped_state
+
+ return serialized_rooms
+
def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 198e65cfa5..a5acea8c3b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -55,7 +55,7 @@ from synapse.api.room_versions import (
)
from synapse.events import EventBase, make_event_from_dict
from synapse.events.snapshot import EventContext
-from synapse.events.utils import prune_event
+from synapse.events.utils import prune_event, strip_event
from synapse.logging.context import (
PreserveLoggingContext,
current_context,
@@ -1025,15 +1025,7 @@ class EventsWorkerStore(SQLBaseStore):
state_to_include = await self.get_events(selected_state_ids.values())
- return [
- {
- "type": e.type,
- "state_key": e.state_key,
- "content": e.content,
- "sender": e.sender,
- }
- for e in state_to_include.values()
- ]
+ return [strip_event(e) for e in state_to_include.values()]
def _maybe_start_fetch_thread(self) -> None:
"""Starts an event fetch thread if we are not yet at the maximum number."""
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b7eb3116ae..d34376b8df 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -44,6 +44,7 @@ what sort order was used:
import logging
from typing import (
TYPE_CHECKING,
+ AbstractSet,
Any,
Collection,
Dict,
@@ -62,7 +63,7 @@ from typing_extensions import Literal
from twisted.internet import defer
-from synapse.api.constants import Direction
+from synapse.api.constants import Direction, EventTypes, Membership
from synapse.api.filtering import Filter
from synapse.events import EventBase
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -111,6 +112,32 @@ class _EventsAround:
end: RoomStreamToken
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class CurrentStateDeltaMembership:
+ """
+ Attributes:
+ event_id: The "current" membership event ID in this room.
+ event_pos: The position of the "current" membership event in the event stream.
+ prev_event_id: The previous membership event in this room that was replaced by
+ the "current" one. May be `None` if there was no previous membership event.
+ room_id: The room ID of the membership event.
+ membership: The membership state of the user in the room
+ sender: The person who sent the membership event
+ """
+
+ room_id: str
+ # Event
+ event_id: Optional[str]
+ event_pos: PersistedEventPosition
+ membership: str
+ sender: Optional[str]
+ # Prev event
+ prev_event_id: Optional[str]
+ prev_event_pos: Optional[PersistedEventPosition]
+ prev_membership: Optional[str]
+ prev_sender: Optional[str]
+
+
def generate_pagination_where_clause(
direction: Direction,
column_names: Tuple[str, str],
@@ -390,6 +417,43 @@ def _filter_results(
return True
+def _filter_results_by_stream(
+ lower_token: Optional[RoomStreamToken],
+ upper_token: Optional[RoomStreamToken],
+ instance_name: str,
+ stream_ordering: int,
+) -> bool:
+ """
+ This function only works with "live" tokens with `stream_ordering` only. See
+ `_filter_results(...)` if you want to work with all tokens.
+
+ Returns True if the event persisted by the given instance at the given
+ stream_ordering falls between the two tokens (taking a None
+ token to mean unbounded).
+
+ Used to filter results from fetching events in the DB against the given
+ tokens. This is necessary to handle the case where the tokens include
+ position maps, which we handle by fetching more than necessary from the DB
+ and then filtering (rather than attempting to construct a complicated SQL
+ query).
+ """
+ if lower_token:
+ assert lower_token.topological is None
+
+ # If these are live tokens we compare the stream ordering against the
+ # writers stream position.
+ if stream_ordering <= lower_token.get_stream_pos_for_instance(instance_name):
+ return False
+
+ if upper_token:
+ assert upper_token.topological is None
+
+ if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering:
+ return False
+
+ return True
+
+
def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
# NB: This may create SQL clauses that don't optimise well (and we don't
# have indices on all possible clauses). E.g. it may create
@@ -734,6 +798,191 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return ret, key
+ async def get_current_state_delta_membership_changes_for_user(
+ self,
+ user_id: str,
+ from_key: RoomStreamToken,
+ to_key: RoomStreamToken,
+ excluded_room_ids: Optional[List[str]] = None,
+ ) -> List[CurrentStateDeltaMembership]:
+ """
+ Fetch membership events (and the previous event that was replaced by that one)
+ for a given user.
+
+ Note: This function only works with "live" tokens with `stream_ordering` only.
+
+ We're looking for membership changes in the token range (> `from_key` and <=
+ `to_key`).
+
+ Please be mindful to only use this with `from_key` and `to_key` tokens that are
+ recent enough to be after when the first local user joined the room. Otherwise,
+ the results may be incomplete or too greedy. For example, if you use a token
+ range before the first local user joined the room, you will see 0 events since
+ `current_state_delta_stream` tracks what the server thinks is the current state
+ of the room as time goes. It does not track how state progresses from the
+ beginning of the room. So for example, when you remotely join a room, the first
+ rows will just be the state when you joined and progress from there.
+
+ You can probably reasonably use this with `/sync` because the `to_key` passed in
+ will be the "current" now token and the range will cover when the user joined
+ the room.
+
+ Args:
+ user_id: The user ID to fetch membership events for.
+ from_key: The point in the stream to sync from (fetching events > this point).
+ to_key: The token to fetch rooms up to (fetching events <= this point).
+ excluded_room_ids: Optional list of room IDs to exclude from the results.
+
+ Returns:
+ All membership changes to the current state in the token range. Events are
+ sorted by `stream_ordering` ascending.
+ """
+ # Start by ruling out cases where a DB query is not necessary.
+ if from_key == to_key:
+ return []
+
+ if from_key:
+ has_changed = self._membership_stream_cache.has_entity_changed(
+ user_id, int(from_key.stream)
+ )
+ if not has_changed:
+ return []
+
+ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
+ # To handle tokens with a non-empty instance_map we fetch more
+ # results than necessary and then filter down
+ min_from_id = from_key.stream
+ max_to_id = to_key.get_max_stream_pos()
+
+ args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id]
+
+ # TODO: It would be good to assert that the `from_token`/`to_token` is >=
+ # the first row in `current_state_delta_stream` for the rooms we're
+ # interested in. Otherwise, we will end up with empty results and not know
+ # it.
+
+ # We could `COALESCE(e.stream_ordering, s.stream_id)` to get more accurate
+ # stream positioning when available but given our usages, we can avoid the
+ # complexity. Between two (valid) stream tokens, we will still get all of
+ # the state changes. Since those events are persisted in a batch, valid
+ # tokens will either be before or after the batch of events.
+ #
+ # `stream_ordering` from the `events` table is more accurate when available
+ # since the `current_state_delta_stream` table only tracks that the current
+ # state is at this stream position (not what stream position the state event
+ # was added) and uses the *minimum* stream position for batches of events.
+ sql = """
+ SELECT
+ s.room_id,
+ e.event_id,
+ s.instance_name,
+ s.stream_id,
+ m.membership,
+ e.sender,
+ s.prev_event_id,
+ e_prev.instance_name AS prev_instance_name,
+ e_prev.stream_ordering AS prev_stream_ordering,
+ m_prev.membership AS prev_membership,
+ e_prev.sender AS prev_sender
+ FROM current_state_delta_stream AS s
+ LEFT JOIN events AS e ON e.event_id = s.event_id
+ LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
+ LEFT JOIN events AS e_prev ON e_prev.event_id = s.prev_event_id
+ LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
+ WHERE s.stream_id > ? AND s.stream_id <= ?
+ AND s.type = ?
+ AND s.state_key = ?
+ ORDER BY s.stream_id ASC
+ """
+
+ txn.execute(sql, args)
+
+ membership_changes: List[CurrentStateDeltaMembership] = []
+ for (
+ room_id,
+ event_id,
+ instance_name,
+ stream_ordering,
+ membership,
+ sender,
+ prev_event_id,
+ prev_instance_name,
+ prev_stream_ordering,
+ prev_membership,
+ prev_sender,
+ ) in txn:
+ assert room_id is not None
+ assert instance_name is not None
+ assert stream_ordering is not None
+
+ if _filter_results_by_stream(
+ from_key,
+ to_key,
+ instance_name,
+ stream_ordering,
+ ):
+ # When the server leaves a room, it will insert new rows into the
+ # `current_state_delta_stream` table with `event_id = null` for all
+ # current state. This means we might already have a row for the
+ # leave event and then another for the same leave where the
+ # `event_id=null` but the `prev_event_id` is pointing back at the
+ # earlier leave event. We don't want to report the leave, if we
+ # already have a leave event.
+ if event_id is None and prev_membership == Membership.LEAVE:
+ continue
+
+ membership_change = CurrentStateDeltaMembership(
+ room_id=room_id,
+ # Event
+ event_id=event_id,
+ event_pos=PersistedEventPosition(
+ instance_name=instance_name,
+ stream=stream_ordering,
+ ),
+ # When `s.event_id = null`, we won't be able to get respective
+ # `room_membership` but can assume the user has left the room
+ # because this only happens when the server leaves a room
+ # (meaning everyone locally left) or a state reset which removed
+ # the person from the room.
+ membership=(
+ membership if membership is not None else Membership.LEAVE
+ ),
+ sender=sender,
+ # Prev event
+ prev_event_id=prev_event_id,
+ prev_event_pos=(
+ PersistedEventPosition(
+ instance_name=prev_instance_name,
+ stream=prev_stream_ordering,
+ )
+ if (
+ prev_instance_name is not None
+ and prev_stream_ordering is not None
+ )
+ else None
+ ),
+ prev_membership=prev_membership,
+ prev_sender=prev_sender,
+ )
+
+ membership_changes.append(membership_change)
+
+ return membership_changes
+
+ membership_changes = await self.db_pool.runInteraction(
+ "get_current_state_delta_membership_changes_for_user", f
+ )
+
+ room_ids_to_exclude: AbstractSet[str] = set()
+ if excluded_room_ids is not None:
+ room_ids_to_exclude = set(excluded_room_ids)
+
+ return [
+ membership_change
+ for membership_change in membership_changes
+ if membership_change.room_id not in room_ids_to_exclude
+ ]
+
@cancellable
async def get_membership_changes_for_user(
self,
@@ -769,10 +1018,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ignore_room_clause = ""
if excluded_rooms is not None and len(excluded_rooms) > 0:
- ignore_room_clause = "AND e.room_id NOT IN (%s)" % ",".join(
- "?" for _ in excluded_rooms
+ ignore_room_clause, ignore_room_args = make_in_list_sql_clause(
+ txn.database_engine, "e.room_id", excluded_rooms, negative=True
)
- args = args + excluded_rooms
+ ignore_room_clause = f"AND {ignore_room_clause}"
+ args += ignore_room_args
sql = """
SELECT m.event_id, instance_name, topological_ordering, stream_ordering
@@ -1554,6 +1804,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
) -> Tuple[List[EventBase], RoomStreamToken]:
"""Returns list of events before or after a given token.
+ When Direction.FORWARDS: from_key < x <= to_key
+ When Direction.BACKWARDS: from_key >= x > to_key
+
Args:
room_id
from_key: The token used to stream from
@@ -1570,6 +1823,27 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
and `to_key`).
"""
+ # We can bail early if we're looking forwards, and our `to_key` is already
+ # before our `from_key`.
+ if (
+ direction == Direction.FORWARDS
+ and to_key is not None
+ and to_key.is_before_or_eq(from_key)
+ ):
+ # Token selection matches what we do in `_paginate_room_events_txn` if there
+ # are no rows
+ return [], to_key if to_key else from_key
+ # Or vice-versa, if we're looking backwards and our `from_key` is already before
+ # our `to_key`.
+ elif (
+ direction == Direction.BACKWARDS
+ and to_key is not None
+ and from_key.is_before_or_eq(to_key)
+ ):
+ # Token selection matches what we do in `_paginate_room_events_txn` if there
+ # are no rows
+ return [], to_key if to_key else from_key
+
rows, token = await self.db_pool.runInteraction(
"paginate_room_events",
self._paginate_room_events_txn,
diff --git a/synapse/storage/schema/main/delta/42/current_state_delta.sql b/synapse/storage/schema/main/delta/42/current_state_delta.sql
index 876b61e6a5..3d2fd69480 100644
--- a/synapse/storage/schema/main/delta/42/current_state_delta.sql
+++ b/synapse/storage/schema/main/delta/42/current_state_delta.sql
@@ -32,7 +32,10 @@
* limitations under the License.
*/
-
+-- Tracks what the server thinks is the current state of the room as time goes. It does
+-- not track how state progresses from the beginning of the room. So for example, when
+-- you remotely join a room, the first rows will just be the state when you joined and
+-- progress from there.
CREATE TABLE current_state_delta_stream (
stream_id BIGINT NOT NULL,
room_id TEXT NOT NULL,
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 8ab9f90238..b22a13ef01 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -1096,6 +1096,9 @@ class PersistedPosition:
stream: int
def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool:
+ """
+ Checks whether this position happened after the token
+ """
return token.get_stream_pos_for_instance(self.instance_name) < self.stream
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 1d65551d5b..3cd3c8fb0f 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -31,9 +31,12 @@ else:
from pydantic import Extra
from synapse.events import EventBase
-from synapse.types import JsonMapping, StreamToken, UserID
+from synapse.types import JsonDict, JsonMapping, StreamToken, UserID
from synapse.types.rest.client import SlidingSyncBody
+if TYPE_CHECKING:
+ from synapse.handlers.relations import BundledAggregations
+
class ShutdownRoomParams(TypedDict):
"""
@@ -159,11 +162,16 @@ class SlidingSyncResult:
entirely and NOT send "initial":false as this is wasteful on bandwidth. The
absence of this flag means 'false'.
required_state: The current state of the room
- timeline: Latest events in the room. The last event is the most recent
+ timeline: Latest events in the room. The last event is the most recent.
+ bundled_aggregations: A mapping of event ID to the bundled aggregations for
+ the timeline events above. This allows clients to show accurate reaction
+ counts (or edits, threads), even if some of the reaction events were skipped
+ over in a gappy sync.
is_dm: Flag to specify whether the room is a direct-message room (most likely
between two people).
- invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
- in sync v2, absent on joined/left rooms
+ stripped_state: Stripped state events (for rooms where the usre is
+ invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
+ absent on joined/left rooms
prev_batch: A token that can be passed as a start parameter to the
`/rooms/<room_id>/messages` API to retrieve earlier messages.
limited: True if their are more events than fit between the given position and now.
@@ -185,21 +193,28 @@ class SlidingSyncResult:
(with potentially other old events in the timeline).
"""
- name: str
+ name: Optional[str]
avatar: Optional[str]
heroes: Optional[List[EventBase]]
initial: bool
- required_state: List[EventBase]
- timeline: List[EventBase]
+ # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+ required_state: Optional[List[EventBase]]
+ # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+ timeline_events: Optional[List[EventBase]]
+ bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
is_dm: bool
- invite_state: List[EventBase]
- prev_batch: StreamToken
- limited: bool
+ # Optional because it's only relevant to invite/knock rooms
+ stripped_state: Optional[List[JsonDict]]
+ # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+ prev_batch: Optional[StreamToken]
+ # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+ limited: Optional[bool]
joined_count: int
invited_count: int
notification_count: int
highlight_count: int
- num_live: int
+ # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+ num_live: Optional[int]
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingWindowList:
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index e2c79c4106..5d453769b5 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -152,22 +152,14 @@ class SlidingSyncBody(RequestBodyModel):
anyway.
timeline_limit: The maximum number of timeline events to return per response.
(Max 1000 messages)
- include_old_rooms: Determines if `predecessor` rooms are included in the
- `rooms` response. The user MUST be joined to old rooms for them to show up
- in the response.
"""
- class IncludeOldRooms(RequestBodyModel):
- timeline_limit: StrictInt
- required_state: List[Tuple[StrictStr, StrictStr]]
-
required_state: List[Tuple[StrictStr, StrictStr]]
# mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
if TYPE_CHECKING:
timeline_limit: int
else:
timeline_limit: conint(le=1000, strict=True) # type: ignore[valid-type]
- include_old_rooms: Optional[IncludeOldRooms] = None
class SlidingSyncList(CommonRoomParameters):
"""
@@ -208,9 +200,6 @@ class SlidingSyncBody(RequestBodyModel):
}
timeline_limit: The maximum number of timeline events to return per response.
- include_old_rooms: Determines if `predecessor` rooms are included in the
- `rooms` response. The user MUST be joined to old rooms for them to show up
- in the response.
include_heroes: Return a stripped variant of membership events (containing
`user_id` and optionally `avatar_url` and `displayname`) for the users used
to calculate the room name.
diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py
index 8dd4521b18..3d37a696d5 100644
--- a/tests/handlers/test_sliding_sync.py
+++ b/tests/handlers/test_sliding_sync.py
@@ -63,6 +63,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
self.store = self.hs.get_datastores().main
self.event_sources = hs.get_event_sources()
+ self.storage_controllers = hs.get_storage_controllers()
def test_no_rooms(self) -> None:
"""
@@ -90,10 +91,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
before_room_token = self.event_sources.get_current_token()
- room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+ room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+ join_response = self.helper.join(room_id, user1_id, tok=user1_tok)
after_room_token = self.event_sources.get_current_token()
@@ -106,6 +110,15 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
self.assertEqual(room_id_results.keys(), {room_id})
+ # It should be pointing to the join event (latest membership event in the
+ # from/to range)
+ self.assertEqual(
+ room_id_results[room_id].event_id,
+ join_response["event_id"],
+ )
+ # We should be considered `newly_joined` because we joined during the token
+ # range
+ self.assertEqual(room_id_results[room_id].newly_joined, True)
def test_get_already_joined_room(self) -> None:
"""
@@ -113,8 +126,11 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
- room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+ room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+ join_response = self.helper.join(room_id, user1_id, tok=user1_tok)
after_room_token = self.event_sources.get_current_token()
@@ -127,6 +143,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
self.assertEqual(room_id_results.keys(), {room_id})
+ # It should be pointing to the join event (latest membership event in the
+ # from/to range)
+ self.assertEqual(
+ room_id_results[room_id].event_id,
+ join_response["event_id"],
+ )
+ # We should *NOT* be `newly_joined` because we joined before the token range
+ self.assertEqual(room_id_results[room_id].newly_joined, False)
def test_get_invited_banned_knocked_room(self) -> None:
"""
@@ -142,14 +166,18 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Setup the invited room (user2 invites user1 to the room)
invited_room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
- self.helper.invite(invited_room_id, targ=user1_id, tok=user2_tok)
+ invite_response = self.helper.invite(
+ invited_room_id, targ=user1_id, tok=user2_tok
+ )
# Setup the ban room (user2 bans user1 from the room)
ban_room_id = self.helper.create_room_as(
user2_id, tok=user2_tok, is_public=True
)
self.helper.join(ban_room_id, user1_id, tok=user1_tok)
- self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok)
+ ban_response = self.helper.ban(
+ ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok
+ )
# Setup the knock room (user1 knocks on the room)
knock_room_id = self.helper.create_room_as(
@@ -162,13 +190,19 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
tok=user2_tok,
)
# User1 knocks on the room
- channel = self.make_request(
+ knock_channel = self.make_request(
"POST",
"/_matrix/client/r0/knock/%s" % (knock_room_id,),
b"{}",
user1_tok,
)
- self.assertEqual(channel.code, 200, channel.result)
+ self.assertEqual(knock_channel.code, 200, knock_channel.result)
+ knock_room_membership_state_event = self.get_success(
+ self.storage_controllers.state.get_current_state_event(
+ knock_room_id, EventTypes.Member, user1_id
+ )
+ )
+ assert knock_room_membership_state_event is not None
after_room_token = self.event_sources.get_current_token()
@@ -189,6 +223,25 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
knock_room_id,
},
)
+ # It should be pointing to the the respective membership event (latest
+ # membership event in the from/to range)
+ self.assertEqual(
+ room_id_results[invited_room_id].event_id,
+ invite_response["event_id"],
+ )
+ self.assertEqual(
+ room_id_results[ban_room_id].event_id,
+ ban_response["event_id"],
+ )
+ self.assertEqual(
+ room_id_results[knock_room_id].event_id,
+ knock_room_membership_state_event.event_id,
+ )
+ # We should *NOT* be `newly_joined` because we were not joined at the the time
+ # of the `to_token`.
+ self.assertEqual(room_id_results[invited_room_id].newly_joined, False)
+ self.assertEqual(room_id_results[ban_room_id].newly_joined, False)
+ self.assertEqual(room_id_results[knock_room_id].newly_joined, False)
def test_get_kicked_room(self) -> None:
"""
@@ -206,7 +259,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
self.helper.join(kick_room_id, user1_id, tok=user1_tok)
# Kick user1 from the room
- self.helper.change_membership(
+ kick_response = self.helper.change_membership(
room=kick_room_id,
src=user2_id,
targ=user1_id,
@@ -229,6 +282,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# The kicked room should show up
self.assertEqual(room_id_results.keys(), {kick_room_id})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[kick_room_id].event_id,
+ kick_response["event_id"],
+ )
+ # We should *NOT* be `newly_joined` because we were not joined at the the time
+ # of the `to_token`.
+ self.assertEqual(room_id_results[kick_room_id].newly_joined, False)
def test_forgotten_rooms(self) -> None:
"""
@@ -329,7 +390,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Leave during the from_token/to_token range (newly_left)
room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
- self.helper.leave(room_id2, user1_id, tok=user1_tok)
+ _leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok)
after_room2_token = self.event_sources.get_current_token()
@@ -343,6 +404,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Only the newly_left room should show up
self.assertEqual(room_id_results.keys(), {room_id2})
+ # It should be pointing to the latest membership event in the from/to range but
+ # the `event_id` is `None` because we left the room causing the server to leave
+ # the room because no other local users are in it (quirk of the
+ # `current_state_delta_stream` table that we source things from)
+ self.assertEqual(
+ room_id_results[room_id2].event_id,
+ None, # _leave_response2["event_id"],
+ )
+ # We should *NOT* be `newly_joined` because we are instead `newly_left`
+ self.assertEqual(room_id_results[room_id2].newly_joined, False)
def test_no_joins_after_to_token(self) -> None:
"""
@@ -351,16 +422,19 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
before_room1_token = self.event_sources.get_current_token()
- room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
after_room1_token = self.event_sources.get_current_token()
- # Room join after after our `to_token` shouldn't show up
- room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
- _ = room_id2
+ # Room join after our `to_token` shouldn't show up
+ room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id2, user1_id, tok=user1_tok)
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -371,6 +445,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
)
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ join_response1["event_id"],
+ )
+ # We should be `newly_joined` because we joined during the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, True)
def test_join_during_range_and_left_room_after_to_token(self) -> None:
"""
@@ -380,15 +461,18 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
before_room1_token = self.event_sources.get_current_token()
- room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
after_room1_token = self.event_sources.get_current_token()
# Leave the room after we already have our tokens
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -401,6 +485,20 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# We should still see the room because we were joined during the
# from_token/to_token time period.
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ join_response["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response": join_response["event_id"],
+ "leave_response": leave_response["event_id"],
+ }
+ ),
+ )
+ # We should be `newly_joined` because we joined during the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, True)
def test_join_before_range_and_left_room_after_to_token(self) -> None:
"""
@@ -410,13 +508,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
- room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
after_room1_token = self.event_sources.get_current_token()
# Leave the room after we already have our tokens
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -428,6 +529,20 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# We should still see the room because we were joined before the `from_token`
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ join_response["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response": join_response["event_id"],
+ "leave_response": leave_response["event_id"],
+ }
+ ),
+ )
+ # We should *NOT* be `newly_joined` because we joined before the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, False)
def test_kicked_before_range_and_left_after_to_token(self) -> None:
"""
@@ -444,9 +559,9 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
kick_room_id = self.helper.create_room_as(
user2_id, tok=user2_tok, is_public=True
)
- self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+ join_response1 = self.helper.join(kick_room_id, user1_id, tok=user1_tok)
# Kick user1 from the room
- self.helper.change_membership(
+ kick_response = self.helper.change_membership(
room=kick_room_id,
src=user2_id,
targ=user1_id,
@@ -463,8 +578,8 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
#
# We have to join before we can leave (leave -> leave isn't a valid transition
# or at least it doesn't work in Synapse, 403 forbidden)
- self.helper.join(kick_room_id, user1_id, tok=user1_tok)
- self.helper.leave(kick_room_id, user1_id, tok=user1_tok)
+ join_response2 = self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+ leave_response = self.helper.leave(kick_room_id, user1_id, tok=user1_tok)
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -476,6 +591,22 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# We shouldn't see the room because it was forgotten
self.assertEqual(room_id_results.keys(), {kick_room_id})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[kick_room_id].event_id,
+ kick_response["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response1": join_response1["event_id"],
+ "kick_response": kick_response["event_id"],
+ "join_response2": join_response2["event_id"],
+ "leave_response": leave_response["event_id"],
+ }
+ ),
+ )
+ # We should *NOT* be `newly_joined` because we were kicked
+ self.assertEqual(room_id_results[kick_room_id].newly_joined, False)
def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None:
"""
@@ -494,14 +625,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# leave and can still re-join.
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# Join and leave the room during the from/to range
- self.helper.join(room_id1, user1_id, tok=user1_tok)
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
after_room1_token = self.event_sources.get_current_token()
# Join and leave the room after we already have our tokens
- self.helper.join(room_id1, user1_id, tok=user1_tok)
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -513,6 +644,22 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Room should still show up because it's newly_left during the from/to range
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ leave_response1["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response1": join_response1["event_id"],
+ "leave_response1": leave_response1["event_id"],
+ "join_response2": join_response2["event_id"],
+ "leave_response2": leave_response2["event_id"],
+ }
+ ),
+ )
+ # We should *NOT* be `newly_joined` because we left during the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, False)
def test_newly_left_during_range_and_join_after_to_token(self) -> None:
"""
@@ -531,13 +678,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# leave and can still re-join.
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# Join and leave the room during the from/to range
- self.helper.join(room_id1, user1_id, tok=user1_tok)
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
after_room1_token = self.event_sources.get_current_token()
# Join the room after we already have our tokens
- self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -549,11 +696,26 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Room should still show up because it's newly_left during the from/to range
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ leave_response1["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response1": join_response1["event_id"],
+ "leave_response1": leave_response1["event_id"],
+ "join_response2": join_response2["event_id"],
+ }
+ ),
+ )
+ # We should *NOT* be `newly_joined` because we left during the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, False)
def test_no_from_token(self) -> None:
"""
Test that if we don't provide a `from_token`, we get all the rooms that we we're
- joined to up to the `to_token`.
+ joined up to the `to_token`.
Providing `from_token` only really has the effect that it adds `newly_left`
rooms to the response.
@@ -569,7 +731,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# Join room1
- self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
# Join and leave the room2 before the `to_token`
self.helper.join(room_id2, user1_id, tok=user1_tok)
@@ -590,6 +752,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Only rooms we were joined to before the `to_token` should show up
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ join_response1["event_id"],
+ )
+ # We should *NOT* be `newly_joined` because there is no `from_token` to
+ # define a "live" range to compare against
+ self.assertEqual(room_id_results[room_id1].newly_joined, False)
def test_from_token_ahead_of_to_token(self) -> None:
"""
@@ -609,7 +779,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# Join room1 before `before_room_token`
- self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
# Join and leave the room2 before `before_room_token`
self.helper.join(room_id2, user1_id, tok=user1_tok)
@@ -651,6 +821,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# There won't be any newly_left rooms because the `from_token` is ahead of the
# `to_token` and that range will give no membership changes to check.
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ join_response1["event_id"],
+ )
+ # We should *NOT* be `newly_joined` because we joined `room1` before either of the tokens
+ self.assertEqual(room_id_results[room_id1].newly_joined, False)
def test_leave_before_range_and_join_leave_after_to_token(self) -> None:
"""
@@ -741,16 +918,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# leave and can still re-join.
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# Join, leave, join back to the room before the from/to range
- self.helper.join(room_id1, user1_id, tok=user1_tok)
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
- self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
after_room1_token = self.event_sources.get_current_token()
# Leave and Join the room multiple times after we already have our tokens
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
- self.helper.join(room_id1, user1_id, tok=user1_tok)
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ join_response3 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ leave_response3 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -762,6 +939,24 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Room should show up because it was newly_left and joined during the from/to range
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ join_response2["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response1": join_response1["event_id"],
+ "leave_response1": leave_response1["event_id"],
+ "join_response2": join_response2["event_id"],
+ "leave_response2": leave_response2["event_id"],
+ "join_response3": join_response3["event_id"],
+ "leave_response3": leave_response3["event_id"],
+ }
+ ),
+ )
+ # We should be `newly_joined` because we joined during the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, True)
def test_join_leave_multiple_times_before_range_and_after_to_token(
self,
@@ -781,16 +976,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# leave and can still re-join.
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# Join, leave, join back to the room before the from/to range
- self.helper.join(room_id1, user1_id, tok=user1_tok)
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
- self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
after_room1_token = self.event_sources.get_current_token()
# Leave and Join the room multiple times after we already have our tokens
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
- self.helper.join(room_id1, user1_id, tok=user1_tok)
- self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ join_response3 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ leave_response3 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -802,6 +997,24 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
# Room should show up because we were joined before the from/to range
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ join_response2["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response1": join_response1["event_id"],
+ "leave_response1": leave_response1["event_id"],
+ "join_response2": join_response2["event_id"],
+ "leave_response2": leave_response2["event_id"],
+ "join_response3": join_response3["event_id"],
+ "leave_response3": leave_response3["event_id"],
+ }
+ ),
+ )
+ # We should *NOT* be `newly_joined` because we joined before the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, False)
def test_invite_before_range_and_join_leave_after_to_token(
self,
@@ -821,24 +1034,495 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# Invited to the room before the token
- self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+ invite_response = self.helper.invite(
+ room_id1, src=user2_id, targ=user1_id, tok=user2_tok
+ )
after_room1_token = self.event_sources.get_current_token()
# Join and leave the room after we already have our tokens
+ join_respsonse = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room should show up because we were invited before the from/to range
+ self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ invite_response["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "invite_response": invite_response["event_id"],
+ "join_respsonse": join_respsonse["event_id"],
+ "leave_response": leave_response["event_id"],
+ }
+ ),
+ )
+ # We should *NOT* be `newly_joined` because we were only invited before the
+ # token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, False)
+
+ def test_join_and_display_name_changes_in_token_range(
+ self,
+ ) -> None:
+ """
+ Test that we point to the correct membership event within the from/to range even
+ if there are multiple `join` membership events in a row indicating
+ `displayname`/`avatar_url` updates.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ # Update the displayname during the token range
+ displayname_change_during_token_range_response = self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname during token range",
+ },
+ tok=user1_tok,
+ )
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Update the displayname after the token range
+ displayname_change_after_token_range_response = self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname after token range",
+ },
+ tok=user1_tok,
+ )
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room should show up because we were joined during the from/to range
+ self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ displayname_change_during_token_range_response["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response": join_response["event_id"],
+ "displayname_change_during_token_range_response": displayname_change_during_token_range_response[
+ "event_id"
+ ],
+ "displayname_change_after_token_range_response": displayname_change_after_token_range_response[
+ "event_id"
+ ],
+ }
+ ),
+ )
+ # We should be `newly_joined` because we joined during the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, True)
+
+ def test_display_name_changes_in_token_range(
+ self,
+ ) -> None:
+ """
+ Test that we point to the correct membership event within the from/to range even
+ if there is `displayname`/`avatar_url` updates.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Update the displayname during the token range
+ displayname_change_during_token_range_response = self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname during token range",
+ },
+ tok=user1_tok,
+ )
+
+ after_change1_token = self.event_sources.get_current_token()
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room1_token,
+ to_token=after_change1_token,
+ )
+ )
+
+ # Room should show up because we were joined during the from/to range
+ self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ displayname_change_during_token_range_response["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response": join_response["event_id"],
+ "displayname_change_during_token_range_response": displayname_change_during_token_range_response[
+ "event_id"
+ ],
+ }
+ ),
+ )
+ # We should *NOT* be `newly_joined` because we joined before the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, False)
+
+ def test_display_name_changes_before_and_after_token_range(
+ self,
+ ) -> None:
+ """
+ Test that we point to the correct membership event even though there are no
+ membership events in the from/range but there are `displayname`/`avatar_url`
+ changes before/after the token range.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ # Update the displayname before the token range
+ displayname_change_before_token_range_response = self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname during token range",
+ },
+ tok=user1_tok,
+ )
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Update the displayname after the token range
+ displayname_change_after_token_range_response = self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname after token range",
+ },
+ tok=user1_tok,
+ )
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=after_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room should show up because we were joined before the from/to range
+ self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ displayname_change_before_token_range_response["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response": join_response["event_id"],
+ "displayname_change_before_token_range_response": displayname_change_before_token_range_response[
+ "event_id"
+ ],
+ "displayname_change_after_token_range_response": displayname_change_after_token_range_response[
+ "event_id"
+ ],
+ }
+ ),
+ )
+ # We should *NOT* be `newly_joined` because we joined before the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, False)
+
+ def test_display_name_changes_leave_after_token_range(
+ self,
+ ) -> None:
+ """
+ Test that we point to the correct membership event within the from/to range even
+ if there are multiple `join` membership events in a row indicating
+ `displayname`/`avatar_url` updates and we leave after the `to_token`.
+
+ See condition "1a)" comments in the `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ # Update the displayname during the token range
+ displayname_change_during_token_range_response = self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname during token range",
+ },
+ tok=user1_tok,
+ )
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Update the displayname after the token range
+ displayname_change_after_token_range_response = self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname after token range",
+ },
+ tok=user1_tok,
+ )
+
+ # Leave after the token
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room should show up because we were joined during the from/to range
+ self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ displayname_change_during_token_range_response["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response": join_response["event_id"],
+ "displayname_change_during_token_range_response": displayname_change_during_token_range_response[
+ "event_id"
+ ],
+ "displayname_change_after_token_range_response": displayname_change_after_token_range_response[
+ "event_id"
+ ],
+ }
+ ),
+ )
+ # We should be `newly_joined` because we joined during the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, True)
+
+ def test_display_name_changes_join_after_token_range(
+ self,
+ ) -> None:
+ """
+ Test that multiple `join` membership events (after the `to_token`) in a row
+ indicating `displayname`/`avatar_url` updates doesn't affect the results (we
+ joined after the token range so it shouldn't show up)
+
+ See condition "1b)" comments in the `get_sync_room_ids_for_user()` method.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ # Update the displayname after the token range
+ self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname after token range",
+ },
+ tok=user1_tok,
+ )
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room1_token,
+ to_token=after_room1_token,
+ )
+ )
+
+ # Room shouldn't show up because we joined after the from/to range
+ self.assertEqual(room_id_results.keys(), set())
+
+ def test_newly_joined_with_leave_join_in_token_range(
+ self,
+ ) -> None:
+ """
+ Test that even though we're joined before the token range, if we leave and join
+ within the token range, it's still counted as `newly_joined`.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Leave and join back during the token range
self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ after_more_changes_token = self.event_sources.get_current_token()
room_id_results = self.get_success(
self.sliding_sync_handler.get_sync_room_ids_for_user(
UserID.from_string(user1_id),
from_token=after_room1_token,
+ to_token=after_more_changes_token,
+ )
+ )
+
+ # Room should show up because we were joined during the from/to range
+ self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ join_response2["event_id"],
+ )
+ # We should be considered `newly_joined` because there is some non-join event in
+ # between our latest join event.
+ self.assertEqual(room_id_results[room_id1].newly_joined, True)
+
+ def test_newly_joined_only_joins_during_token_range(
+ self,
+ ) -> None:
+ """
+ Test that a join and more joins caused by display name changes, all during the
+ token range, still count as `newly_joined`.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ # We create the room with user2 so the room isn't left with no members when we
+ # leave and can still re-join.
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+ # Join, leave, join back to the room before the from/to range
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ # Update the displayname during the token range (looks like another join)
+ displayname_change_during_token_range_response1 = self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname during token range",
+ },
+ tok=user1_tok,
+ )
+ # Update the displayname during the token range (looks like another join)
+ displayname_change_during_token_range_response2 = self.helper.send_state(
+ room_id1,
+ event_type=EventTypes.Member,
+ state_key=user1_id,
+ body={
+ "membership": Membership.JOIN,
+ "displayname": "displayname during token range",
+ },
+ tok=user1_tok,
+ )
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ room_id_results = self.get_success(
+ self.sliding_sync_handler.get_sync_room_ids_for_user(
+ UserID.from_string(user1_id),
+ from_token=before_room1_token,
to_token=after_room1_token,
)
)
- # Room should show up because we were invited before the from/to range
+ # Room should show up because it was newly_left and joined during the from/to range
self.assertEqual(room_id_results.keys(), {room_id1})
+ # It should be pointing to the latest membership event in the from/to range
+ self.assertEqual(
+ room_id_results[room_id1].event_id,
+ displayname_change_during_token_range_response2["event_id"],
+ "Corresponding map to disambiguate the opaque event IDs: "
+ + str(
+ {
+ "join_response1": join_response1["event_id"],
+ "displayname_change_during_token_range_response1": displayname_change_during_token_range_response1[
+ "event_id"
+ ],
+ "displayname_change_during_token_range_response2": displayname_change_during_token_range_response2[
+ "event_id"
+ ],
+ }
+ ),
+ )
+ # We should be `newly_joined` because we first joined during the token range
+ self.assertEqual(room_id_results[room_id1].newly_joined, True)
def test_multiple_rooms_are_not_confused(
self,
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 12c11f342c..966c622e14 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -31,12 +31,13 @@ from synapse.api.constants import (
AccountDataTypes,
EventContentFields,
EventTypes,
+ HistoryVisibility,
ReceiptTypes,
RelationTypes,
)
from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
from synapse.server import HomeServer
-from synapse.types import JsonDict, RoomStreamToken, StreamKeyType
+from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
from synapse.util import Clock
from tests import unittest
@@ -1326,7 +1327,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
def test_sync_list(self) -> None:
"""
- Test that room IDs show up in the Sliding Sync lists
+ Test that room IDs show up in the Sliding Sync `lists`
"""
alice_user_id = self.register_user("alice", "correcthorse")
alice_access_token = self.login(alice_user_id, "correcthorse")
@@ -1425,15 +1426,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
channel.await_result(timeout_ms=200)
self.assertEqual(channel.code, 200, channel.json_body)
- # We expect the `next_pos` in the result to be the same as what we requested
+ # We expect the next `pos` in the result to be the same as what we requested
# with because we weren't able to find anything new yet.
- self.assertEqual(
- channel.json_body["next_pos"], future_position_token_serialized
- )
+ self.assertEqual(channel.json_body["pos"], future_position_token_serialized)
def test_filter_list(self) -> None:
"""
- Test that filters apply to lists
+ Test that filters apply to `lists`
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
@@ -1564,7 +1563,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
def test_sort_list(self) -> None:
"""
- Test that the lists are sorted by `stream_ordering`
+ Test that the `lists` are sorted by `stream_ordering`
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
@@ -1618,3 +1617,1067 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
],
channel.json_body["lists"]["foo-list"],
)
+
+ def test_sliced_windows(self) -> None:
+ """
+ Test that the `lists` `ranges` are sliced correctly. Both sides of each range
+ are inclusive.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+
+ _room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+ room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+ room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+ # Make the Sliding Sync request for a single room
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint,
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 0]],
+ "required_state": [
+ ["m.room.join_rules", ""],
+ ["m.room.history_visibility", ""],
+ ["m.space.child", "*"],
+ ],
+ "timeline_limit": 1,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # Make sure it has the foo-list we requested
+ self.assertListEqual(
+ list(channel.json_body["lists"].keys()),
+ ["foo-list"],
+ channel.json_body["lists"].keys(),
+ )
+ # Make sure the list is sorted in the way we expect
+ self.assertListEqual(
+ list(channel.json_body["lists"]["foo-list"]["ops"]),
+ [
+ {
+ "op": "SYNC",
+ "range": [0, 0],
+ "room_ids": [room_id3],
+ }
+ ],
+ channel.json_body["lists"]["foo-list"],
+ )
+
+ # Make the Sliding Sync request for the first two rooms
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint,
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [
+ ["m.room.join_rules", ""],
+ ["m.room.history_visibility", ""],
+ ["m.space.child", "*"],
+ ],
+ "timeline_limit": 1,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # Make sure it has the foo-list we requested
+ self.assertListEqual(
+ list(channel.json_body["lists"].keys()),
+ ["foo-list"],
+ channel.json_body["lists"].keys(),
+ )
+ # Make sure the list is sorted in the way we expect
+ self.assertListEqual(
+ list(channel.json_body["lists"]["foo-list"]["ops"]),
+ [
+ {
+ "op": "SYNC",
+ "range": [0, 1],
+ "room_ids": [room_id3, room_id2],
+ }
+ ],
+ channel.json_body["lists"]["foo-list"],
+ )
+
+ def test_rooms_limited_initial_sync(self) -> None:
+ """
+ Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`
+ on initial sync.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity1", tok=user2_tok)
+ self.helper.send(room_id1, "activity2", tok=user2_tok)
+ event_response3 = self.helper.send(room_id1, "activity3", tok=user2_tok)
+ event_pos3 = self.get_success(
+ self.store.get_position_for_event(event_response3["event_id"])
+ )
+ event_response4 = self.helper.send(room_id1, "activity4", tok=user2_tok)
+ event_pos4 = self.get_success(
+ self.store.get_position_for_event(event_response4["event_id"])
+ )
+ event_response5 = self.helper.send(room_id1, "activity5", tok=user2_tok)
+ user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint,
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 3,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # We expect to saturate the `timeline_limit` (there are more than 3 messages in the room)
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["limited"],
+ True,
+ channel.json_body["rooms"][room_id1],
+ )
+ # Check to make sure the latest events are returned
+ self.assertEqual(
+ [
+ event["event_id"]
+ for event in channel.json_body["rooms"][room_id1]["timeline"]
+ ],
+ [
+ event_response4["event_id"],
+ event_response5["event_id"],
+ user1_join_response["event_id"],
+ ],
+ channel.json_body["rooms"][room_id1]["timeline"],
+ )
+
+ # Check to make sure the `prev_batch` points at the right place
+ prev_batch_token = self.get_success(
+ StreamToken.from_string(
+ self.store, channel.json_body["rooms"][room_id1]["prev_batch"]
+ )
+ )
+ prev_batch_room_stream_token_serialized = self.get_success(
+ prev_batch_token.room_key.to_string(self.store)
+ )
+ # If we use the `prev_batch` token to look backwards, we should see `event3`
+ # next so make sure the token encompasses it
+ self.assertEqual(
+ event_pos3.persisted_after(prev_batch_token.room_key),
+ False,
+ f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be >= event_pos3={self.get_success(event_pos3.to_room_stream_token().to_string(self.store))}",
+ )
+ # If we use the `prev_batch` token to look backwards, we shouldn't see `event4`
+ # anymore since it was just returned in this response.
+ self.assertEqual(
+ event_pos4.persisted_after(prev_batch_token.room_key),
+ True,
+ f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}",
+ )
+
+ # With no `from_token` (initial sync), it's all historical since there is no
+ # "live" range
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["num_live"],
+ 0,
+ channel.json_body["rooms"][room_id1],
+ )
+
+ def test_rooms_not_limited_initial_sync(self) -> None:
+ """
+ Test that we mark `rooms` as `limited=False` when there are no more events to
+ paginate to.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity1", tok=user2_tok)
+ self.helper.send(room_id1, "activity2", tok=user2_tok)
+ self.helper.send(room_id1, "activity3", tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ # Make the Sliding Sync request
+ timeline_limit = 100
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint,
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": timeline_limit,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # The timeline should be `limited=False` because we have all of the events (no
+ # more to paginate to)
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["limited"],
+ False,
+ channel.json_body["rooms"][room_id1],
+ )
+ expected_number_of_events = 9
+ # We're just looking to make sure we got all of the events before hitting the `timeline_limit`
+ self.assertEqual(
+ len(channel.json_body["rooms"][room_id1]["timeline"]),
+ expected_number_of_events,
+ channel.json_body["rooms"][room_id1]["timeline"],
+ )
+ self.assertLessEqual(expected_number_of_events, timeline_limit)
+
+ # With no `from_token` (initial sync), it's all historical since there is no
+ # "live" token range.
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["num_live"],
+ 0,
+ channel.json_body["rooms"][room_id1],
+ )
+
+ def test_rooms_incremental_sync(self) -> None:
+ """
+ Test `rooms` data during an incremental sync after an initial sync.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ self.helper.send(room_id1, "activity before initial sync1", tok=user2_tok)
+
+ # Make an initial Sliding Sync request to grab a token. This is also a sanity
+ # check that we can go from initial to incremental sync.
+ sync_params = {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 3,
+ }
+ }
+ }
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint,
+ sync_params,
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+ next_pos = channel.json_body["pos"]
+
+ # Send some events but don't send enough to saturate the `timeline_limit`.
+ # We want to later test that we only get the new events since the `next_pos`
+ event_response2 = self.helper.send(room_id1, "activity after2", tok=user2_tok)
+ event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
+
+ # Make an incremental Sliding Sync request (what we're trying to test)
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint + f"?pos={next_pos}",
+ sync_params,
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # We only expect to see the new events since the last sync which isn't enough to
+ # fill up the `timeline_limit`.
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["limited"],
+ False,
+ f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} '
+ + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+ + str(channel.json_body["rooms"][room_id1]),
+ )
+ # Check to make sure the latest events are returned
+ self.assertEqual(
+ [
+ event["event_id"]
+ for event in channel.json_body["rooms"][room_id1]["timeline"]
+ ],
+ [
+ event_response2["event_id"],
+ event_response3["event_id"],
+ ],
+ channel.json_body["rooms"][room_id1]["timeline"],
+ )
+
+ # All events are "live"
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["num_live"],
+ 2,
+ channel.json_body["rooms"][room_id1],
+ )
+
+ def test_rooms_newly_joined_incremental_sync(self) -> None:
+ """
+ Test that when we make an incremental sync with a `newly_joined` `rooms`, we are
+ able to see some historical events before the `from_token`.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity before token1", tok=user2_tok)
+ event_response2 = self.helper.send(
+ room_id1, "activity before token2", tok=user2_tok
+ )
+
+ from_token = self.event_sources.get_current_token()
+
+ # Join the room after the `from_token` which will make us consider this room as
+ # `newly_joined`.
+ user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ # Send some events but don't send enough to saturate the `timeline_limit`.
+ # We want to later test that we only get the new events since the `next_pos`
+ event_response3 = self.helper.send(
+ room_id1, "activity after token3", tok=user2_tok
+ )
+ event_response4 = self.helper.send(
+ room_id1, "activity after token4", tok=user2_tok
+ )
+
+ # The `timeline_limit` is set to 4 so we can at least see one historical event
+ # before the `from_token`. We should see historical events because this is a
+ # `newly_joined` room.
+ timeline_limit = 4
+ # Make an incremental Sliding Sync request (what we're trying to test)
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint
+ + f"?pos={self.get_success(from_token.to_string(self.store))}",
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": timeline_limit,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # We should see the new events and the rest should be filled with historical
+ # events which will make us `limited=True` since there are more to paginate to.
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["limited"],
+ True,
+ f"Our `timeline_limit` was {timeline_limit} "
+ + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+ + str(channel.json_body["rooms"][room_id1]),
+ )
+ # Check to make sure that the "live" and historical events are returned
+ self.assertEqual(
+ [
+ event["event_id"]
+ for event in channel.json_body["rooms"][room_id1]["timeline"]
+ ],
+ [
+ event_response2["event_id"],
+ user1_join_response["event_id"],
+ event_response3["event_id"],
+ event_response4["event_id"],
+ ],
+ channel.json_body["rooms"][room_id1]["timeline"],
+ )
+
+ # Only events after the `from_token` are "live" (join, event3, event4)
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["num_live"],
+ 3,
+ channel.json_body["rooms"][room_id1],
+ )
+
+ def test_rooms_invite_shared_history_initial_sync(self) -> None:
+ """
+ Test that `rooms` we are invited to have some stripped `invite_state` during an
+ initial sync.
+
+ This is an `invite` room so we should only have `stripped_state` (no `timeline`)
+ but we also shouldn't see any timeline events because the history visiblity is
+ `shared` and we haven't joined the room yet.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user1 = UserID.from_string(user1_id)
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+ user2 = UserID.from_string(user2_id)
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ # Ensure we're testing with a room with `shared` history visibility which means
+ # history visible until you actually join the room.
+ history_visibility_response = self.helper.get_state(
+ room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+ )
+ self.assertEqual(
+ history_visibility_response.get("history_visibility"),
+ HistoryVisibility.SHARED,
+ )
+
+ self.helper.send(room_id1, "activity before1", tok=user2_tok)
+ self.helper.send(room_id1, "activity before2", tok=user2_tok)
+ self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity after3", tok=user2_tok)
+ self.helper.send(room_id1, "activity after4", tok=user2_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint,
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 3,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # `timeline` is omitted for `invite` rooms with `stripped_state`
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("timeline"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("num_live"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("limited"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("prev_batch"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # We should have some `stripped_state` so the potential joiner can identify the
+ # room (we don't care about the order).
+ self.assertCountEqual(
+ channel.json_body["rooms"][room_id1]["invite_state"],
+ [
+ {
+ "content": {"creator": user2_id, "room_version": "10"},
+ "sender": user2_id,
+ "state_key": "",
+ "type": "m.room.create",
+ },
+ {
+ "content": {"join_rule": "public"},
+ "sender": user2_id,
+ "state_key": "",
+ "type": "m.room.join_rules",
+ },
+ {
+ "content": {"displayname": user2.localpart, "membership": "join"},
+ "sender": user2_id,
+ "state_key": user2_id,
+ "type": "m.room.member",
+ },
+ {
+ "content": {"displayname": user1.localpart, "membership": "invite"},
+ "sender": user2_id,
+ "state_key": user1_id,
+ "type": "m.room.member",
+ },
+ ],
+ channel.json_body["rooms"][room_id1]["invite_state"],
+ )
+
+ def test_rooms_invite_shared_history_incremental_sync(self) -> None:
+ """
+ Test that `rooms` we are invited to have some stripped `invite_state` during an
+ incremental sync.
+
+ This is an `invite` room so we should only have `stripped_state` (no `timeline`)
+ but we also shouldn't see any timeline events because the history visiblity is
+ `shared` and we haven't joined the room yet.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user1 = UserID.from_string(user1_id)
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+ user2 = UserID.from_string(user2_id)
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ # Ensure we're testing with a room with `shared` history visibility which means
+ # history visible until you actually join the room.
+ history_visibility_response = self.helper.get_state(
+ room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+ )
+ self.assertEqual(
+ history_visibility_response.get("history_visibility"),
+ HistoryVisibility.SHARED,
+ )
+
+ self.helper.send(room_id1, "activity before invite1", tok=user2_tok)
+ self.helper.send(room_id1, "activity before invite2", tok=user2_tok)
+ self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
+ self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
+
+ from_token = self.event_sources.get_current_token()
+
+ self.helper.send(room_id1, "activity after token5", tok=user2_tok)
+ self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint
+ + f"?pos={self.get_success(from_token.to_string(self.store))}",
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 3,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # `timeline` is omitted for `invite` rooms with `stripped_state`
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("timeline"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("num_live"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("limited"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("prev_batch"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # We should have some `stripped_state` so the potential joiner can identify the
+ # room (we don't care about the order).
+ self.assertCountEqual(
+ channel.json_body["rooms"][room_id1]["invite_state"],
+ [
+ {
+ "content": {"creator": user2_id, "room_version": "10"},
+ "sender": user2_id,
+ "state_key": "",
+ "type": "m.room.create",
+ },
+ {
+ "content": {"join_rule": "public"},
+ "sender": user2_id,
+ "state_key": "",
+ "type": "m.room.join_rules",
+ },
+ {
+ "content": {"displayname": user2.localpart, "membership": "join"},
+ "sender": user2_id,
+ "state_key": user2_id,
+ "type": "m.room.member",
+ },
+ {
+ "content": {"displayname": user1.localpart, "membership": "invite"},
+ "sender": user2_id,
+ "state_key": user1_id,
+ "type": "m.room.member",
+ },
+ ],
+ channel.json_body["rooms"][room_id1]["invite_state"],
+ )
+
+ def test_rooms_invite_world_readable_history_initial_sync(self) -> None:
+ """
+ Test that `rooms` we are invited to have some stripped `invite_state` during an
+ initial sync.
+
+ This is an `invite` room so we should only have `stripped_state` (no `timeline`)
+ but depending on the semantics we decide, we could potentially see some
+ historical events before/after the `from_token` because the history is
+ `world_readable`. Same situation for events after the `from_token` if the
+ history visibility was set to `invited`.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user1 = UserID.from_string(user1_id)
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+ user2 = UserID.from_string(user2_id)
+
+ room_id1 = self.helper.create_room_as(
+ user2_id,
+ tok=user2_tok,
+ extra_content={
+ "preset": "public_chat",
+ "initial_state": [
+ {
+ "content": {
+ "history_visibility": HistoryVisibility.WORLD_READABLE
+ },
+ "state_key": "",
+ "type": EventTypes.RoomHistoryVisibility,
+ }
+ ],
+ },
+ )
+ # Ensure we're testing with a room with `world_readable` history visibility
+ # which means events are visible to anyone even without membership.
+ history_visibility_response = self.helper.get_state(
+ room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+ )
+ self.assertEqual(
+ history_visibility_response.get("history_visibility"),
+ HistoryVisibility.WORLD_READABLE,
+ )
+
+ self.helper.send(room_id1, "activity before1", tok=user2_tok)
+ self.helper.send(room_id1, "activity before2", tok=user2_tok)
+ self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity after3", tok=user2_tok)
+ self.helper.send(room_id1, "activity after4", tok=user2_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint,
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ # Large enough to see the latest events and before the invite
+ "timeline_limit": 4,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # `timeline` is omitted for `invite` rooms with `stripped_state`
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("timeline"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("num_live"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("limited"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("prev_batch"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # We should have some `stripped_state` so the potential joiner can identify the
+ # room (we don't care about the order).
+ self.assertCountEqual(
+ channel.json_body["rooms"][room_id1]["invite_state"],
+ [
+ {
+ "content": {"creator": user2_id, "room_version": "10"},
+ "sender": user2_id,
+ "state_key": "",
+ "type": "m.room.create",
+ },
+ {
+ "content": {"join_rule": "public"},
+ "sender": user2_id,
+ "state_key": "",
+ "type": "m.room.join_rules",
+ },
+ {
+ "content": {"displayname": user2.localpart, "membership": "join"},
+ "sender": user2_id,
+ "state_key": user2_id,
+ "type": "m.room.member",
+ },
+ {
+ "content": {"displayname": user1.localpart, "membership": "invite"},
+ "sender": user2_id,
+ "state_key": user1_id,
+ "type": "m.room.member",
+ },
+ ],
+ channel.json_body["rooms"][room_id1]["invite_state"],
+ )
+
+ def test_rooms_invite_world_readable_history_incremental_sync(self) -> None:
+ """
+ Test that `rooms` we are invited to have some stripped `invite_state` during an
+ incremental sync.
+
+ This is an `invite` room so we should only have `stripped_state` (no `timeline`)
+ but depending on the semantics we decide, we could potentially see some
+ historical events before/after the `from_token` because the history is
+ `world_readable`. Same situation for events after the `from_token` if the
+ history visibility was set to `invited`.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user1 = UserID.from_string(user1_id)
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+ user2 = UserID.from_string(user2_id)
+
+ room_id1 = self.helper.create_room_as(
+ user2_id,
+ tok=user2_tok,
+ extra_content={
+ "preset": "public_chat",
+ "initial_state": [
+ {
+ "content": {
+ "history_visibility": HistoryVisibility.WORLD_READABLE
+ },
+ "state_key": "",
+ "type": EventTypes.RoomHistoryVisibility,
+ }
+ ],
+ },
+ )
+ # Ensure we're testing with a room with `world_readable` history visibility
+ # which means events are visible to anyone even without membership.
+ history_visibility_response = self.helper.get_state(
+ room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+ )
+ self.assertEqual(
+ history_visibility_response.get("history_visibility"),
+ HistoryVisibility.WORLD_READABLE,
+ )
+
+ self.helper.send(room_id1, "activity before invite1", tok=user2_tok)
+ self.helper.send(room_id1, "activity before invite2", tok=user2_tok)
+ self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
+ self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
+
+ from_token = self.event_sources.get_current_token()
+
+ self.helper.send(room_id1, "activity after token5", tok=user2_tok)
+ self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint
+ + f"?pos={self.get_success(from_token.to_string(self.store))}",
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ # Large enough to see the latest events and before the invite
+ "timeline_limit": 4,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # `timeline` is omitted for `invite` rooms with `stripped_state`
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("timeline"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("num_live"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("limited"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+ self.assertIsNone(
+ channel.json_body["rooms"][room_id1].get("prev_batch"),
+ channel.json_body["rooms"][room_id1],
+ )
+ # We should have some `stripped_state` so the potential joiner can identify the
+ # room (we don't care about the order).
+ self.assertCountEqual(
+ channel.json_body["rooms"][room_id1]["invite_state"],
+ [
+ {
+ "content": {"creator": user2_id, "room_version": "10"},
+ "sender": user2_id,
+ "state_key": "",
+ "type": "m.room.create",
+ },
+ {
+ "content": {"join_rule": "public"},
+ "sender": user2_id,
+ "state_key": "",
+ "type": "m.room.join_rules",
+ },
+ {
+ "content": {"displayname": user2.localpart, "membership": "join"},
+ "sender": user2_id,
+ "state_key": user2_id,
+ "type": "m.room.member",
+ },
+ {
+ "content": {"displayname": user1.localpart, "membership": "invite"},
+ "sender": user2_id,
+ "state_key": user1_id,
+ "type": "m.room.member",
+ },
+ ],
+ channel.json_body["rooms"][room_id1]["invite_state"],
+ )
+
+ def test_rooms_ban_initial_sync(self) -> None:
+ """
+ Test that `rooms` we are banned from in an intial sync only allows us to see
+ timeline events up to the ban event.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity before1", tok=user2_tok)
+ self.helper.send(room_id1, "activity before2", tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
+ event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
+ user1_ban_response = self.helper.ban(
+ room_id1, src=user2_id, targ=user1_id, tok=user2_tok
+ )
+
+ self.helper.send(room_id1, "activity after5", tok=user2_tok)
+ self.helper.send(room_id1, "activity after6", tok=user2_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint,
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 3,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # We should see events before the ban but not after
+ self.assertEqual(
+ [
+ event["event_id"]
+ for event in channel.json_body["rooms"][room_id1]["timeline"]
+ ],
+ [
+ event_response3["event_id"],
+ event_response4["event_id"],
+ user1_ban_response["event_id"],
+ ],
+ channel.json_body["rooms"][room_id1]["timeline"],
+ )
+ # No "live" events in an initial sync (no `from_token` to define the "live"
+ # range)
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["num_live"],
+ 0,
+ channel.json_body["rooms"][room_id1],
+ )
+ # There are more events to paginate to
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["limited"],
+ True,
+ channel.json_body["rooms"][room_id1],
+ )
+
+ def test_rooms_ban_incremental_sync1(self) -> None:
+ """
+ Test that `rooms` we are banned from during the next incremental sync only
+ allows us to see timeline events up to the ban event.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity before1", tok=user2_tok)
+ self.helper.send(room_id1, "activity before2", tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ from_token = self.event_sources.get_current_token()
+
+ event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
+ event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
+ # The ban is within the token range (between the `from_token` and the sliding
+ # sync request)
+ user1_ban_response = self.helper.ban(
+ room_id1, src=user2_id, targ=user1_id, tok=user2_tok
+ )
+
+ self.helper.send(room_id1, "activity after5", tok=user2_tok)
+ self.helper.send(room_id1, "activity after6", tok=user2_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint
+ + f"?pos={self.get_success(from_token.to_string(self.store))}",
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 4,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # We should see events before the ban but not after
+ self.assertEqual(
+ [
+ event["event_id"]
+ for event in channel.json_body["rooms"][room_id1]["timeline"]
+ ],
+ [
+ event_response3["event_id"],
+ event_response4["event_id"],
+ user1_ban_response["event_id"],
+ ],
+ channel.json_body["rooms"][room_id1]["timeline"],
+ )
+ # All live events in the incremental sync
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["num_live"],
+ 3,
+ channel.json_body["rooms"][room_id1],
+ )
+ # There aren't anymore events to paginate to in this range
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["limited"],
+ False,
+ channel.json_body["rooms"][room_id1],
+ )
+
+ def test_rooms_ban_incremental_sync2(self) -> None:
+ """
+ Test that `rooms` we are banned from before the incremental sync don't return
+ any events in the timeline.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.send(room_id1, "activity before1", tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+ self.helper.send(room_id1, "activity after2", tok=user2_tok)
+ # The ban is before we get our `from_token`
+ self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+
+ self.helper.send(room_id1, "activity after3", tok=user2_tok)
+
+ from_token = self.event_sources.get_current_token()
+
+ self.helper.send(room_id1, "activity after4", tok=user2_tok)
+
+ # Make the Sliding Sync request
+ channel = self.make_request(
+ "POST",
+ self.sync_endpoint
+ + f"?pos={self.get_success(from_token.to_string(self.store))}",
+ {
+ "lists": {
+ "foo-list": {
+ "ranges": [[0, 1]],
+ "required_state": [],
+ "timeline_limit": 4,
+ }
+ }
+ },
+ access_token=user1_tok,
+ )
+ self.assertEqual(channel.code, 200, channel.json_body)
+
+ # Nothing to see for this banned user in the room in the token range
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["timeline"],
+ [],
+ channel.json_body["rooms"][room_id1]["timeline"],
+ )
+ # No events returned in the timeline so nothing is "live"
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["num_live"],
+ 0,
+ channel.json_body["rooms"][room_id1],
+ )
+ # There aren't anymore events to paginate to in this range
+ self.assertEqual(
+ channel.json_body["rooms"][room_id1]["limited"],
+ False,
+ channel.json_body["rooms"][room_id1],
+ )
diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py
index f0ba40a1f1..e43140720d 100644
--- a/tests/rest/client/utils.py
+++ b/tests/rest/client/utils.py
@@ -261,9 +261,9 @@ class RestHelper:
targ: str,
expect_code: int = HTTPStatus.OK,
tok: Optional[str] = None,
- ) -> None:
+ ) -> JsonDict:
"""A convenience helper: `change_membership` with `membership` preset to "ban"."""
- self.change_membership(
+ return self.change_membership(
room=room,
src=src,
targ=targ,
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index fe1e873e15..aad46b1b44 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -21,20 +21,32 @@
import logging
from typing import List, Tuple
+from unittest.mock import AsyncMock, patch
from immutabledict import immutabledict
from twisted.test.proto_helpers import MemoryReactor
-from synapse.api.constants import Direction, EventTypes, RelationTypes
+from synapse.api.constants import Direction, EventTypes, Membership, RelationTypes
from synapse.api.filtering import Filter
+from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.events import FrozenEventV3
+from synapse.federation.federation_client import SendJoinResult
from synapse.rest import admin
from synapse.rest.client import login, room
from synapse.server import HomeServer
-from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
+from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.types import (
+ JsonDict,
+ PersistedEventPosition,
+ RoomStreamToken,
+ UserID,
+ create_requester,
+)
from synapse.util import Clock
-from tests.unittest import HomeserverTestCase
+from tests.test_utils.event_injection import create_event
+from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase
logger = logging.getLogger(__name__)
@@ -543,3 +555,859 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
}
),
)
+
+
+class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
+ """
+ Test `get_current_state_delta_membership_changes_for_user(...)`
+ """
+
+ servlets = [
+ admin.register_servlets,
+ room.register_servlets,
+ login.register_servlets,
+ ]
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.store = hs.get_datastores().main
+ self.event_sources = hs.get_event_sources()
+ self.state_handler = self.hs.get_state_handler()
+ persistence = hs.get_storage_controllers().persistence
+ assert persistence is not None
+ self.persistence = persistence
+
+ def test_returns_membership_events(self) -> None:
+ """
+ A basic test that a membership event in the token range is returned for the user.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_pos = self.get_success(
+ self.store.get_position_for_event(join_response["event_id"])
+ )
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ membership_changes = self.get_success(
+ self.store.get_current_state_delta_membership_changes_for_user(
+ user1_id,
+ from_key=before_room1_token.room_key,
+ to_key=after_room1_token.room_key,
+ )
+ )
+
+ # Let the whole diff show on failure
+ self.maxDiff = None
+ self.assertEqual(
+ membership_changes,
+ [
+ CurrentStateDeltaMembership(
+ room_id=room_id1,
+ event_id=join_response["event_id"],
+ event_pos=join_pos,
+ membership="join",
+ sender=user1_id,
+ prev_event_id=None,
+ prev_event_pos=None,
+ prev_membership=None,
+ prev_sender=None,
+ )
+ ],
+ )
+
+ def test_server_left_room_after_us(self) -> None:
+ """
+ Test that when probing over part of the DAG where the server left the room *after
+ us*, we still see the join and leave changes.
+
+ This is to make sure we play nicely with this behavior: When the server leaves a
+ room, it will insert new rows with `event_id = null` into the
+ `current_state_delta_stream` table for all current state.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ room_id1 = self.helper.create_room_as(
+ user2_id,
+ tok=user2_tok,
+ extra_content={
+ "power_level_content_override": {
+ "users": {
+ user2_id: 100,
+ # Allow user1 to send state in the room
+ user1_id: 100,
+ }
+ }
+ },
+ )
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_pos1 = self.get_success(
+ self.store.get_position_for_event(join_response1["event_id"])
+ )
+ # Make sure that random other non-member state that happens to have a `state_key`
+ # matching the user ID doesn't mess with things.
+ self.helper.send_state(
+ room_id1,
+ event_type="foobarbazdummy",
+ state_key=user1_id,
+ body={"foo": "bar"},
+ tok=user1_tok,
+ )
+ # User1 should leave the room first
+ leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ leave_pos1 = self.get_success(
+ self.store.get_position_for_event(leave_response1["event_id"])
+ )
+
+ # User2 should also leave the room (everyone has left the room which means the
+ # server is no longer in the room).
+ self.helper.leave(room_id1, user2_id, tok=user2_tok)
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Get the membership changes for the user.
+ #
+ # At this point, the `current_state_delta_stream` table should look like the
+ # following. When the server leaves a room, it will insert new rows with
+ # `event_id = null` for all current state.
+ #
+ # | stream_id | room_id | type | state_key | event_id | prev_event_id |
+ # |-----------|----------|-----------------------------|----------------|----------|---------------|
+ # | 2 | !x:test | 'm.room.create' | '' | $xxx | None |
+ # | 3 | !x:test | 'm.room.member' | '@user2:test' | $aaa | None |
+ # | 4 | !x:test | 'm.room.history_visibility' | '' | $xxx | None |
+ # | 4 | !x:test | 'm.room.join_rules' | '' | $xxx | None |
+ # | 4 | !x:test | 'm.room.power_levels' | '' | $xxx | None |
+ # | 7 | !x:test | 'm.room.member' | '@user1:test' | $ooo | None |
+ # | 8 | !x:test | 'foobarbazdummy' | '@user1:test' | $xxx | None |
+ # | 9 | !x:test | 'm.room.member' | '@user1:test' | $ppp | $ooo |
+ # | 10 | !x:test | 'foobarbazdummy' | '@user1:test' | None | $xxx |
+ # | 10 | !x:test | 'm.room.create' | '' | None | $xxx |
+ # | 10 | !x:test | 'm.room.history_visibility' | '' | None | $xxx |
+ # | 10 | !x:test | 'm.room.join_rules' | '' | None | $xxx |
+ # | 10 | !x:test | 'm.room.member' | '@user1:test' | None | $ppp |
+ # | 10 | !x:test | 'm.room.member' | '@user2:test' | None | $aaa |
+ # | 10 | !x:test | 'm.room.power_levels' | | None | $xxx |
+ membership_changes = self.get_success(
+ self.store.get_current_state_delta_membership_changes_for_user(
+ user1_id,
+ from_key=before_room1_token.room_key,
+ to_key=after_room1_token.room_key,
+ )
+ )
+
+ # Let the whole diff show on failure
+ self.maxDiff = None
+ self.assertEqual(
+ membership_changes,
+ [
+ CurrentStateDeltaMembership(
+ room_id=room_id1,
+ event_id=join_response1["event_id"],
+ event_pos=join_pos1,
+ membership="join",
+ sender=user1_id,
+ prev_event_id=None,
+ prev_event_pos=None,
+ prev_membership=None,
+ prev_sender=None,
+ ),
+ CurrentStateDeltaMembership(
+ room_id=room_id1,
+ event_id=leave_response1["event_id"],
+ event_pos=leave_pos1,
+ membership="leave",
+ sender=user1_id,
+ prev_event_id=join_response1["event_id"],
+ prev_event_pos=join_pos1,
+ prev_membership="join",
+ prev_sender=user1_id,
+ ),
+ ],
+ )
+
+ def test_server_left_room_after_us_later(self) -> None:
+ """
+ Test when the user leaves the room, then sometime later, everyone else leaves
+ the room, causing the server to leave the room, we shouldn't see any membership
+ changes.
+
+ This is to make sure we play nicely with this behavior: When the server leaves a
+ room, it will insert new rows with `event_id = null` into the
+ `current_state_delta_stream` table for all current state.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id1, user1_id, tok=user1_tok)
+ # User1 should leave the room first
+ self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+ after_user1_leave_token = self.event_sources.get_current_token()
+
+ # User2 should also leave the room (everyone has left the room which means the
+ # server is no longer in the room).
+ self.helper.leave(room_id1, user2_id, tok=user2_tok)
+
+ after_server_leave_token = self.event_sources.get_current_token()
+
+ # Join another room as user1 just to advance the stream_ordering and bust
+ # `_membership_stream_cache`
+ room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ self.helper.join(room_id2, user1_id, tok=user1_tok)
+
+ # Get the membership changes for the user.
+ #
+ # At this point, the `current_state_delta_stream` table should look like the
+ # following. When the server leaves a room, it will insert new rows with
+ # `event_id = null` for all current state.
+ #
+ # TODO: Add DB rows to better see what's going on.
+ membership_changes = self.get_success(
+ self.store.get_current_state_delta_membership_changes_for_user(
+ user1_id,
+ from_key=after_user1_leave_token.room_key,
+ to_key=after_server_leave_token.room_key,
+ )
+ )
+
+ # Let the whole diff show on failure
+ self.maxDiff = None
+ self.assertEqual(
+ membership_changes,
+ [],
+ )
+
+ def test_we_cause_server_left_room(self) -> None:
+ """
+ Test that when probing over part of the DAG where the user leaves the room
+ causing the server to leave the room (because we were the last local user in the
+ room), we still see the join and leave changes.
+
+ This is to make sure we play nicely with this behavior: When the server leaves a
+ room, it will insert new rows with `event_id = null` into the
+ `current_state_delta_stream` table for all current state.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ room_id1 = self.helper.create_room_as(
+ user2_id,
+ tok=user2_tok,
+ extra_content={
+ "power_level_content_override": {
+ "users": {
+ user2_id: 100,
+ # Allow user1 to send state in the room
+ user1_id: 100,
+ }
+ }
+ },
+ )
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_pos1 = self.get_success(
+ self.store.get_position_for_event(join_response1["event_id"])
+ )
+ # Make sure that random other non-member state that happens to have a `state_key`
+ # matching the user ID doesn't mess with things.
+ self.helper.send_state(
+ room_id1,
+ event_type="foobarbazdummy",
+ state_key=user1_id,
+ body={"foo": "bar"},
+ tok=user1_tok,
+ )
+
+ # User2 should leave the room first.
+ self.helper.leave(room_id1, user2_id, tok=user2_tok)
+
+ # User1 (the person we're testing with) should also leave the room (everyone has
+ # left the room which means the server is no longer in the room).
+ leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+ leave_pos1 = self.get_success(
+ self.store.get_position_for_event(leave_response1["event_id"])
+ )
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Get the membership changes for the user.
+ #
+ # At this point, the `current_state_delta_stream` table should look like the
+ # following. When the server leaves a room, it will insert new rows with
+ # `event_id = null` for all current state.
+ #
+ # | stream_id | room_id | type | state_key | event_id | prev_event_id |
+ # |-----------|-----------|-----------------------------|---------------|----------|---------------|
+ # | 2 | '!x:test' | 'm.room.create' | '' | '$xxx' | None |
+ # | 3 | '!x:test' | 'm.room.member' | '@user2:test' | '$aaa' | None |
+ # | 4 | '!x:test' | 'm.room.history_visibility' | '' | '$xxx' | None |
+ # | 4 | '!x:test' | 'm.room.join_rules' | '' | '$xxx' | None |
+ # | 4 | '!x:test' | 'm.room.power_levels' | '' | '$xxx' | None |
+ # | 7 | '!x:test' | 'm.room.member' | '@user1:test' | '$ooo' | None |
+ # | 8 | '!x:test' | 'foobarbazdummy' | '@user1:test' | '$xxx' | None |
+ # | 9 | '!x:test' | 'm.room.member' | '@user2:test' | '$bbb' | '$aaa' |
+ # | 10 | '!x:test' | 'foobarbazdummy' | '@user1:test' | None | '$xxx' |
+ # | 10 | '!x:test' | 'm.room.create' | '' | None | '$xxx' |
+ # | 10 | '!x:test' | 'm.room.history_visibility' | '' | None | '$xxx' |
+ # | 10 | '!x:test' | 'm.room.join_rules' | '' | None | '$xxx' |
+ # | 10 | '!x:test' | 'm.room.member' | '@user1:test' | None | '$ooo' |
+ # | 10 | '!x:test' | 'm.room.member' | '@user2:test' | None | '$bbb' |
+ # | 10 | '!x:test' | 'm.room.power_levels' | '' | None | '$xxx' |
+ membership_changes = self.get_success(
+ self.store.get_current_state_delta_membership_changes_for_user(
+ user1_id,
+ from_key=before_room1_token.room_key,
+ to_key=after_room1_token.room_key,
+ )
+ )
+
+ # Let the whole diff show on failure
+ self.maxDiff = None
+ self.assertEqual(
+ membership_changes,
+ [
+ CurrentStateDeltaMembership(
+ room_id=room_id1,
+ event_id=join_response1["event_id"],
+ event_pos=join_pos1,
+ membership="join",
+ sender=user1_id,
+ prev_event_id=None,
+ prev_event_pos=None,
+ prev_membership=None,
+ prev_sender=None,
+ ),
+ CurrentStateDeltaMembership(
+ room_id=room_id1,
+ event_id=None, # leave_response1["event_id"],
+ event_pos=leave_pos1,
+ membership="leave",
+ sender=None, # user1_id,
+ prev_event_id=join_response1["event_id"],
+ prev_event_pos=join_pos1,
+ prev_membership="join",
+ prev_sender=user1_id,
+ ),
+ ],
+ )
+
+ def test_different_user_membership_persisted_in_same_batch(self) -> None:
+ """
+ Test batch of membership events from different users being processed at once.
+ This will result in all of the memberships being stored in the
+ `current_state_delta_stream` table with the same `stream_ordering` even though
+ the individual events have different `stream_ordering`s.
+ """
+ user1_id = self.register_user("user1", "pass")
+ _user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+ user3_id = self.register_user("user3", "pass")
+ _user3_tok = self.login(user3_id, "pass")
+ user4_id = self.register_user("user4", "pass")
+ _user4_tok = self.login(user4_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ # User2 is just the designated person to create the room (we do this across the
+ # tests to be consistent)
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+
+ # Persist the user1, user3, and user4 join events in the same batch so they all
+ # end up in the `current_state_delta_stream` table with the same
+ # stream_ordering.
+ join_event3, join_event_context3 = self.get_success(
+ create_event(
+ self.hs,
+ sender=user3_id,
+ type=EventTypes.Member,
+ state_key=user3_id,
+ content={"membership": "join"},
+ room_id=room_id1,
+ )
+ )
+ # We want to put user1 in the middle of the batch. This way, regardless of the
+ # implementation that inserts rows into current_state_delta_stream` (whether it
+ # be minimum/maximum of stream position of the batch), we will still catch bugs.
+ join_event1, join_event_context1 = self.get_success(
+ create_event(
+ self.hs,
+ sender=user1_id,
+ type=EventTypes.Member,
+ state_key=user1_id,
+ content={"membership": "join"},
+ room_id=room_id1,
+ )
+ )
+ join_event4, join_event_context4 = self.get_success(
+ create_event(
+ self.hs,
+ sender=user4_id,
+ type=EventTypes.Member,
+ state_key=user4_id,
+ content={"membership": "join"},
+ room_id=room_id1,
+ )
+ )
+ self.get_success(
+ self.persistence.persist_events(
+ [
+ (join_event3, join_event_context3),
+ (join_event1, join_event_context1),
+ (join_event4, join_event_context4),
+ ]
+ )
+ )
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # Get the membership changes for the user.
+ #
+ # At this point, the `current_state_delta_stream` table should look like (notice
+ # those three memberships at the end with `stream_id=7` because we persisted
+ # them in the same batch):
+ #
+ # | stream_id | room_id | type | state_key | event_id | prev_event_id |
+ # |-----------|-----------|----------------------------|------------------|----------|---------------|
+ # | 2 | '!x:test' | 'm.room.create' | '' | '$xxx' | None |
+ # | 3 | '!x:test' | 'm.room.member' | '@user2:test' | '$xxx' | None |
+ # | 4 | '!x:test' | 'm.room.history_visibility'| '' | '$xxx' | None |
+ # | 4 | '!x:test' | 'm.room.join_rules' | '' | '$xxx' | None |
+ # | 4 | '!x:test' | 'm.room.power_levels' | '' | '$xxx' | None |
+ # | 7 | '!x:test' | 'm.room.member' | '@user3:test' | '$xxx' | None |
+ # | 7 | '!x:test' | 'm.room.member' | '@user1:test' | '$xxx' | None |
+ # | 7 | '!x:test' | 'm.room.member' | '@user4:test' | '$xxx' | None |
+ membership_changes = self.get_success(
+ self.store.get_current_state_delta_membership_changes_for_user(
+ user1_id,
+ from_key=before_room1_token.room_key,
+ to_key=after_room1_token.room_key,
+ )
+ )
+
+ join_pos3 = self.get_success(
+ self.store.get_position_for_event(join_event3.event_id)
+ )
+
+ # Let the whole diff show on failure
+ self.maxDiff = None
+ self.assertEqual(
+ membership_changes,
+ [
+ CurrentStateDeltaMembership(
+ room_id=room_id1,
+ event_id=join_event1.event_id,
+ # Ideally, this would be `join_pos1` (to match the `event_id`) but
+ # when events are persisted in a batch, they are all stored in the
+ # `current_state_delta_stream` table with the minimum
+ # `stream_ordering` from the batch.
+ event_pos=join_pos3,
+ membership="join",
+ sender=user1_id,
+ prev_event_id=None,
+ prev_event_pos=None,
+ prev_membership=None,
+ prev_sender=None,
+ ),
+ ],
+ )
+
+ def test_state_reset(self) -> None:
+ """
+ Test a state reset scenario where the user gets removed from the room (when
+ there is no corresponding leave event)
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_pos1 = self.get_success(
+ self.store.get_position_for_event(join_response1["event_id"])
+ )
+
+ before_reset_token = self.event_sources.get_current_token()
+
+ # Send another state event to make a position for the state reset to happen at
+ dummy_state_response = self.helper.send_state(
+ room_id1,
+ event_type="foobarbaz",
+ state_key="",
+ body={"foo": "bar"},
+ tok=user2_tok,
+ )
+ dummy_state_pos = self.get_success(
+ self.store.get_position_for_event(dummy_state_response["event_id"])
+ )
+
+ # Mock a state reset removing the membership for user1 in the current state
+ self.get_success(
+ self.store.db_pool.simple_delete(
+ table="current_state_events",
+ keyvalues={
+ "room_id": room_id1,
+ "type": EventTypes.Member,
+ "state_key": user1_id,
+ },
+ desc="state reset user in current_state_delta_stream",
+ )
+ )
+ self.get_success(
+ self.store.db_pool.simple_insert(
+ table="current_state_delta_stream",
+ values={
+ "stream_id": dummy_state_pos.stream,
+ "room_id": room_id1,
+ "type": EventTypes.Member,
+ "state_key": user1_id,
+ "event_id": None,
+ "prev_event_id": join_response1["event_id"],
+ "instance_name": dummy_state_pos.instance_name,
+ },
+ desc="state reset user in current_state_delta_stream",
+ )
+ )
+
+ # Manually bust the cache since we we're just manually messing with the database
+ # and not causing an actual state reset.
+ self.store._membership_stream_cache.entity_has_changed(
+ user1_id, dummy_state_pos.stream
+ )
+
+ after_reset_token = self.event_sources.get_current_token()
+
+ membership_changes = self.get_success(
+ self.store.get_current_state_delta_membership_changes_for_user(
+ user1_id,
+ from_key=before_reset_token.room_key,
+ to_key=after_reset_token.room_key,
+ )
+ )
+
+ # Let the whole diff show on failure
+ self.maxDiff = None
+ self.assertEqual(
+ membership_changes,
+ [
+ CurrentStateDeltaMembership(
+ room_id=room_id1,
+ event_id=None,
+ event_pos=dummy_state_pos,
+ membership="leave",
+ sender=None, # user1_id,
+ prev_event_id=join_response1["event_id"],
+ prev_event_pos=join_pos1,
+ prev_membership="join",
+ prev_sender=user1_id,
+ ),
+ ],
+ )
+
+ def test_excluded_room_ids(self) -> None:
+ """
+ Test that the `excluded_room_ids` option excludes changes from the specified rooms.
+ """
+ user1_id = self.register_user("user1", "pass")
+ user1_tok = self.login(user1_id, "pass")
+ user2_id = self.register_user("user2", "pass")
+ user2_tok = self.login(user2_id, "pass")
+
+ before_room1_token = self.event_sources.get_current_token()
+
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+ join_pos1 = self.get_success(
+ self.store.get_position_for_event(join_response1["event_id"])
+ )
+
+ room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+ join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok)
+ join_pos2 = self.get_success(
+ self.store.get_position_for_event(join_response2["event_id"])
+ )
+
+ after_room1_token = self.event_sources.get_current_token()
+
+ # First test the the room is returned without the `excluded_room_ids` option
+ membership_changes = self.get_success(
+ self.store.get_current_state_delta_membership_changes_for_user(
+ user1_id,
+ from_key=before_room1_token.room_key,
+ to_key=after_room1_token.room_key,
+ )
+ )
+
+ # Let the whole diff show on failure
+ self.maxDiff = None
+ self.assertEqual(
+ membership_changes,
+ [
+ CurrentStateDeltaMembership(
+ room_id=room_id1,
+ event_id=join_response1["event_id"],
+ event_pos=join_pos1,
+ membership="join",
+ sender=user1_id,
+ prev_event_id=None,
+ prev_event_pos=None,
+ prev_membership=None,
+ prev_sender=None,
+ ),
+ CurrentStateDeltaMembership(
+ room_id=room_id2,
+ event_id=join_response2["event_id"],
+ event_pos=join_pos2,
+ membership="join",
+ sender=user1_id,
+ prev_event_id=None,
+ prev_event_pos=None,
+ prev_membership=None,
+ prev_sender=None,
+ ),
+ ],
+ )
+
+ # The test that `excluded_room_ids` excludes room2 as expected
+ membership_changes = self.get_success(
+ self.store.get_current_state_delta_membership_changes_for_user(
+ user1_id,
+ from_key=before_room1_token.room_key,
+ to_key=after_room1_token.room_key,
+ excluded_room_ids=[room_id2],
+ )
+ )
+
+ # Let the whole diff show on failure
+ self.maxDiff = None
+ self.assertEqual(
+ membership_changes,
+ [
+ CurrentStateDeltaMembership(
+ room_id=room_id1,
+ event_id=join_response1["event_id"],
+ event_pos=join_pos1,
+ membership="join",
+ sender=user1_id,
+ prev_event_id=None,
+ prev_event_pos=None,
+ prev_membership=None,
+ prev_sender=None,
+ )
+ ],
+ )
+
+
+class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
+ FederatingHomeserverTestCase
+):
+ """
+ Test `get_current_state_delta_membership_changes_for_user(...)` when joining remote federated rooms.
+ """
+
+ servlets = [
+ admin.register_servlets_for_client_rest_resource,
+ room.register_servlets,
+ login.register_servlets,
+ ]
+
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+ self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+ self.store = self.hs.get_datastores().main
+ self.event_sources = hs.get_event_sources()
+ self.room_member_handler = hs.get_room_member_handler()
+
+ def test_remote_join(self) -> None:
+ """
+ Test remote join where the first rows in `current_state_delta_stream` will just
+ be the state when you joined the remote room.
+ """
+ user1_id = self.register_user("user1", "pass")
+ _user1_tok = self.login(user1_id, "pass")
+
+ before_join_token = self.event_sources.get_current_token()
+
+ intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}"
+
+ # Remotely join a room on another homeserver.
+ #
+ # To do this we have to mock the responses from the remote homeserver. We also
+ # patch out a bunch of event checks on our end.
+ create_event_source = {
+ "auth_events": [],
+ "content": {
+ "creator": f"@creator:{self.OTHER_SERVER_NAME}",
+ "room_version": self.hs.config.server.default_room_version.identifier,
+ },
+ "depth": 0,
+ "origin_server_ts": 0,
+ "prev_events": [],
+ "room_id": intially_unjoined_room_id,
+ "sender": f"@creator:{self.OTHER_SERVER_NAME}",
+ "state_key": "",
+ "type": EventTypes.Create,
+ }
+ self.add_hashes_and_signatures_from_other_server(
+ create_event_source,
+ self.hs.config.server.default_room_version,
+ )
+ create_event = FrozenEventV3(
+ create_event_source,
+ self.hs.config.server.default_room_version,
+ {},
+ None,
+ )
+ creator_join_event_source = {
+ "auth_events": [create_event.event_id],
+ "content": {
+ "membership": "join",
+ },
+ "depth": 1,
+ "origin_server_ts": 1,
+ "prev_events": [],
+ "room_id": intially_unjoined_room_id,
+ "sender": f"@creator:{self.OTHER_SERVER_NAME}",
+ "state_key": f"@creator:{self.OTHER_SERVER_NAME}",
+ "type": EventTypes.Member,
+ }
+ self.add_hashes_and_signatures_from_other_server(
+ creator_join_event_source,
+ self.hs.config.server.default_room_version,
+ )
+ creator_join_event = FrozenEventV3(
+ creator_join_event_source,
+ self.hs.config.server.default_room_version,
+ {},
+ None,
+ )
+
+ # Our local user is going to remote join the room
+ join_event_source = {
+ "auth_events": [create_event.event_id],
+ "content": {"membership": "join"},
+ "depth": 1,
+ "origin_server_ts": 100,
+ "prev_events": [creator_join_event.event_id],
+ "sender": user1_id,
+ "state_key": user1_id,
+ "room_id": intially_unjoined_room_id,
+ "type": EventTypes.Member,
+ }
+ add_hashes_and_signatures(
+ self.hs.config.server.default_room_version,
+ join_event_source,
+ self.hs.hostname,
+ self.hs.signing_key,
+ )
+ join_event = FrozenEventV3(
+ join_event_source,
+ self.hs.config.server.default_room_version,
+ {},
+ None,
+ )
+
+ mock_make_membership_event = AsyncMock(
+ return_value=(
+ self.OTHER_SERVER_NAME,
+ join_event,
+ self.hs.config.server.default_room_version,
+ )
+ )
+ mock_send_join = AsyncMock(
+ return_value=SendJoinResult(
+ join_event,
+ self.OTHER_SERVER_NAME,
+ state=[create_event, creator_join_event],
+ auth_chain=[create_event, creator_join_event],
+ partial_state=False,
+ servers_in_room=frozenset(),
+ )
+ )
+
+ with patch.object(
+ self.room_member_handler.federation_handler.federation_client,
+ "make_membership_event",
+ mock_make_membership_event,
+ ), patch.object(
+ self.room_member_handler.federation_handler.federation_client,
+ "send_join",
+ mock_send_join,
+ ), patch(
+ "synapse.event_auth._is_membership_change_allowed",
+ return_value=None,
+ ), patch(
+ "synapse.handlers.federation_event.check_state_dependent_auth_rules",
+ return_value=None,
+ ):
+ self.get_success(
+ self.room_member_handler.update_membership(
+ requester=create_requester(user1_id),
+ target=UserID.from_string(user1_id),
+ room_id=intially_unjoined_room_id,
+ action=Membership.JOIN,
+ remote_room_hosts=[self.OTHER_SERVER_NAME],
+ )
+ )
+
+ after_join_token = self.event_sources.get_current_token()
+
+ # Get the membership changes for the user.
+ #
+ # At this point, the `current_state_delta_stream` table should look like the
+ # following. Notice that all of the events are at the same `stream_id` because
+ # the current state starts out where we remotely joined:
+ #
+ # | stream_id | room_id | type | state_key | event_id | prev_event_id |
+ # |-----------|------------------------------|-----------------|------------------------------|----------|---------------|
+ # | 2 | '!example:other.example.com' | 'm.room.member' | '@user1:test' | '$xxx' | None |
+ # | 2 | '!example:other.example.com' | 'm.room.create' | '' | '$xxx' | None |
+ # | 2 | '!example:other.example.com' | 'm.room.member' | '@creator:other.example.com' | '$xxx' | None |
+ membership_changes = self.get_success(
+ self.store.get_current_state_delta_membership_changes_for_user(
+ user1_id,
+ from_key=before_join_token.room_key,
+ to_key=after_join_token.room_key,
+ )
+ )
+
+ join_pos = self.get_success(
+ self.store.get_position_for_event(join_event.event_id)
+ )
+
+ # Let the whole diff show on failure
+ self.maxDiff = None
+ self.assertEqual(
+ membership_changes,
+ [
+ CurrentStateDeltaMembership(
+ room_id=intially_unjoined_room_id,
+ event_id=join_event.event_id,
+ event_pos=join_pos,
+ membership="join",
+ sender=user1_id,
+ prev_event_id=None,
+ prev_event_pos=None,
+ prev_membership=None,
+ prev_sender=None,
+ ),
+ ],
+ )
|