diff options
author | David Robertson <davidr@element.io> | 2023-01-23 15:44:39 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-23 15:44:39 +0000 |
commit | 80d44060c99e87c84da72fdfcaa9a508d38a26b4 (patch) | |
tree | d26f9ab6427271c9b299f2dd4aab00702303aa6b /synapse/storage/databases/main/room.py | |
parent | Bump ruff from 0.0.224 to 0.0.230 (#14897) (diff) | |
download | synapse-80d44060c99e87c84da72fdfcaa9a508d38a26b4.tar.xz |
Faster joins: omit partial rooms from eager syncs until the resync completes (#14870)
* Allow `AbstractSet` in `StrCollection` Or else frozensets are excluded. This will be useful in an upcoming commit where I plan to change a function that accepts `List[str]` to accept `StrCollection` instead. * `rooms_to_exclude` -> `rooms_to_exclude_globally` I am about to make use of this exclusion mechanism to exclude rooms for a specific user and a specific sync. This rename helps to clarify the distinction between the global config and the rooms to exclude for a specific sync. * Better function names for internal sync methods * Track a list of excluded rooms on SyncResultBuilder I plan to feed a list of partially stated rooms for this sync to ignore * Exclude partial state rooms during eager sync using the mechanism established in the previous commit * Track un-partial-state stream in sync tokens So that we can work out which rooms have become fully-stated during a given sync period. * Fix mutation of `@cached` return value This was fouling up a complement test added alongside this PR. Excluding a room would mean the set of forgotten rooms in the cache would be extended. This means that room could be erroneously considered forgotten in the future. Introduced in #12310, Synapse 1.57.0. I don't think this had any user-visible side effects (until now). * SyncResultBuilder: track rooms to force as newly joined Similar plan as before. We've omitted rooms from certain sync responses; now we establish the mechanism to reintroduce them into future syncs. * Read new field, to present rooms as newly joined * Force un-partial-stated rooms to be newly-joined for eager incremental syncs only, provided they're still fully stated * Notify user stream listeners to wake up long polling syncs * Changelog * Typo fix Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> * Unnecessary list cast Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> * Rephrase comment Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> * Another comment Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> * Fixup merge(?) * Poke notifier when receiving un-partial-stated msg over replication * Fixup merge whoops Thanks MV :) Co-authored-by: Mathieu Velen <mathieuv@matrix.org> Co-authored-by: Mathieu Velten <mathieuv@matrix.org> Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com>
Diffstat (limited to 'synapse/storage/databases/main/room.py')
-rw-r--r-- | synapse/storage/databases/main/room.py | 47 |
1 files changed, 41 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 6a65b2a89b..3aa7b94560 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -26,6 +26,7 @@ from typing import ( Mapping, Optional, Sequence, + Set, Tuple, Union, cast, @@ -1294,10 +1295,44 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): instance_name ) + async def get_un_partial_stated_rooms_between( + self, last_id: int, current_id: int, room_ids: Collection[str] + ) -> Set[str]: + """Get all rooms that got un partial stated between `last_id` exclusive and + `current_id` inclusive. + + Returns: + The list of room ids. + """ + + if last_id == current_id: + return set() + + def _get_un_partial_stated_rooms_between_txn( + txn: LoggingTransaction, + ) -> Set[str]: + sql = """ + SELECT DISTINCT room_id FROM un_partial_stated_room_stream + WHERE ? < stream_id AND stream_id <= ? AND + """ + + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", room_ids + ) + + txn.execute(sql + clause, [last_id, current_id] + args) + + return {r[0] for r in txn} + + return await self.db_pool.runInteraction( + "get_un_partial_stated_rooms_between", + _get_un_partial_stated_rooms_between_txn, + ) + async def get_un_partial_stated_rooms_from_stream( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, Tuple[str]]], int, bool]: - """Get updates for caches replication stream. + """Get updates for un partial stated rooms replication stream. Args: instance_name: The writer we want to fetch updates from. Unused @@ -2304,16 +2339,16 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): (room_id,), ) - async def clear_partial_state_room(self, room_id: str) -> bool: + async def clear_partial_state_room(self, room_id: str) -> Optional[int]: """Clears the partial state flag for a room. Args: room_id: The room whose partial state flag is to be cleared. Returns: - `True` if the partial state flag has been cleared successfully. + The corresponding stream id for the un-partial-stated rooms stream. - `False` if the partial state flag could not be cleared because the room + `None` if the partial state flag could not be cleared because the room still contains events with partial state. """ try: @@ -2324,7 +2359,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): room_id, un_partial_state_room_stream_id, ) - return True + return un_partial_state_room_stream_id except self.db_pool.engine.module.IntegrityError as e: # Assume that any `IntegrityError`s are due to partial state events. logger.info( @@ -2332,7 +2367,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): room_id, e, ) - return False + return None def _clear_partial_state_room_txn( self, |