summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
author: Devon Hudson <devon.dmytro@gmail.com>, 2025-05-08 14:28:23 +0000
committer: GitHub <noreply@github.com>, 2025-05-08 14:28:23 +0000
commit: 7c633f1a58e22ea27a172efdc52d94bfdac8c728 (patch)
tree: 2d69b18c11fdece402a52918ea1a3241bd0daf1c /synapse/storage/databases/main/stream.py
parent: Convert Sliding Sync tests to use higher-level `compute_interested_rooms` (#1... (diff)
download: synapse-7c633f1a58e22ea27a172efdc52d94bfdac8c728.tar.xz
Pass leave from remote invite rejection down Sliding Sync (#18375)
Fixes #17753 


### Dev notes

The `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms`
database tables were added in
https://github.com/element-hq/synapse/pull/17512

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [X] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
  - Be a short description of your change which makes sense to users.
    "Fixed a bug that prevented receiving messages from other servers."
    instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
  - Feel free to credit yourself, by adding a sentence "Contributed by
    @github_username." or "Contributed by [Your Name]." to the end of the
    entry.
* [X] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Erik Johnston <erik@matrix.org>
Co-authored-by: Olivier 'reivilibre <oliverw@matrix.org>
Co-authored-by: Eric Eastwood <erice@element.io>
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r-- synapse/storage/databases/main/stream.py 202
1 files changed, 202 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py

index c52389b8a9..3fda49f31f 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@trace
async def get_sliding_sync_membership_changes(
    self,
    user_id: str,
    from_key: RoomStreamToken,
    to_key: RoomStreamToken,
    excluded_room_ids: Optional[AbstractSet[str]] = None,
) -> Dict[str, RoomsForUserStateReset]:
    """
    Fetch the meaningful membership changes for a user within a token range.

    A membership change is "meaningful" when the `membership` value itself
    changes. A `join` -> `join` transition (such as a display name change) is
    therefore filtered out, because nothing meaningful changed.

    Note: This function only works with "live" tokens (`stream_ordering` only).

    The range is exclusive of `from_key` and inclusive of `to_key`
    (> `from_key` and <= `to_key`).

    Args:
        user_id: The user ID to fetch membership changes for.
        from_key: The point in the stream to sync from (changes > this point).
        to_key: The token to fetch rooms up to (changes <= this point).
        excluded_room_ids: Optional set of room IDs to leave out of the results.

    Returns:
        A map from room ID to a meaningful membership change for that room in
        the token range. Rows are processed in ascending `stream_ordering`, and
        a later change for the same room replaces an earlier one.

        `event_id`/`sender` can be `None` when the server leaves a room (meaning
        everyone locally left) or on a state reset which removed the person from
        the room. The two cases can't be told apart with what's available in the
        `current_state_delta_stream` table. To actually check for a state reset,
        you need to check if a membership still exists in the room.
    """

    assert from_key.topological is None
    assert to_key.topological is None

    # Cheap exits first: an empty range, or a user whose memberships the
    # stream change cache says have not changed since `from_key`, needs no
    # database work at all.
    if from_key == to_key:
        return {}

    if from_key and not self._membership_stream_cache.has_entity_changed(
        user_id, int(from_key.stream)
    ):
        return {}

    skipped_rooms: AbstractSet[str] = (
        excluded_room_ids if excluded_room_ids is not None else set()
    )

    def fetch_changes_txn(
        txn: LoggingTransaction,
    ) -> Dict[str, RoomsForUserStateReset]:
        # Tokens may carry a non-empty instance_map, so query a range that is
        # wide enough and narrow the rows down afterwards in Python.
        lower_bound = from_key.stream
        upper_bound = to_key.get_max_stream_pos()

        # `sliding_sync_membership_snapshots` does not include users that were
        # state reset out of rooms, so that case is picked up from
        # `current_state_delta_stream` in the second half of the UNION.
        sql = """
            SELECT
                room_id,
                membership_event_id,
                event_instance_name,
                event_stream_ordering,
                membership,
                sender,
                prev_membership,
                room_version
            FROM
            (
                SELECT
                    s.room_id,
                    s.membership_event_id,
                    s.event_instance_name,
                    s.event_stream_ordering,
                    s.membership,
                    s.sender,
                    m_prev.membership AS prev_membership
                FROM sliding_sync_membership_snapshots as s
                LEFT JOIN event_edges AS e ON e.event_id = s.membership_event_id
                LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = e.prev_event_id
                WHERE s.user_id = ?

                UNION ALL

                SELECT
                    s.room_id,
                    e.event_id,
                    s.instance_name,
                    s.stream_id,
                    m.membership,
                    e.sender,
                    m_prev.membership AS prev_membership
                FROM current_state_delta_stream AS s
                LEFT JOIN events AS e ON e.event_id = s.event_id
                LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
                LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
                WHERE
                    s.type = ?
                    AND s.state_key = ?
            ) AS c
            INNER JOIN rooms USING (room_id)
            WHERE event_stream_ordering > ? AND event_stream_ordering <= ?
            ORDER BY event_stream_ordering ASC
        """

        txn.execute(
            sql,
            (user_id, EventTypes.Member, user_id, lower_bound, upper_bound),
        )

        changes_by_room: Dict[str, RoomsForUserStateReset] = {}
        for row in txn:
            (
                room_id,
                event_id,
                instance_name,
                stream_ordering,
                membership,
                sender,
                prev_membership,
                room_version_id,
            ) = row

            assert room_id is not None
            assert stream_ordering is not None

            if room_id in skipped_rooms:
                continue

            # Drop rows that fall outside the precise token range.
            if not _filter_results_by_stream(
                from_key,
                to_key,
                instance_name,
                stream_ordering,
            ):
                continue

            if event_id is None:
                # When the server leaves a room, it inserts new rows into
                # `current_state_delta_stream` with `event_id = null` for all
                # current state. So there can be a row for the leave event and
                # then another for the same leave where `event_id=null` but
                # `prev_event_id` points back at the earlier leave event. Don't
                # report the leave again if there is already a leave event.
                if prev_membership == Membership.LEAVE:
                    continue

                if room_id in changes_by_room:
                    # SUSPICIOUS: if we join a room and get state reset out of
                    # it in the same queried window, won't this ignore the
                    # 'state reset out of it' part?
                    continue

            # With `s.event_id = null` there is no matching `room_memberships`
            # row, but the user can be assumed to have left: this only happens
            # when the server leaves a room (meaning everyone locally left) or
            # on a state reset which removed the person from the room.
            if membership is None:
                membership = Membership.LEAVE

            # Same membership before and after is not a meaningful change
            # (for example the user changed their display name).
            if membership == prev_membership:
                continue

            changes_by_room[room_id] = RoomsForUserStateReset(
                room_id=room_id,
                sender=sender,
                membership=membership,
                event_id=event_id,
                event_pos=PersistedEventPosition(instance_name, stream_ordering),
                room_version_id=room_version_id,
            )

        return changes_by_room

    return await self.db_pool.runInteraction(
        "get_sliding_sync_membership_changes", fetch_changes_txn
    )