summary refs log tree commit diff
path: root/synapse/storage/databases/main/roommember.py
diff options
context:
space:
mode:
author: Devon Hudson <devon.dmytro@gmail.com> 2025-05-07 15:07:58 +0000
committer: GitHub <noreply@github.com> 2025-05-07 15:07:58 +0000
commit: ae877aa101796a0cd57c3637a875140ddb25ed51 (patch)
tree: ba25923f4ea58f5a0054208b1ff23ca87787adb1 /synapse/storage/databases/main/roommember.py
parent: Merge branch 'master' into develop (diff)
download: synapse-ae877aa101796a0cd57c3637a875140ddb25ed51.tar.xz
Convert Sliding Sync tests to use higher-level `compute_interested_rooms` (#18399)
Spawned from
https://github.com/element-hq/synapse/pull/18375#discussion_r2071768635.

This updates some sliding sync tests to use a higher-level function so
that test coverage spans both the fallback and the new tables. This
becomes important once https://github.com/element-hq/synapse/pull/18375
is merged.

In other words, adjust tests to target `compute_interested_rooms(...)`
(relevant to both new and fallback path) instead of the lower level
`get_room_membership_for_user_at_to_token(...)` that only applies to the
fallback path.

### Dev notes

```
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sliding_sync.ComputeInterestedRoomsTestCase_new
```

```
SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.rest.client.sliding_sync
```

```
SYNAPSE_POSTGRES=1 SYNAPSE_POSTGRES_USER=postgres SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests.handlers.test_sliding_sync.ComputeInterestedRoomsTestCase_new.test_display_name_changes_leave_after_token_range
```

### Pull Request Checklist

<!-- Please read
https://element-hq.github.io/synapse/latest/development/contributing_guide.html
before submitting your pull request -->

* [x] Pull request is based on the develop branch
* [x] Pull request includes a [changelog
file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog).
The entry should:
- Be a short description of your change which makes sense to users.
"Fixed a bug that prevented receiving messages from other servers."
instead of "Moved X method from `EventStore` to `EventWorkerStore`.".
  - Use markdown where necessary, mostly for `code blocks`.
  - End with either a period (.) or an exclamation mark (!).
  - Start with a capital letter.
- Feel free to credit yourself, by adding a sentence "Contributed by
@github_username." or "Contributed by [Your Name]." to the end of the
entry.
* [x] [Code
style](https://element-hq.github.io/synapse/latest/code_style.html) is
correct
(run the
[linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))

---------

Co-authored-by: Eric Eastwood <erice@element.io>
Diffstat (limited to 'synapse/storage/databases/main/roommember.py')
-rw-r--r-- synapse/storage/databases/main/roommember.py 135
1 files changed, 125 insertions, 10 deletions
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py

index b8c78baa6c..2084776543 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py
@@ -53,6 +53,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.storage.databases.main.stream import _filter_results_by_stream from synapse.storage.engines import Sqlite3Engine from synapse.storage.roommember import ( MemberSummary, @@ -65,6 +66,7 @@ from synapse.types import ( PersistedEventPosition, StateMap, StrCollection, + StreamToken, get_domain_from_id, ) from synapse.util.caches.descriptors import _CacheContext, cached, cachedList @@ -1389,7 +1391,9 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): txn, self.get_forgotten_rooms_for_user, (user_id,) ) self._invalidate_cache_and_stream( - txn, self.get_sliding_sync_rooms_for_user, (user_id,) + txn, + self.get_sliding_sync_rooms_for_user_from_membership_snapshots, + (user_id,), ) await self.db_pool.runInteraction("forget_membership", f) @@ -1421,25 +1425,30 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): ) @cached(iterable=True, max_entries=10000) - async def get_sliding_sync_rooms_for_user( - self, - user_id: str, + async def get_sliding_sync_rooms_for_user_from_membership_snapshots( + self, user_id: str ) -> Mapping[str, RoomsForUserSlidingSync]: - """Get all the rooms for a user to handle a sliding sync request. + """ + Get all the rooms for a user to handle a sliding sync request from the + `sliding_sync_membership_snapshots` table. These will be current memberships and + need to be rewound to the token range. Ignores forgotten rooms and rooms that the user has left themselves. + Args: + user_id: The user ID to get the rooms for. 
+ Returns: Map from room ID to membership info """ - def get_sliding_sync_rooms_for_user_txn( + def _txn( txn: LoggingTransaction, ) -> Dict[str, RoomsForUserSlidingSync]: # XXX: If you use any new columns that can change (like from # `sliding_sync_joined_rooms` or `forgotten`), make sure to bust the - # `get_sliding_sync_rooms_for_user` cache in the appropriate places (and add - # tests). + # `get_sliding_sync_rooms_for_user_from_membership_snapshots` cache in the + # appropriate places (and add tests). sql = """ SELECT m.room_id, m.sender, m.membership, m.membership_event_id, r.room_version, @@ -1455,6 +1464,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): AND (m.membership != 'leave' OR m.user_id != m.sender) """ txn.execute(sql, (user_id,)) + return { row[0]: RoomsForUserSlidingSync( room_id=row[0], @@ -1475,8 +1485,113 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore): } return await self.db_pool.runInteraction( - "get_sliding_sync_rooms_for_user", - get_sliding_sync_rooms_for_user_txn, + "get_sliding_sync_rooms_for_user_from_membership_snapshots", + _txn, + ) + + async def get_sliding_sync_self_leave_rooms_after_to_token( + self, + user_id: str, + to_token: StreamToken, + ) -> Dict[str, RoomsForUserSlidingSync]: + """ + Get all the self-leave rooms for a user after the `to_token` (outside the token + range) that are potentially relevant[1] and needed to handle a sliding sync + request. The results are from the `sliding_sync_membership_snapshots` table and + will be current memberships and need to be rewound to the token range. + + [1] If a leave happens after the token range, we may have still been joined (or + any non-self-leave which is relevant to sync) to the room before so we need to + include it in the list of potentially relevant rooms and apply + our rewind logic (outside of this function) to see if it's actually relevant. 
+ + This is basically a sister-function to + `get_sliding_sync_rooms_for_user_from_membership_snapshots`. We could + alternatively incorporate this logic into + `get_sliding_sync_rooms_for_user_from_membership_snapshots` but those results + are cached and the `to_token` isn't very cache friendly (people are constantly + requesting with new tokens) so we separate it out here. + + Args: + user_id: The user ID to get the rooms for. + to_token: Any self-leave memberships after this position will be returned. + + Returns: + Map from room ID to membership info + """ + # TODO: Potential to check + # `self._membership_stream_cache.has_entity_changed(...)` as an early-return + # shortcut. + + def _txn( + txn: LoggingTransaction, + ) -> Dict[str, RoomsForUserSlidingSync]: + sql = """ + SELECT m.room_id, m.sender, m.membership, m.membership_event_id, + r.room_version, + m.event_instance_name, m.event_stream_ordering, + m.has_known_state, + m.room_type, + m.is_encrypted + FROM sliding_sync_membership_snapshots AS m + INNER JOIN rooms AS r USING (room_id) + WHERE user_id = ? + AND m.forgotten = 0 + AND m.membership = 'leave' + AND m.user_id = m.sender + AND (m.event_stream_ordering > ?) + """ + # If a leave happens after the token range, we may have still been joined + # (or any non-self-leave which is relevant to sync) to the room before so we + # need to include it in the list of potentially relevant rooms and apply our + # rewind logic (outside of this function). 
+ # + # To handle tokens with a non-empty instance_map we fetch more + # results than necessary and then filter down + min_to_token_position = to_token.room_key.stream + txn.execute(sql, (user_id, min_to_token_position)) + + # Map from room_id to membership info + room_membership_for_user_map: Dict[str, RoomsForUserSlidingSync] = {} + for row in txn: + room_for_user = RoomsForUserSlidingSync( + room_id=row[0], + sender=row[1], + membership=row[2], + event_id=row[3], + room_version_id=row[4], + event_pos=PersistedEventPosition(row[5], row[6]), + has_known_state=bool(row[7]), + room_type=row[8], + is_encrypted=bool(row[9]), + ) + + # We filter out unknown room versions proactively. They shouldn't go + # down sync and their metadata may be in a broken state (causing + # errors). + if row[4] not in KNOWN_ROOM_VERSIONS: + continue + + # We only want to include the self-leave membership if it happened after + # the token range. + # + # Since the database pulls out more than necessary, we need to filter it + # down here. + if _filter_results_by_stream( + lower_token=None, + upper_token=to_token.room_key, + instance_name=room_for_user.event_pos.instance_name, + stream_ordering=room_for_user.event_pos.stream, + ): + continue + + room_membership_for_user_map[room_for_user.room_id] = room_for_user + + return room_membership_for_user_map + + return await self.db_pool.runInteraction( + "get_sliding_sync_self_leave_rooms_after_to_token", + _txn, ) async def get_sliding_sync_room_for_user(