summary refs log tree commit diff
diff options
context:
space:
mode:
authorDevon Hudson <devon.dmytro@gmail.com>2025-05-08 14:28:23 +0000
committerGitHub <noreply@github.com>2025-05-08 14:28:23 +0000
commit7c633f1a58e22ea27a172efdc52d94bfdac8c728 (patch)
tree2d69b18c11fdece402a52918ea1a3241bd0daf1c
parentConvert Sliding Sync tests to use higher-level `compute_interested_rooms` (#1... (diff)
downloadsynapse-7c633f1a58e22ea27a172efdc52d94bfdac8c728.tar.xz
Pass leave from remote invite rejection down Sliding Sync (#18375)
Fixes #17753 


### Dev notes

The `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms`
database tables were added in
https://github.com/element-hq/synapse/pull/17512

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [X] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [X] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Erik Johnston <erik@matrix.org>
Co-authored-by: Olivier 'reivilibre <oliverw@matrix.org>
Co-authored-by: Eric Eastwood <erice@element.io>
-rw-r--r--changelog.d/18375.bugfix1
-rw-r--r--synapse/handlers/sliding_sync/__init__.py23
-rw-r--r--synapse/handlers/sliding_sync/room_lists.py49
-rw-r--r--synapse/storage/databases/main/stream.py202
-rw-r--r--synapse/storage/schema/main/delta/92/03_ss_membership_snapshot_idx.sql16
-rw-r--r--tests/handlers/test_sliding_sync.py12
-rw-r--r--tests/rest/client/sliding_sync/test_sliding_sync.py58
7 files changed, 360 insertions, 1 deletions
diff --git a/changelog.d/18375.bugfix b/changelog.d/18375.bugfix
new file mode 100644

index 0000000000..faebe6f046 --- /dev/null +++ b/changelog.d/18375.bugfix
@@ -0,0 +1 @@ +Pass leave from remote invite rejection down Sliding Sync. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py
index 459d3c3e24..cb56eb53fc 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py
@@ -271,6 +271,7 @@ class SlidingSyncHandler: from_token=from_token, to_token=to_token, newly_joined=room_id in interested_rooms.newly_joined_rooms, + newly_left=room_id in interested_rooms.newly_left_rooms, is_dm=room_id in interested_rooms.dm_room_ids, ) @@ -542,6 +543,7 @@ class SlidingSyncHandler: from_token: Optional[SlidingSyncStreamToken], to_token: StreamToken, newly_joined: bool, + newly_left: bool, is_dm: bool, ) -> SlidingSyncResult.RoomResult: """ @@ -559,6 +561,7 @@ class SlidingSyncHandler: from_token: The point in the stream to sync from. to_token: The point in the stream to sync up to. newly_joined: If the user has newly joined the room + newly_left: If the user has newly left the room is_dm: Whether the room is a DM room """ user = sync_config.user @@ -856,6 +859,26 @@ class SlidingSyncHandler: # TODO: Limit the number of state events we're about to send down # the room, if its too many we should change this to an # `initial=True`? + + # For the case of rejecting remote invites, the leave event won't be + # returned by `get_current_state_deltas_for_room`. This is due to the current + # state only being filled out for rooms the server is in, and so doesn't pick + # up out-of-band leaves (including locally rejected invites) as these events + # are outliers and not added to the `current_state_delta_stream`. + # + # We rely on being explicitly told that the room has been `newly_left` to + # ensure we extract the out-of-band leave. + if newly_left and room_membership_for_user_at_to_token.event_id is not None: + membership_changed = True + leave_event = await self.store.get_event( + room_membership_for_user_at_to_token.event_id + ) + state_key = leave_event.get_state_key() + if state_key is not None: + room_state_delta_id_map[(leave_event.type, state_key)] = ( + room_membership_for_user_at_to_token.event_id + ) + deltas = await self.get_current_state_deltas_for_room( room_id=room_id, room_membership_for_user_at_to_token=room_membership_for_user_at_to_token, diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py
index 7e3cf539df..6d1ac91605 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py
@@ -1120,7 +1120,7 @@ class SlidingSyncRoomLists: ( newly_joined_room_ids, newly_left_room_map, - ) = await self._get_newly_joined_and_left_rooms( + ) = await self._get_newly_joined_and_left_rooms_fallback( user_id, to_token=to_token, from_token=from_token ) @@ -1187,6 +1187,53 @@ class SlidingSyncRoomLists: was state reset out of the room. To actually check for a state reset, you need to check if a membership still exists in the room. """ + + newly_joined_room_ids: Set[str] = set() + newly_left_room_map: Dict[str, RoomsForUserStateReset] = {} + + if not from_token: + return newly_joined_room_ids, newly_left_room_map + + changes = await self.store.get_sliding_sync_membership_changes( + user_id, + from_key=from_token.room_key, + to_key=to_token.room_key, + excluded_room_ids=set(self.rooms_to_exclude_globally), + ) + + for room_id, entry in changes.items(): + if entry.membership == Membership.JOIN: + newly_joined_room_ids.add(room_id) + elif entry.membership == Membership.LEAVE: + newly_left_room_map[room_id] = entry + + return newly_joined_room_ids, newly_left_room_map + + @trace + async def _get_newly_joined_and_left_rooms_fallback( + self, + user_id: str, + to_token: StreamToken, + from_token: Optional[StreamToken], + ) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]: + """Fetch the sets of rooms that the user newly joined or left in the + given token range. + + Note: there may be rooms in the newly left rooms where the user was + "state reset" out of the room, and so that room would not be part of the + "current memberships" of the user. + + Returns: + A 2-tuple of newly joined room IDs and a map of newly_left room + IDs to the `RoomsForUserStateReset` entry. + + We're using `RoomsForUserStateReset` but that doesn't necessarily mean the + user was state reset of the rooms. It's just that the `event_id`/`sender` + are optional and we can't tell the difference between the server leaving the + room when the user was the last person participating in the room and left or + was state reset out of the room. To actually check for a state reset, you + need to check if a membership still exists in the room. + """ newly_joined_room_ids: Set[str] = set() newly_left_room_map: Dict[str, RoomsForUserStateReset] = {} diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index c52389b8a9..3fda49f31f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -80,6 +80,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine +from synapse.storage.roommember import RoomsForUserStateReset from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection from synapse.util.caches.descriptors import cached, cachedList @@ -993,6 +994,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): available in the `current_state_delta_stream` table. To actually check for a state reset, you need to check if a membership still exists in the room. """ + + assert from_key.topological is None + assert to_key.topological is None + # Start by ruling out cases where a DB query is not necessary. if from_key == to_key: return [] @@ -1138,6 +1143,203 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): if membership_change.room_id not in room_ids_to_exclude ] + @trace + async def get_sliding_sync_membership_changes( + self, + user_id: str, + from_key: RoomStreamToken, + to_key: RoomStreamToken, + excluded_room_ids: Optional[AbstractSet[str]] = None, + ) -> Dict[str, RoomsForUserStateReset]: + """ + Fetch membership events that result in a meaningful membership change for a + given user. + + A meaningful membership changes is one where the `membership` value actually + changes. This means memberships changes from `join` to `join` (like a display + name change) will be filtered out since they result in no meaningful change. + + Note: This function only works with "live" tokens with `stream_ordering` only. + + We're looking for membership changes in the token range (> `from_key` and <= + `to_key`). + + Args: + user_id: The user ID to fetch membership events for. + from_key: The point in the stream to sync from (fetching events > this point). + to_key: The token to fetch rooms up to (fetching events <= this point). + excluded_room_ids: Optional list of room IDs to exclude from the results. + + Returns: + All meaningful membership changes to the current state in the token range. + Events are sorted by `stream_ordering` ascending. + + `event_id`/`sender` can be `None` when the server leaves a room (meaning + everyone locally left) or a state reset which removed the person from the + room. We can't tell the difference between the two cases with what's + available in the `current_state_delta_stream` table. To actually check for a + state reset, you need to check if a membership still exists in the room. + """ + + assert from_key.topological is None + assert to_key.topological is None + + # Start by ruling out cases where a DB query is not necessary. + if from_key == to_key: + return {} + + if from_key: + has_changed = self._membership_stream_cache.has_entity_changed( + user_id, int(from_key.stream) + ) + if not has_changed: + return {} + + room_ids_to_exclude: AbstractSet[str] = set() + if excluded_room_ids is not None: + room_ids_to_exclude = excluded_room_ids + + def f(txn: LoggingTransaction) -> Dict[str, RoomsForUserStateReset]: + # To handle tokens with a non-empty instance_map we fetch more + # results than necessary and then filter down + min_from_id = from_key.stream + max_to_id = to_key.get_max_stream_pos() + + # This query looks at membership changes in + # `sliding_sync_membership_snapshots` which will not include users + # that were state reset out of rooms; so we need to look for that + # case in `current_state_delta_stream`. + sql = """ + SELECT + room_id, + membership_event_id, + event_instance_name, + event_stream_ordering, + membership, + sender, + prev_membership, + room_version + FROM + ( + SELECT + s.room_id, + s.membership_event_id, + s.event_instance_name, + s.event_stream_ordering, + s.membership, + s.sender, + m_prev.membership AS prev_membership + FROM sliding_sync_membership_snapshots as s + LEFT JOIN event_edges AS e ON e.event_id = s.membership_event_id + LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = e.prev_event_id + WHERE s.user_id = ? + + UNION ALL + + SELECT + s.room_id, + e.event_id, + s.instance_name, + s.stream_id, + m.membership, + e.sender, + m_prev.membership AS prev_membership + FROM current_state_delta_stream AS s + LEFT JOIN events AS e ON e.event_id = s.event_id + LEFT JOIN room_memberships AS m ON m.event_id = s.event_id + LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id + WHERE + s.type = ? + AND s.state_key = ? + ) AS c + INNER JOIN rooms USING (room_id) + WHERE event_stream_ordering > ? AND event_stream_ordering <= ? + ORDER BY event_stream_ordering ASC + """ + + txn.execute( + sql, + (user_id, EventTypes.Member, user_id, min_from_id, max_to_id), + ) + + membership_changes: Dict[str, RoomsForUserStateReset] = {} + for ( + room_id, + membership_event_id, + event_instance_name, + event_stream_ordering, + membership, + sender, + prev_membership, + room_version_id, + ) in txn: + assert room_id is not None + assert event_stream_ordering is not None + + if room_id in room_ids_to_exclude: + continue + + if _filter_results_by_stream( + from_key, + to_key, + event_instance_name, + event_stream_ordering, + ): + # When the server leaves a room, it will insert new rows into the + # `current_state_delta_stream` table with `event_id = null` for all + # current state. This means we might already have a row for the + # leave event and then another for the same leave where the + # `event_id=null` but the `prev_event_id` is pointing back at the + # earlier leave event. We don't want to report the leave, if we + # already have a leave event. + if ( + membership_event_id is None + and prev_membership == Membership.LEAVE + ): + continue + + if membership_event_id is None and room_id in membership_changes: + # SUSPICIOUS: if we join a room and get state reset out of it + # in the same queried window, + # won't this ignore the 'state reset out of it' part? + continue + + # When `s.event_id = null`, we won't be able to get respective + # `room_membership` but can assume the user has left the room + # because this only happens when the server leaves a room + # (meaning everyone locally left) or a state reset which removed + # the person from the room. + membership = ( + membership if membership is not None else Membership.LEAVE + ) + + if membership == prev_membership: + # If `membership` and `prev_membership` are the same then this + # is not a meaningful change so we can skip it. + # An example of this happening is when the user changes their display name. + continue + + membership_change = RoomsForUserStateReset( + room_id=room_id, + sender=sender, + membership=membership, + event_id=membership_event_id, + event_pos=PersistedEventPosition( + event_instance_name, event_stream_ordering + ), + room_version_id=room_version_id, + ) + + membership_changes[room_id] = membership_change + + return membership_changes + + membership_changes = await self.db_pool.runInteraction( + "get_sliding_sync_membership_changes", f + ) + + return membership_changes + @cancellable async def get_membership_changes_for_user( self, diff --git a/synapse/storage/schema/main/delta/92/03_ss_membership_snapshot_idx.sql b/synapse/storage/schema/main/delta/92/03_ss_membership_snapshot_idx.sql new file mode 100644
index 0000000000..c694203f95 --- /dev/null +++ b/synapse/storage/schema/main/delta/92/03_ss_membership_snapshot_idx.sql
@@ -0,0 +1,16 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + +-- So we can fetch all rooms for a given user sorted by stream order +DROP INDEX IF EXISTS sliding_sync_membership_snapshots_user_id; +CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id, event_stream_ordering); diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py
index cbacf21ae7..7144c58217 100644 --- a/tests/handlers/test_sliding_sync.py +++ b/tests/handlers/test_sliding_sync.py
@@ -594,6 +594,12 @@ class ComputeInterestedRoomsTestCase(SlidingSyncBase): the correct list of rooms IDs. """ + # FIXME: We should refactor these tests to run against `compute_interested_rooms(...)` + # instead of just `get_room_membership_for_user_at_to_token(...)` which is only used + # in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do + # well to stress that logic and we shouldn't remove them just because we're removing + # the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623). + servlets = [ admin.register_servlets, knock.register_servlets, @@ -2976,6 +2982,12 @@ class ComputeInterestedRoomsShardTestCase( sharded event stream_writers enabled """ + # FIXME: We should refactor these tests to run against `compute_interested_rooms(...)` + # instead of just `get_room_membership_for_user_at_to_token(...)` which is only used + # in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do + # well to stress that logic and we shouldn't remove them just because we're removing + # the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623). + servlets = [ admin.register_servlets_for_client_rest_resource, room.register_servlets, diff --git a/tests/rest/client/sliding_sync/test_sliding_sync.py b/tests/rest/client/sliding_sync/test_sliding_sync.py
index f3cf2111ec..dcec5b4cf0 100644 --- a/tests/rest/client/sliding_sync/test_sliding_sync.py +++ b/tests/rest/client/sliding_sync/test_sliding_sync.py
@@ -790,6 +790,64 @@ class SlidingSyncTestCase(SlidingSyncBase): exact=True, ) + def test_reject_remote_invite(self) -> None: + """Test that rejecting a remote invite comes down incremental sync""" + + user_id = self.register_user("user1", "pass") + user_tok = self.login(user_id, "pass") + + # Create a remote room invite (out-of-band membership) + room_id = "!room:remote.server" + self._create_remote_invite_room_for_user(user_id, None, room_id) + + # Make the Sliding Sync request + sync_body = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [(EventTypes.Member, StateValues.ME)], + "timeline_limit": 3, + } + } + } + response_body, from_token = self.do_sync(sync_body, tok=user_tok) + # We should see the room (like normal) + self.assertIncludes( + set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]), + {room_id}, + exact=True, + ) + + # Reject the remote room invite + self.helper.leave(room_id, user_id, tok=user_tok) + + # Sync again after rejecting the invite + response_body, _ = self.do_sync(sync_body, since=from_token, tok=user_tok) + + # The fix to add the leave event to incremental sync when rejecting a remote + # invite relies on the new tables to work. + if self.use_new_tables: + # We should see the newly_left room + self.assertIncludes( + set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]), + {room_id}, + exact=True, + ) + # We should see the leave state for the room so clients don't end up with stuck + # invites + self.assertIncludes( + { + ( + state["type"], + state["state_key"], + state["content"].get("membership"), + ) + for state in response_body["rooms"][room_id]["required_state"] + }, + {(EventTypes.Member, user_id, Membership.LEAVE)}, + exact=True, + ) + def test_ignored_user_invites_initial_sync(self) -> None: """ Make sure we ignore invites if they are from one of the `m.ignored_user_list` on