From aec294ee0d0f2fa4ccef57085d670b8939de3669 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 14 Sep 2020 12:50:06 -0400 Subject: Use slots in attrs classes where possible (#8296) slots use less memory (and attribute access is faster) while slightly limiting the flexibility of the class attributes. This focuses on objects which are instantiated "often" and for short periods of time. --- synapse/state/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/state/__init__.py') diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index c7e3015b5d..56d6afb863 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -678,7 +678,7 @@ def resolve_events_with_store( ) -@attr.s +@attr.s(slots=True) class StateResolutionStore: """Interface that allows state resolution algorithms to access the database in well defined way. -- cgit 1.5.1 From 91c60f304256c08e8aff53ed13d5b282057277d6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 23 Sep 2020 16:42:44 +0100 Subject: Improve logging of state resolution (#8371) I'd like to get a better insight into what we are doing with respect to state res. The list of state groups we are resolving across should be short (if it isn't, that's a massive problem in itself), so it should be fine to log it in ite entiretly. I've done some grepping and found approximately zero cases in which the "shortcut" code delivered the result, so I've ripped that out too. --- changelog.d/8371.misc | 1 + synapse/state/__init__.py | 64 ++++++++++++----------------------------------- 2 files changed, 17 insertions(+), 48 deletions(-) create mode 100644 changelog.d/8371.misc (limited to 'synapse/state/__init__.py') diff --git a/changelog.d/8371.misc b/changelog.d/8371.misc new file mode 100644 index 0000000000..6a54a9496a --- /dev/null +++ b/changelog.d/8371.misc @@ -0,0 +1 @@ +Improve logging of state resolution. diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 56d6afb863..5a5ea39e01 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -25,7 +25,6 @@ from typing import ( Sequence, Set, Union, - cast, overload, ) @@ -42,7 +41,7 @@ from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.roommember import ProfileInfo -from synapse.types import Collection, MutableStateMap, StateMap +from synapse.types import Collection, StateMap from synapse.util import Clock from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache @@ -472,10 +471,9 @@ class StateResolutionHandler: def __init__(self, hs): self.clock = hs.get_clock() - # dict of set of event_ids -> _StateCacheEntry. - self._state_cache = None self.resolve_linearizer = Linearizer(name="state_resolve_lock") + # dict of set of event_ids -> _StateCacheEntry. self._state_cache = ExpiringCache( cache_name="state_cache", clock=self.clock, @@ -519,57 +517,28 @@ class StateResolutionHandler: Returns: The resolved state """ - logger.debug("resolve_state_groups state_groups %s", state_groups_ids.keys()) - group_names = frozenset(state_groups_ids.keys()) with (await self.resolve_linearizer.queue(group_names)): - if self._state_cache is not None: - cache = self._state_cache.get(group_names, None) - if cache: - return cache + cache = self._state_cache.get(group_names, None) + if cache: + return cache logger.info( - "Resolving state for %s with %d groups", room_id, len(state_groups_ids) + "Resolving state for %s with groups %s", room_id, list(group_names), ) state_groups_histogram.observe(len(state_groups_ids)) - # start by assuming we won't have any conflicted state, and build up the new - # state map by iterating through the state groups. If we discover a conflict, - # we give up and instead use `resolve_events_with_store`. - # - # XXX: is this actually worthwhile, or should we just let - # resolve_events_with_store do it? - new_state = {} # type: MutableStateMap[str] - conflicted_state = False - for st in state_groups_ids.values(): - for key, e_id in st.items(): - if key in new_state: - conflicted_state = True - break - new_state[key] = e_id - if conflicted_state: - break - - if conflicted_state: - logger.info("Resolving conflicted state for %r", room_id) - with Measure(self.clock, "state._resolve_events"): - # resolve_events_with_store returns a StateMap, but we can - # treat it as a MutableStateMap as it is above. It isn't - # actually mutated anymore (and is frozen in - # _make_state_cache_entry below). - new_state = cast( - MutableStateMap, - await resolve_events_with_store( - self.clock, - room_id, - room_version, - list(state_groups_ids.values()), - event_map=event_map, - state_res_store=state_res_store, - ), - ) + with Measure(self.clock, "state._resolve_events"): + new_state = await resolve_events_with_store( + self.clock, + room_id, + room_version, + list(state_groups_ids.values()), + event_map=event_map, + state_res_store=state_res_store, + ) # if the new state matches any of the input state groups, we can # use that state group again. Otherwise we will generate a state_id @@ -579,8 +548,7 @@ class StateResolutionHandler: with Measure(self.clock, "state.create_group_ids"): cache = _make_state_cache_entry(new_state, state_groups_ids) - if self._state_cache is not None: - self._state_cache[group_names] = cache + self._state_cache[group_names] = cache return cache -- cgit 1.5.1 From 937393abd81e16c7d4bd4d02fe3c0fafafb9611b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Mon, 28 Sep 2020 15:20:02 +0100 Subject: Move `resolve_events_with_store` into StateResolutionHandler --- synapse/handlers/federation.py | 13 +++--- synapse/state/__init__.py | 92 +++++++++++++++++++++--------------------- 2 files changed, 55 insertions(+), 50 deletions(-) (limited to 'synapse/state/__init__.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0073e7c996..1a8144405a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,7 +21,7 @@ import itertools import logging from collections.abc import Container from http import HTTPStatus -from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Tuple, Union import attr from signedjson.key import decode_verify_key_bytes @@ -69,7 +69,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ReplicationStoreRoomOnInviteRestServlet, ) -from synapse.state import StateResolutionStore, resolve_events_with_store +from synapse.state import StateResolutionStore from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( JsonDict, @@ -85,6 +85,9 @@ from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -116,7 +119,7 @@ class FederationHandler(BaseHandler): rooms. """ - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.hs = hs @@ -126,6 +129,7 @@ class FederationHandler(BaseHandler): self.state_store = self.storage.state self.federation_client = hs.get_federation_client() self.state_handler = hs.get_state_handler() + self._state_resolution_handler = hs.get_state_resolution_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() self.action_generator = hs.get_action_generator() @@ -381,8 +385,7 @@ class FederationHandler(BaseHandler): event_map[x.event_id] = x room_version = await self.store.get_room_version_id(room_id) - state_map = await resolve_events_with_store( - self.clock, + state_map = await self._state_resolution_handler.resolve_events_with_store( room_id, room_version, state_maps, diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 5a5ea39e01..98ede2ea4f 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -449,8 +449,7 @@ class StateHandler: state_map = {ev.event_id: ev for st in state_sets for ev in st} with Measure(self.clock, "state._resolve_events"): - new_state = await resolve_events_with_store( - self.clock, + new_state = await self._state_resolution_handler.resolve_events_with_store( event.room_id, room_version, state_set_ids, @@ -531,8 +530,7 @@ class StateResolutionHandler: state_groups_histogram.observe(len(state_groups_ids)) with Measure(self.clock, "state._resolve_events"): - new_state = await resolve_events_with_store( - self.clock, + new_state = await self.resolve_events_with_store( room_id, room_version, list(state_groups_ids.values()), @@ -552,6 +550,51 @@ class StateResolutionHandler: return cache + def resolve_events_with_store( + self, + room_id: str, + room_version: str, + state_sets: Sequence[StateMap[str]], + event_map: Optional[Dict[str, EventBase]], + state_res_store: "StateResolutionStore", + ) -> Awaitable[StateMap[str]]: + """ + Args: + room_id: the room we are working in + + room_version: Version of the room + + state_sets: List of dicts of (type, state_key) -> event_id, + which are the different state groups to resolve. + + event_map: + a dict from event_id to event, for any events that we happen to + have in flight (eg, those currently being persisted). This will be + used as a starting point fof finding the state we need; any missing + events will be requested via state_map_factory. + + If None, all events will be fetched via state_res_store. + + state_res_store: a place to fetch events from + + Returns: + a map from (type, state_key) to event_id. + """ + v = KNOWN_ROOM_VERSIONS[room_version] + if v.state_res == StateResolutionVersions.V1: + return v1.resolve_events_with_store( + room_id, state_sets, event_map, state_res_store.get_events + ) + else: + return v2.resolve_events_with_store( + self.clock, + room_id, + room_version, + state_sets, + event_map, + state_res_store, + ) + def _make_state_cache_entry( new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]] @@ -605,47 +648,6 @@ def _make_state_cache_entry( ) -def resolve_events_with_store( - clock: Clock, - room_id: str, - room_version: str, - state_sets: Sequence[StateMap[str]], - event_map: Optional[Dict[str, EventBase]], - state_res_store: "StateResolutionStore", -) -> Awaitable[StateMap[str]]: - """ - Args: - room_id: the room we are working in - - room_version: Version of the room - - state_sets: List of dicts of (type, state_key) -> event_id, - which are the different state groups to resolve. - - event_map: - a dict from event_id to event, for any events that we happen to - have in flight (eg, those currently being persisted). This will be - used as a starting point fof finding the state we need; any missing - events will be requested via state_map_factory. - - If None, all events will be fetched via state_res_store. - - state_res_store: a place to fetch events from - - Returns: - a map from (type, state_key) to event_id. - """ - v = KNOWN_ROOM_VERSIONS[room_version] - if v.state_res == StateResolutionVersions.V1: - return v1.resolve_events_with_store( - room_id, state_sets, event_map, state_res_store.get_events - ) - else: - return v2.resolve_events_with_store( - clock, room_id, room_version, state_sets, event_map, state_res_store - ) - - @attr.s(slots=True) class StateResolutionStore: """Interface that allows state resolution algorithms to access the database -- cgit 1.5.1 From 8412c08a87d35fc127f53063c8ede215237a042a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 29 Sep 2020 13:07:09 +0100 Subject: Move Measure calls into `resolve_events_with_store` --- synapse/state/__init__.py | 63 +++++++++++++++++++++++------------------------ 1 file changed, 31 insertions(+), 32 deletions(-) (limited to 'synapse/state/__init__.py') diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 98ede2ea4f..b99cf2d8cd 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -448,14 +448,13 @@ class StateHandler: state_map = {ev.event_id: ev for st in state_sets for ev in st} - with Measure(self.clock, "state._resolve_events"): - new_state = await self._state_resolution_handler.resolve_events_with_store( - event.room_id, - room_version, - state_set_ids, - event_map=state_map, - state_res_store=StateResolutionStore(self.store), - ) + new_state = await self._state_resolution_handler.resolve_events_with_store( + event.room_id, + room_version, + state_set_ids, + event_map=state_map, + state_res_store=StateResolutionStore(self.store), + ) return {key: state_map[ev_id] for key, ev_id in new_state.items()} @@ -529,14 +528,13 @@ class StateResolutionHandler: state_groups_histogram.observe(len(state_groups_ids)) - with Measure(self.clock, "state._resolve_events"): - new_state = await self.resolve_events_with_store( - room_id, - room_version, - list(state_groups_ids.values()), - event_map=event_map, - state_res_store=state_res_store, - ) + new_state = await self.resolve_events_with_store( + room_id, + room_version, + list(state_groups_ids.values()), + event_map=event_map, + state_res_store=state_res_store, + ) # if the new state matches any of the input state groups, we can # use that state group again. Otherwise we will generate a state_id @@ -550,14 +548,14 @@ class StateResolutionHandler: return cache - def resolve_events_with_store( + async def resolve_events_with_store( self, room_id: str, room_version: str, state_sets: Sequence[StateMap[str]], event_map: Optional[Dict[str, EventBase]], state_res_store: "StateResolutionStore", - ) -> Awaitable[StateMap[str]]: + ) -> StateMap[str]: """ Args: room_id: the room we are working in @@ -580,20 +578,21 @@ class StateResolutionHandler: Returns: a map from (type, state_key) to event_id. """ - v = KNOWN_ROOM_VERSIONS[room_version] - if v.state_res == StateResolutionVersions.V1: - return v1.resolve_events_with_store( - room_id, state_sets, event_map, state_res_store.get_events - ) - else: - return v2.resolve_events_with_store( - self.clock, - room_id, - room_version, - state_sets, - event_map, - state_res_store, - ) + with Measure(self.clock, "state._resolve_events"): + v = KNOWN_ROOM_VERSIONS[room_version] + if v.state_res == StateResolutionVersions.V1: + return await v1.resolve_events_with_store( + room_id, state_sets, event_map, state_res_store.get_events + ) + else: + return await v2.resolve_events_with_store( + self.clock, + room_id, + room_version, + state_sets, + event_map, + state_res_store, + ) def _make_state_cache_entry( -- cgit 1.5.1 From 057f04fa9fb5134621dff19c758b38fe253ff8a8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 29 Sep 2020 13:07:45 +0100 Subject: Report state res metrics to Prometheus and log --- synapse/state/__init__.py | 144 +++++++++++++++++++++++++++++++++++++++------- 1 file changed, 124 insertions(+), 20 deletions(-) (limited to 'synapse/state/__init__.py') diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index b99cf2d8cd..31082bb16a 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -13,42 +13,46 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import heapq import logging -from collections import namedtuple +from collections import defaultdict, namedtuple from typing import ( + Any, Awaitable, + Callable, + DefaultDict, Dict, Iterable, List, Optional, Sequence, Set, + Tuple, Union, overload, ) import attr from frozendict import frozendict -from prometheus_client import Histogram +from prometheus_client import Counter, Histogram from typing_extensions import Literal from synapse.api.constants import EventTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.logging.context import ContextResourceUsage from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.roommember import ProfileInfo from synapse.types import Collection, StateMap -from synapse.util import Clock from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) - +metrics_logger = logging.getLogger("synapse.state.metrics") # Metrics for number of state groups involved in a resolution. state_groups_histogram = Histogram( @@ -459,6 +463,33 @@ class StateHandler: return {key: state_map[ev_id] for key, ev_id in new_state.items()} +@attr.s(slots=True) +class _StateResMetrics: + """Keeps track of some usage metrics about state res.""" + + # System and User CPU time, in seconds + cpu_time = attr.ib(type=float, default=0.0) + + # time spent on database transactions (excluding scheduling time). This roughly + # corresponds to the amount of work done on the db server, excluding event fetches. + db_time = attr.ib(type=float, default=0.0) + + # number of events fetched from the db. + db_events = attr.ib(type=int, default=0) + + +_biggest_room_by_cpu_counter = Counter( + "synapse_state_res_cpu_for_biggest_room_seconds", + "CPU time spent performing state resolution for the single most expensive " + "room for state resolution", +) +_biggest_room_by_db_counter = Counter( + "synapse_state_res_db_for_biggest_room_seconds", + "Database time spent performing state resolution for the single most " + "expensive room for state resolution", +) + + class StateResolutionHandler: """Responsible for doing state conflict resolution. @@ -481,6 +512,17 @@ class StateResolutionHandler: reset_expiry_on_get=True, ) + # + # stuff for tracking time spent on state-res by room + # + + # tracks the amount of work done on state res per room + self._state_res_metrics = defaultdict( + _StateResMetrics + ) # type: DefaultDict[str, _StateResMetrics] + + self.clock.looping_call(self._report_metrics, 120 * 1000) + @log_function async def resolve_state_groups( self, @@ -578,21 +620,83 @@ class StateResolutionHandler: Returns: a map from (type, state_key) to event_id. """ - with Measure(self.clock, "state._resolve_events"): - v = KNOWN_ROOM_VERSIONS[room_version] - if v.state_res == StateResolutionVersions.V1: - return await v1.resolve_events_with_store( - room_id, state_sets, event_map, state_res_store.get_events - ) - else: - return await v2.resolve_events_with_store( - self.clock, - room_id, - room_version, - state_sets, - event_map, - state_res_store, - ) + try: + with Measure(self.clock, "state._resolve_events") as m: + v = KNOWN_ROOM_VERSIONS[room_version] + if v.state_res == StateResolutionVersions.V1: + return await v1.resolve_events_with_store( + room_id, state_sets, event_map, state_res_store.get_events + ) + else: + return await v2.resolve_events_with_store( + self.clock, + room_id, + room_version, + state_sets, + event_map, + state_res_store, + ) + finally: + self._record_state_res_metrics(room_id, m.get_resource_usage()) + + def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage): + room_metrics = self._state_res_metrics[room_id] + room_metrics.cpu_time += rusage.ru_utime + rusage.ru_stime + room_metrics.db_time += rusage.db_txn_duration_sec + room_metrics.db_events += rusage.evt_db_fetch_count + + def _report_metrics(self): + if not self._state_res_metrics: + # no state res has happened since the last iteration: don't bother logging. + return + + self._report_biggest( + lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter, + ) + + self._report_biggest( + lambda i: i.db_time, "DB time", _biggest_room_by_db_counter, + ) + + self._state_res_metrics.clear() + + def _report_biggest( + self, + extract_key: Callable[[_StateResMetrics], Any], + metric_name: str, + prometheus_counter_metric: Counter, + ) -> None: + """Report metrics on the biggest rooms for state res + + Args: + extract_key: a callable which, given a _StateResMetrics, extracts a single + metric to sort by. + metric_name: the name of the metric we have extracted, for the log line + prometheus_counter_metric: a prometheus metric recording the sum of the + the extracted metric + """ + n_to_log = 10 + if not metrics_logger.isEnabledFor(logging.DEBUG): + # only need the most expensive if we don't have debug logging, which + # allows nlargest() to degrade to max() + n_to_log = 1 + + items = self._state_res_metrics.items() + + # log the N biggest rooms + biggest = heapq.nlargest( + n_to_log, items, key=lambda i: extract_key(i[1]) + ) # type: List[Tuple[str, _StateResMetrics]] + metrics_logger.debug( + "%i biggest rooms for state-res by %s: %s", + len(biggest), + metric_name, + ["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest], + ) + + # report info on the single biggest to prometheus + _, biggest_metrics = biggest[0] + prometheus_counter_metric.inc(extract_key(biggest_metrics)) def _make_state_cache_entry( -- cgit 1.5.1