From 578c5c736e4ca2479c8b72b5e9ac20cd7acde0e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jan 2024 14:31:35 +0000 Subject: Reduce amount of state pulled out when querying federation hierachy (#16785) There are two changes here: 1. Only pull out the required state when handling the request. 2. Change the get filtered state return type to check that we're only querying state that was requested --------- Co-authored-by: reivilibre --- synapse/storage/databases/main/state.py | 48 +++++++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) (limited to 'synapse/storage/databases/main/state.py') diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 4700e74ad2..06c44bb563 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -30,7 +30,10 @@ from typing import ( Optional, Set, Tuple, + TypeVar, + Union, cast, + overload, ) import attr @@ -52,7 +55,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore -from synapse.types import JsonDict, JsonMapping, StateMap +from synapse.types import JsonDict, JsonMapping, StateKey, StateMap from synapse.types.state import StateFilter from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedList @@ -64,6 +67,8 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +_T = TypeVar("_T") + MAX_STATE_DELTA_HOPS = 100 @@ -349,7 +354,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): def _get_filtered_current_state_ids_txn( txn: LoggingTransaction, ) -> StateMap[str]: - results = {} + results = StateMapWrapper(state_filter=state_filter or StateFilter.all()) + sql = """ SELECT type, state_key, event_id FROM current_state_events WHERE room_id = ? @@ -726,3 +732,41 @@ class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore): hs: "HomeServer", ): super().__init__(database, db_conn, hs) + + +@attr.s(auto_attribs=True, slots=True) +class StateMapWrapper(Dict[StateKey, str]): + """A wrapper around a StateMap[str] to ensure that we only query for items + that were not filtered out. + + This is to help prevent bugs where we filter out state but other bits of the + code expect the state to be there. + """ + + state_filter: StateFilter + + def __getitem__(self, key: StateKey) -> str: + if key not in self.state_filter: + raise Exception("State map was filtered and doesn't include: %s", key) + return super().__getitem__(key) + + @overload + def get(self, key: Tuple[str, str]) -> Optional[str]: + ... + + @overload + def get(self, key: Tuple[str, str], default: Union[str, _T]) -> Union[str, _T]: + ... + + def get( + self, key: StateKey, default: Union[str, _T, None] = None + ) -> Union[str, _T, None]: + if key not in self.state_filter: + raise Exception("State map was filtered and doesn't include: %s", key) + return super().get(key, default) + + def __contains__(self, key: Any) -> bool: + if key not in self.state_filter: + raise Exception("State map was filtered and doesn't include: %s", key) + + return super().__contains__(key) -- cgit 1.5.1 From cbe8a80d108068c585f76bc18e14c61371644f84 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jan 2024 15:11:59 +0000 Subject: Faster load recents for sync (#16783) This hopefully reduces the amount of state we need to keep in memory --- changelog.d/16783.misc | 1 + synapse/handlers/sync.py | 14 ++++++++------ synapse/storage/databases/main/state.py | 17 ++++++++++++++++- 3 files changed, 25 insertions(+), 7 deletions(-) create mode 100644 changelog.d/16783.misc (limited to 'synapse/storage/databases/main/state.py') diff --git a/changelog.d/16783.misc b/changelog.d/16783.misc new file mode 100644 index 0000000000..9d3b96ffc6 --- /dev/null +++ b/changelog.d/16783.misc @@ -0,0 +1 @@ +Faster load recents for sync by reducing amount of state pulled out. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0385c04bc2..2e10035772 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -583,10 +583,11 @@ class SyncHandler: # `recents`, so partial state is only a problem when a membership # event turns up in `recents` but has not made it into the current # state. - current_state_ids_map = ( - await self.store.get_partial_current_state_ids(room_id) + current_state_ids = ( + await self.store.check_if_events_in_current_state( + {e.event_id for e in recents if e.is_state()} + ) ) - current_state_ids = frozenset(current_state_ids_map.values()) recents = await filter_events_for_client( self._storage_controllers, @@ -667,10 +668,11 @@ class SyncHandler: # `loaded_recents`, so partial state is only a problem when a # membership event turns up in `loaded_recents` but has not made it # into the current state. - current_state_ids_map = ( - await self.store.get_partial_current_state_ids(room_id) + current_state_ids = ( + await self.store.check_if_events_in_current_state( + {e.event_id for e in loaded_recents if e.is_state()} + ) ) - current_state_ids = frozenset(current_state_ids_map.values()) loaded_recents = await filter_events_for_client( self._storage_controllers, diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 06c44bb563..8006046453 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -24,6 +24,7 @@ from typing import ( Any, Collection, Dict, + FrozenSet, Iterable, List, Mapping, @@ -55,7 +56,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore -from synapse.types import JsonDict, JsonMapping, StateKey, StateMap +from synapse.types import JsonDict, JsonMapping, StateKey, StateMap, StrCollection from synapse.types.state import StateFilter from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedList @@ -323,6 +324,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): "get_partial_current_state_ids", _get_current_state_ids_txn ) + async def check_if_events_in_current_state( + self, event_ids: StrCollection + ) -> FrozenSet[str]: + """Checks and returns which of the given events is part of the current state.""" + rows = await self.db_pool.simple_select_many_batch( + table="current_state_events", + column="event_id", + iterable=event_ids, + retcols=("event_id",), + desc="check_if_events_in_current_state", + ) + + return frozenset(event_id for event_id, in rows) + # FIXME: how should this be cached? @cancellable async def get_partial_filtered_current_state_ids( -- cgit 1.5.1