From cf66d712c615b96bce19e44118cce1ebda41d0b8 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Thu, 26 Jan 2023 10:38:49 +0000 Subject: Fix initialization of `_device_list_id_gen` (#14914) On startup, the `_device_list_id_gen` stream id generator is initialized using the maximum stream id seen in a list of tables. When we started populating the `device_list_remote_pending` table in #13913, we forgot to add it to the aforementioned list of tables, so the stream id generator can hand out old stream ids after a restart. The end result is that Synapse can fail to handle device list update EDUs after a restart when a partial state join is in progress. Add the `device_list_remote_pending` table to the list of tables to consider when initializing the `_device_list_id_gen` stream id generator. Signed-off-by: Sean Quah --- synapse/storage/databases/main/devices.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse') diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 903606fb46..e8b6cc6b80 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -99,6 +99,7 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): ("user_signature_stream", "stream_id"), ("device_lists_outbound_pokes", "stream_id"), ("device_lists_changes_in_room", "stream_id"), + ("device_lists_remote_pending", "stream_id"), ], is_writer=hs.config.worker.worker_app is None, ) -- cgit 1.4.1 From 8a05d5de21888cdd0b53870fead3a1eae64f0b17 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 26 Jan 2023 12:15:36 -0500 Subject: Batch look-ups to see if rooms are partial stated. (#14917) * Batch look-ups to see if rooms are partial stated. * Fix issues found in linting. * Fix typo. * Apply suggestions from code review Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> * Clarify comments. Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> * Also improve the cache size while we're at it * is_partial_state_rooms -> is_partial_state_room_batched * Run `black` * Improve annotation for `simple_select_many_batch` * Fix is_partial_state_room_batched impl * Okay, _actually_ fix impl * Update description. * Update synapse/storage/databases/main/room.py Co-authored-by: Patrick Cloke * Run black. Co-authored-by: Sean Quah <8349537+squahtx@users.noreply.github.com> Co-authored-by: David Robertson --- changelog.d/14917.misc | 1 + synapse/handlers/sync.py | 24 +++++++++++++++++------- synapse/storage/database.py | 2 +- synapse/storage/databases/main/room.py | 27 ++++++++++++++++++++++++--- 4 files changed, 43 insertions(+), 11 deletions(-) create mode 100644 changelog.d/14917.misc (limited to 'synapse') diff --git a/changelog.d/14917.misc b/changelog.d/14917.misc new file mode 100644 index 0000000000..4d1dd2639a --- /dev/null +++ b/changelog.d/14917.misc @@ -0,0 +1 @@ +Faster joins: Improve performance of looking up partial-state status of rooms. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index ee11764567..5ebd3ea855 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1383,16 +1383,21 @@ class SyncHandler: if not sync_config.filter_collection.lazy_load_members(): # Non-lazy syncs should never include partially stated rooms. # Exclude all partially stated rooms from this sync. - for room_id in mutable_joined_room_ids: - if await self.store.is_partial_state_room(room_id): - mutable_rooms_to_exclude.add(room_id) + results = await self.store.is_partial_state_room_batched( + mutable_joined_room_ids + ) + mutable_rooms_to_exclude.update( + room_id + for room_id, is_partial_state in results.items() + if is_partial_state + ) # Incremental eager syncs should additionally include rooms that # - we are joined to # - are full-stated # - became fully-stated at some point during the sync period # (These rooms will have been omitted during a previous eager sync.) - forced_newly_joined_room_ids = set() + forced_newly_joined_room_ids: Set[str] = set() if since_token and not sync_config.filter_collection.lazy_load_members(): un_partial_stated_rooms = ( await self.store.get_un_partial_stated_rooms_between( @@ -1401,9 +1406,14 @@ class SyncHandler: mutable_joined_room_ids, ) ) - for room_id in un_partial_stated_rooms: - if not await self.store.is_partial_state_room(room_id): - forced_newly_joined_room_ids.add(room_id) + results = await self.store.is_partial_state_room_batched( + un_partial_stated_rooms + ) + forced_newly_joined_room_ids.update( + room_id + for room_id, is_partial_state in results.items() + if not is_partial_state + ) # Now we have our list of joined room IDs, exclude as configured and freeze joined_room_ids = frozenset( diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 88479a16db..e20c5c5302 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1819,7 +1819,7 @@ class DatabasePool: keyvalues: Optional[Dict[str, Any]] = None, desc: str = "simple_select_many_batch", batch_size: int = 100, - ) -> List[Any]: + ) -> List[Dict[str, Any]]: """Executes a SELECT query on the named table, which may return zero or more rows, returning the result as a list of dicts. diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 3aa7b94560..fbbc018887 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -60,9 +60,9 @@ from synapse.storage.util.id_generators import ( MultiWriterIdGenerator, StreamIdGenerator, ) -from synapse.types import JsonDict, RetentionPolicy, ThirdPartyInstanceID +from synapse.types import JsonDict, RetentionPolicy, StrCollection, ThirdPartyInstanceID from synapse.util import json_encoder -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.stringutils import MXC_REGEX if TYPE_CHECKING: @@ -1255,7 +1255,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): return room_servers - @cached() + @cached(max_entries=10000) async def is_partial_state_room(self, room_id: str) -> bool: """Checks if this room has partial state. @@ -1274,6 +1274,27 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): return entry is not None + @cachedList(cached_method_name="is_partial_state_room", list_name="room_ids") + async def is_partial_state_room_batched( + self, room_ids: StrCollection + ) -> Mapping[str, bool]: + """Checks if the given rooms have partial state. + + Returns true for "partial-state" rooms, which means that the state + at events in the room, and `current_state_events`, may not yet be + complete. + """ + + rows: List[Dict[str, str]] = await self.db_pool.simple_select_many_batch( + table="partial_state_rooms", + column="room_id", + iterable=room_ids, + retcols=("room_id",), + desc="is_partial_state_room_batched", + ) + partial_state_rooms = {row_dict["room_id"] for row_dict in rows} + return {room_id: room_id in partial_state_rooms for room_id in room_ids} + async def get_join_event_id_and_device_lists_stream_id_for_partial_state( self, room_id: str ) -> Tuple[str, int]: -- cgit 1.4.1