From ae877aa101796a0cd57c3637a875140ddb25ed51 Mon Sep 17 00:00:00 2001 From: Devon Hudson Date: Wed, 7 May 2025 15:07:58 +0000 Subject: Convert Sliding Sync tests to use higher-level `compute_interested_rooms` (#18399) Spawning from https://github.com/element-hq/synapse/pull/18375#discussion_r2071768635, This updates some sliding sync tests to use a higher level function in order to move test coverage to cover both fallback & new tables. Important when https://github.com/element-hq/synapse/pull/18375 is merged. In other words, adjust tests to target `compute_interested_room(...)` (relevant to both new and fallback path) instead of the lower level `get_room_membership_for_user_at_to_token(...)` that only applies to the fallback path. ### Dev notes ``` SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sliding_sync.ComputeInterestedRoomsTestCase_new ``` ``` SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.rest.client.sliding_sync ``` ``` SYNAPSE_POSTGRES=1 SYNAPSE_POSTGRES_USER=postgres SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sliding_sync.ComputeInterestedRoomsTestCase_new.test_display_name_changes_leave_after_token_range ``` ### Pull Request Checklist * [x] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [x] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --------- Co-authored-by: Eric Eastwood --- synapse/storage/databases/main/roommember.py | 135 +++++++++++++++++++++++++-- 1 file changed, 125 insertions(+), 10 deletions(-) (limited to 'synapse/storage/databases/main/roommember.py') diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index b8c78baa6c..2084776543 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -53,6 +53,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.storage.databases.main.stream import _filter_results_by_stream from synapse.storage.engines import Sqlite3Engine from synapse.storage.roommember import ( MemberSummary, @@ -65,6 +66,7 @@ from synapse.types import ( PersistedEventPosition, StateMap, StrCollection, + StreamToken, get_domain_from_id, ) from synapse.util.caches.descriptors import _CacheContext, cached, cachedList @@ -1389,7 +1391,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): txn, self.get_forgotten_rooms_for_user, (user_id,) ) self._invalidate_cache_and_stream( - txn, self.get_sliding_sync_rooms_for_user, (user_id,) + txn, + self.get_sliding_sync_rooms_for_user_from_membership_snapshots, + (user_id,), ) await self.db_pool.runInteraction("forget_membership", f) @@ -1421,25 +1425,30 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ) @cached(iterable=True, max_entries=10000) - async def get_sliding_sync_rooms_for_user( - self, - user_id: str, + async def get_sliding_sync_rooms_for_user_from_membership_snapshots( + self, user_id: str ) -> Mapping[str, RoomsForUserSlidingSync]: - """Get all the rooms for a user to handle a sliding sync request. + """ + Get all the rooms for a user to handle a sliding sync request from the + `sliding_sync_membership_snapshots` table. These will be current memberships and + need to be rewound to the token range. Ignores forgotten rooms and rooms that the user has left themselves. + Args: + user_id: The user ID to get the rooms for. + Returns: Map from room ID to membership info """ - def get_sliding_sync_rooms_for_user_txn( + def _txn( txn: LoggingTransaction, ) -> Dict[str, RoomsForUserSlidingSync]: # XXX: If you use any new columns that can change (like from # `sliding_sync_joined_rooms` or `forgotten`), make sure to bust the - # `get_sliding_sync_rooms_for_user` cache in the appropriate places (and add - # tests). + # `get_sliding_sync_rooms_for_user_from_membership_snapshots` cache in the + # appropriate places (and add tests). sql = """ SELECT m.room_id, m.sender, m.membership, m.membership_event_id, r.room_version, @@ -1455,6 +1464,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): AND (m.membership != 'leave' OR m.user_id != m.sender) """ txn.execute(sql, (user_id,)) + return { row[0]: RoomsForUserSlidingSync( room_id=row[0], @@ -1475,8 +1485,113 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): } return await self.db_pool.runInteraction( - "get_sliding_sync_rooms_for_user", - get_sliding_sync_rooms_for_user_txn, + "get_sliding_sync_rooms_for_user_from_membership_snapshots", + _txn, + ) + + async def get_sliding_sync_self_leave_rooms_after_to_token( + self, + user_id: str, + to_token: StreamToken, + ) -> Dict[str, RoomsForUserSlidingSync]: + """ + Get all the self-leave rooms for a user after the `to_token` (outside the token + range) that are potentially relevant[1] and needed to handle a sliding sync + request. The results are from the `sliding_sync_membership_snapshots` table and + will be current memberships and need to be rewound to the token range. + + [1] If a leave happens after the token range, we may have still been joined (or + any non-self-leave which is relevant to sync) to the room before so we need to + include it in the list of potentially relevant rooms and apply + our rewind logic (outside of this function) to see if it's actually relevant. + + This is basically a sister-function to + `get_sliding_sync_rooms_for_user_from_membership_snapshots`. We could + alternatively incorporate this logic into + `get_sliding_sync_rooms_for_user_from_membership_snapshots` but those results + are cached and the `to_token` isn't very cache friendly (people are constantly + requesting with new tokens) so we separate it out here. + + Args: + user_id: The user ID to get the rooms for. + to_token: Any self-leave memberships after this position will be returned. + + Returns: + Map from room ID to membership info + """ + # TODO: Potential to check + # `self._membership_stream_cache.has_entity_changed(...)` as an early-return + # shortcut. + + def _txn( + txn: LoggingTransaction, + ) -> Dict[str, RoomsForUserSlidingSync]: + sql = """ + SELECT m.room_id, m.sender, m.membership, m.membership_event_id, + r.room_version, + m.event_instance_name, m.event_stream_ordering, + m.has_known_state, + m.room_type, + m.is_encrypted + FROM sliding_sync_membership_snapshots AS m + INNER JOIN rooms AS r USING (room_id) + WHERE user_id = ? + AND m.forgotten = 0 + AND m.membership = 'leave' + AND m.user_id = m.sender + AND (m.event_stream_ordering > ?) + """ + # If a leave happens after the token range, we may have still been joined + # (or any non-self-leave which is relevant to sync) to the room before so we + # need to include it in the list of potentially relevant rooms and apply our + # rewind logic (outside of this function). + # + # To handle tokens with a non-empty instance_map we fetch more + # results than necessary and then filter down + min_to_token_position = to_token.room_key.stream + txn.execute(sql, (user_id, min_to_token_position)) + + # Map from room_id to membership info + room_membership_for_user_map: Dict[str, RoomsForUserSlidingSync] = {} + for row in txn: + room_for_user = RoomsForUserSlidingSync( + room_id=row[0], + sender=row[1], + membership=row[2], + event_id=row[3], + room_version_id=row[4], + event_pos=PersistedEventPosition(row[5], row[6]), + has_known_state=bool(row[7]), + room_type=row[8], + is_encrypted=bool(row[9]), + ) + + # We filter out unknown room versions proactively. They shouldn't go + # down sync and their metadata may be in a broken state (causing + # errors). + if row[4] not in KNOWN_ROOM_VERSIONS: + continue + + # We only want to include the self-leave membership if it happened after + # the token range. + # + # Since the database pulls out more than necessary, we need to filter it + # down here. + if _filter_results_by_stream( + lower_token=None, + upper_token=to_token.room_key, + instance_name=room_for_user.event_pos.instance_name, + stream_ordering=room_for_user.event_pos.stream, + ): + continue + + room_membership_for_user_map[room_for_user.room_id] = room_for_user + + return room_membership_for_user_map + + return await self.db_pool.runInteraction( + "get_sliding_sync_self_leave_rooms_after_to_token", + _txn, ) async def get_sliding_sync_room_for_user( -- cgit 1.5.1