diff options
author | Eric Eastwood <erice@element.io> | 2023-05-19 12:26:58 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-19 12:26:58 -0500 |
commit | 703a8f9c67cfe25b956dfdcca654818d52fa7ebd (patch) | |
tree | b2d15c474c2020c0541083cc20e33344fefe1e7f | |
parent | Trace how many new events from the backfill response we need to process (#15633) (diff) | |
download | synapse-703a8f9c67cfe25b956dfdcca654818d52fa7ebd.tar.xz |
Instrument `state` and `state_group` storage related things (tracing) (#15610)
Instrument `state` and `state_group` storage related things (tracing) so it's a little more clear where these database transactions are coming from as there is a lot of wires crossing in these functions. Part of `/messages` performance investigation: https://github.com/matrix-org/synapse/issues/13356
-rw-r--r-- | changelog.d/15610.misc | 1 | ||||
-rw-r--r-- | synapse/events/snapshot.py | 5 | ||||
-rw-r--r-- | synapse/state/__init__.py | 4 | ||||
-rw-r--r-- | synapse/storage/controllers/state.py | 33 | ||||
-rw-r--r-- | synapse/storage/databases/state/bg_updates.py | 5 | ||||
-rw-r--r-- | synapse/storage/databases/state/store.py | 15 |
6 files changed, 63 insertions, 0 deletions
diff --git a/changelog.d/15610.misc b/changelog.d/15610.misc new file mode 100644 index 0000000000..2eff30f6e3 --- /dev/null +++ b/changelog.d/15610.misc @@ -0,0 +1 @@ +Instrument `state` and `state_group` storage-related operations to better picture what's happening when tracing. diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 9b4d692cf4..e7e8225b8e 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -19,6 +19,7 @@ from immutabledict import immutabledict from synapse.appservice import ApplicationService from synapse.events import EventBase +from synapse.logging.opentracing import tag_args, trace from synapse.types import JsonDict, StateMap if TYPE_CHECKING: @@ -242,6 +243,8 @@ class EventContext(UnpersistedEventContextBase): return self._state_group + @trace + @tag_args async def get_current_state_ids( self, state_filter: Optional["StateFilter"] = None ) -> Optional[StateMap[str]]: @@ -275,6 +278,8 @@ class EventContext(UnpersistedEventContextBase): return prev_state_ids + @trace + @tag_args async def get_prev_state_ids( self, state_filter: Optional["StateFilter"] = None ) -> StateMap[str]: diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 6031095249..9bc0c3b7b9 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -45,6 +45,7 @@ from synapse.events.snapshot import ( UnpersistedEventContextBase, ) from synapse.logging.context import ContextResourceUsage +from synapse.logging.opentracing import tag_args, trace from synapse.replication.http.state import ReplicationUpdateCurrentStateRestServlet from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour @@ -270,6 +271,8 @@ class StateHandler: state = await entry.get_state(self._state_storage_controller, StateFilter.all()) return await self.store.get_joined_hosts(room_id, state, entry) + @trace + @tag_args async def calculate_context_info( self, event: EventBase, @@ -465,6 +468,7 @@ class StateHandler: return await unpersisted_context.persist(event) + @trace @measure_func() async def resolve_state_groups_for_events( self, room_id: str, event_ids: Collection[str], await_full_state: bool = True diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 9d7a8a792f..06a80869eb 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -67,6 +67,8 @@ class StateStorageController: """ self._partial_state_room_tracker.notify_un_partial_stated(room_id) + @trace + @tag_args async def get_state_group_delta( self, state_group: int ) -> Tuple[Optional[int], Optional[StateMap[str]]]: @@ -84,6 +86,8 @@ class StateStorageController: state_group_delta = await self.stores.state.get_state_group_delta(state_group) return state_group_delta.prev_group, state_group_delta.delta_ids + @trace + @tag_args async def get_state_groups_ids( self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True ) -> Dict[int, MutableStateMap[str]]: @@ -114,6 +118,8 @@ class StateStorageController: return group_to_state + @trace + @tag_args async def get_state_ids_for_group( self, state_group: int, state_filter: Optional[StateFilter] = None ) -> StateMap[str]: @@ -130,6 +136,8 @@ class StateStorageController: return group_to_state[state_group] + @trace + @tag_args async def get_state_groups( self, room_id: str, event_ids: Collection[str] ) -> Dict[int, List[EventBase]]: @@ -165,6 +173,8 @@ class StateStorageController: for group, event_id_map in group_to_ids.items() } + @trace + @tag_args def _get_state_groups_from_groups( self, groups: List[int], state_filter: StateFilter ) -> Awaitable[Dict[int, StateMap[str]]]: @@ -183,6 +193,7 @@ class StateStorageController: return self.stores.state._get_state_groups_from_groups(groups, state_filter) @trace + @tag_args async def get_state_for_events( self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None ) -> Dict[str, StateMap[EventBase]]: @@ -280,6 +291,8 @@ class StateStorageController: return {event: event_to_state[event] for event in event_ids} + @trace + @tag_args async def get_state_for_event( self, event_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[EventBase]: @@ -303,6 +316,7 @@ class StateStorageController: return state_map[event_id] @trace + @tag_args async def get_state_ids_for_event( self, event_id: str, @@ -333,6 +347,8 @@ class StateStorageController: ) return state_map[event_id] + @trace + @tag_args def get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None ) -> Awaitable[Dict[int, MutableStateMap[str]]]: @@ -402,6 +418,8 @@ class StateStorageController: event_id, room_id, prev_group, delta_ids, current_state_ids ) + @trace + @tag_args @cancellable async def get_current_state_ids( self, @@ -442,6 +460,8 @@ class StateStorageController: room_id, on_invalidate=on_invalidate ) + @trace + @tag_args async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]: """Get canonical alias for room, if any @@ -466,6 +486,8 @@ class StateStorageController: return event.content.get("canonical_alias") + @trace + @tag_args async def get_current_state_deltas( self, prev_stream_id: int, max_stream_id: int ) -> Tuple[int, List[Dict[str, Any]]]: @@ -500,6 +522,7 @@ class StateStorageController: ) @trace + @tag_args async def get_current_state( self, room_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[EventBase]: @@ -516,6 +539,8 @@ class StateStorageController: return state_map + @trace + @tag_args async def get_current_state_event( self, room_id: str, event_type: str, state_key: str ) -> Optional[EventBase]: @@ -527,6 +552,8 @@ class StateStorageController: ) return state_map.get(key) + @trace + @tag_args async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]: """Get current hosts in room based on current state. @@ -538,6 +565,8 @@ class StateStorageController: return await self.stores.main.get_current_hosts_in_room(room_id) + @trace + @tag_args async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]: """Get current hosts in room based on current state. @@ -553,6 +582,8 @@ class StateStorageController: return await self.stores.main.get_current_hosts_in_room_ordered(room_id) + @trace + @tag_args async def get_current_hosts_in_room_or_partial_state_approximation( self, room_id: str ) -> Collection[str]: @@ -582,6 +613,8 @@ class StateStorageController: return hosts + @trace + @tag_args async def get_users_in_room_with_profiles( self, room_id: str ) -> Mapping[str, ProfileInfo]: diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 097dea5182..86eb1a8a08 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -15,6 +15,7 @@ import logging from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union +from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -40,6 +41,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): updates. """ + @trace + @tag_args def _count_state_group_hops_txn( self, txn: LoggingTransaction, state_group: int ) -> int: @@ -83,6 +86,8 @@ class StateGroupBackgroundUpdateStore(SQLBaseStore): return count + @trace + @tag_args def _get_state_groups_from_groups_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index 29ff64e876..6984d11352 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -20,6 +20,7 @@ import attr from synapse.api.constants import EventTypes from synapse.events import EventBase from synapse.events.snapshot import UnpersistedEventContext, UnpersistedEventContextBase +from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -159,6 +160,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): "get_state_group_delta", _get_state_group_delta_txn ) + @trace + @tag_args @cancellable async def _get_state_groups_from_groups( self, groups: List[int], state_filter: StateFilter @@ -187,6 +190,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return results + @trace + @tag_args def _get_state_for_group_using_cache( self, cache: DictionaryCache[int, StateKey, str], @@ -239,6 +244,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state_filter.filter_state(state_dict_ids), not missing_types + @trace + @tag_args @cancellable async def _get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None @@ -305,6 +312,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): return state + @trace + @tag_args def _get_state_for_groups_using_cache( self, groups: Iterable[int], @@ -403,6 +412,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): fetched_keys=non_member_types, ) + @trace + @tag_args async def store_state_deltas_for_batched( self, events_and_context: List[Tuple[EventBase, UnpersistedEventContextBase]], @@ -520,6 +531,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): prev_group, ) + @trace + @tag_args async def store_state_group( self, event_id: str, @@ -772,6 +785,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): ((sg,) for sg in state_groups_to_delete), ) + @trace + @tag_args async def get_previous_state_groups( self, state_groups: Iterable[int] ) -> Dict[int, int]: |