diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2020-09-30 10:37:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-30 10:37:52 +0100 |
commit | c429dfc300492ec9477b5169754eddb4e40b5ea2 (patch) | |
tree | c0330c26d607362a7ea6e57e22ad93ae1013ea79 /synapse | |
parent | Various clean ups to room stream tokens. (#8423) (diff) | |
parent | changelog (diff) | |
download | synapse-c429dfc300492ec9477b5169754eddb4e40b5ea2.tar.xz |
Merge pull request #8420 from matrix-org/rav/state_res_stats
Report metrics on expensive rooms for state res
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/federation.py | 13 | ||||
-rw-r--r-- | synapse/state/__init__.py | 233 | ||||
-rw-r--r-- | synapse/util/metrics.py | 31 |
3 files changed, 198 insertions, 79 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0073e7c996..1a8144405a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,7 +21,7 @@ import itertools import logging from collections.abc import Container from http import HTTPStatus -from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Tuple, Union import attr from signedjson.key import decode_verify_key_bytes @@ -69,7 +69,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEventsRestServlet, ReplicationStoreRoomOnInviteRestServlet, ) -from synapse.state import StateResolutionStore, resolve_events_with_store +from synapse.state import StateResolutionStore from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( JsonDict, @@ -85,6 +85,9 @@ from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) @@ -116,7 +119,7 @@ class FederationHandler(BaseHandler): rooms. """ - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): super().__init__(hs) self.hs = hs @@ -126,6 +129,7 @@ class FederationHandler(BaseHandler): self.state_store = self.storage.state self.federation_client = hs.get_federation_client() self.state_handler = hs.get_state_handler() + self._state_resolution_handler = hs.get_state_resolution_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() self.action_generator = hs.get_action_generator() @@ -381,8 +385,7 @@ class FederationHandler(BaseHandler): event_map[x.event_id] = x room_version = await self.store.get_room_version_id(room_id) - state_map = await resolve_events_with_store( - self.clock, + state_map = await self._state_resolution_handler.resolve_events_with_store( room_id, room_version, state_maps, diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 5a5ea39e01..31082bb16a 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -13,42 +13,46 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import heapq import logging -from collections import namedtuple +from collections import defaultdict, namedtuple from typing import ( + Any, Awaitable, + Callable, + DefaultDict, Dict, Iterable, List, Optional, Sequence, Set, + Tuple, Union, overload, ) import attr from frozendict import frozendict -from prometheus_client import Histogram +from prometheus_client import Counter, Histogram from typing_extensions import Literal from synapse.api.constants import EventTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.logging.context import ContextResourceUsage from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.roommember import ProfileInfo from synapse.types import Collection, StateMap -from synapse.util import Clock from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) - +metrics_logger = logging.getLogger("synapse.state.metrics") # Metrics for number of state groups involved in a resolution. state_groups_histogram = Histogram( @@ -448,19 +452,44 @@ class StateHandler: state_map = {ev.event_id: ev for st in state_sets for ev in st} - with Measure(self.clock, "state._resolve_events"): - new_state = await resolve_events_with_store( - self.clock, - event.room_id, - room_version, - state_set_ids, - event_map=state_map, - state_res_store=StateResolutionStore(self.store), - ) + new_state = await self._state_resolution_handler.resolve_events_with_store( + event.room_id, + room_version, + state_set_ids, + event_map=state_map, + state_res_store=StateResolutionStore(self.store), + ) return {key: state_map[ev_id] for key, ev_id in new_state.items()} +@attr.s(slots=True) +class _StateResMetrics: + """Keeps track of some usage metrics about state res.""" + + # System and User CPU time, in seconds + cpu_time = attr.ib(type=float, default=0.0) + + # time spent on database transactions (excluding scheduling time). This roughly + # corresponds to the amount of work done on the db server, excluding event fetches. + db_time = attr.ib(type=float, default=0.0) + + # number of events fetched from the db. + db_events = attr.ib(type=int, default=0) + + +_biggest_room_by_cpu_counter = Counter( + "synapse_state_res_cpu_for_biggest_room_seconds", + "CPU time spent performing state resolution for the single most expensive " + "room for state resolution", +) +_biggest_room_by_db_counter = Counter( + "synapse_state_res_db_for_biggest_room_seconds", + "Database time spent performing state resolution for the single most " + "expensive room for state resolution", +) + + class StateResolutionHandler: """Responsible for doing state conflict resolution. @@ -483,6 +512,17 @@ class StateResolutionHandler: reset_expiry_on_get=True, ) + # + # stuff for tracking time spent on state-res by room + # + + # tracks the amount of work done on state res per room + self._state_res_metrics = defaultdict( + _StateResMetrics + ) # type: DefaultDict[str, _StateResMetrics] + + self.clock.looping_call(self._report_metrics, 120 * 1000) + @log_function async def resolve_state_groups( self, @@ -530,15 +570,13 @@ class StateResolutionHandler: state_groups_histogram.observe(len(state_groups_ids)) - with Measure(self.clock, "state._resolve_events"): - new_state = await resolve_events_with_store( - self.clock, - room_id, - room_version, - list(state_groups_ids.values()), - event_map=event_map, - state_res_store=state_res_store, - ) + new_state = await self.resolve_events_with_store( + room_id, + room_version, + list(state_groups_ids.values()), + event_map=event_map, + state_res_store=state_res_store, + ) # if the new state matches any of the input state groups, we can # use that state group again. Otherwise we will generate a state_id @@ -552,6 +590,114 @@ class StateResolutionHandler: return cache + async def resolve_events_with_store( + self, + room_id: str, + room_version: str, + state_sets: Sequence[StateMap[str]], + event_map: Optional[Dict[str, EventBase]], + state_res_store: "StateResolutionStore", + ) -> StateMap[str]: + """ + Args: + room_id: the room we are working in + + room_version: Version of the room + + state_sets: List of dicts of (type, state_key) -> event_id, + which are the different state groups to resolve. + + event_map: + a dict from event_id to event, for any events that we happen to + have in flight (eg, those currently being persisted). This will be + used as a starting point fof finding the state we need; any missing + events will be requested via state_map_factory. + + If None, all events will be fetched via state_res_store. + + state_res_store: a place to fetch events from + + Returns: + a map from (type, state_key) to event_id. + """ + try: + with Measure(self.clock, "state._resolve_events") as m: + v = KNOWN_ROOM_VERSIONS[room_version] + if v.state_res == StateResolutionVersions.V1: + return await v1.resolve_events_with_store( + room_id, state_sets, event_map, state_res_store.get_events + ) + else: + return await v2.resolve_events_with_store( + self.clock, + room_id, + room_version, + state_sets, + event_map, + state_res_store, + ) + finally: + self._record_state_res_metrics(room_id, m.get_resource_usage()) + + def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage): + room_metrics = self._state_res_metrics[room_id] + room_metrics.cpu_time += rusage.ru_utime + rusage.ru_stime + room_metrics.db_time += rusage.db_txn_duration_sec + room_metrics.db_events += rusage.evt_db_fetch_count + + def _report_metrics(self): + if not self._state_res_metrics: + # no state res has happened since the last iteration: don't bother logging. + return + + self._report_biggest( + lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter, + ) + + self._report_biggest( + lambda i: i.db_time, "DB time", _biggest_room_by_db_counter, + ) + + self._state_res_metrics.clear() + + def _report_biggest( + self, + extract_key: Callable[[_StateResMetrics], Any], + metric_name: str, + prometheus_counter_metric: Counter, + ) -> None: + """Report metrics on the biggest rooms for state res + + Args: + extract_key: a callable which, given a _StateResMetrics, extracts a single + metric to sort by. + metric_name: the name of the metric we have extracted, for the log line + prometheus_counter_metric: a prometheus metric recording the sum of the + the extracted metric + """ + n_to_log = 10 + if not metrics_logger.isEnabledFor(logging.DEBUG): + # only need the most expensive if we don't have debug logging, which + # allows nlargest() to degrade to max() + n_to_log = 1 + + items = self._state_res_metrics.items() + + # log the N biggest rooms + biggest = heapq.nlargest( + n_to_log, items, key=lambda i: extract_key(i[1]) + ) # type: List[Tuple[str, _StateResMetrics]] + metrics_logger.debug( + "%i biggest rooms for state-res by %s: %s", + len(biggest), + metric_name, + ["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest], + ) + + # report info on the single biggest to prometheus + _, biggest_metrics = biggest[0] + prometheus_counter_metric.inc(extract_key(biggest_metrics)) + def _make_state_cache_entry( new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]] @@ -605,47 +751,6 @@ def _make_state_cache_entry( ) -def resolve_events_with_store( - clock: Clock, - room_id: str, - room_version: str, - state_sets: Sequence[StateMap[str]], - event_map: Optional[Dict[str, EventBase]], - state_res_store: "StateResolutionStore", -) -> Awaitable[StateMap[str]]: - """ - Args: - room_id: the room we are working in - - room_version: Version of the room - - state_sets: List of dicts of (type, state_key) -> event_id, - which are the different state groups to resolve. - - event_map: - a dict from event_id to event, for any events that we happen to - have in flight (eg, those currently being persisted). This will be - used as a starting point fof finding the state we need; any missing - events will be requested via state_map_factory. - - If None, all events will be fetched via state_res_store. - - state_res_store: a place to fetch events from - - Returns: - a map from (type, state_key) to event_id. - """ - v = KNOWN_ROOM_VERSIONS[room_version] - if v.state_res == StateResolutionVersions.V1: - return v1.resolve_events_with_store( - room_id, state_sets, event_map, state_res_store.get_events - ) - else: - return v2.resolve_events_with_store( - clock, room_id, room_version, state_sets, event_map, state_res_store - ) - - @attr.s(slots=True) class StateResolutionStore: """Interface that allows state resolution algorithms to access the database diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 6e57c1ee72..ffdea0de8d 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -19,7 +19,11 @@ from typing import Any, Callable, Optional, TypeVar, cast from prometheus_client import Counter -from synapse.logging.context import LoggingContext, current_context +from synapse.logging.context import ( + ContextResourceUsage, + LoggingContext, + current_context, +) from synapse.metrics import InFlightGauge logger = logging.getLogger(__name__) @@ -104,27 +108,27 @@ class Measure: def __init__(self, clock, name): self.clock = clock self.name = name - self._logging_context = None + parent_context = current_context() + self._logging_context = LoggingContext( + "Measure[%s]" % (self.name,), parent_context + ) self.start = None - def __enter__(self): - if self._logging_context: + def __enter__(self) -> "Measure": + if self.start is not None: raise RuntimeError("Measure() objects cannot be re-used") self.start = self.clock.time() - parent_context = current_context() - self._logging_context = LoggingContext( - "Measure[%s]" % (self.name,), parent_context - ) self._logging_context.__enter__() in_flight.register((self.name,), self._update_in_flight) + return self def __exit__(self, exc_type, exc_val, exc_tb): - if not self._logging_context: + if self.start is None: raise RuntimeError("Measure() block exited without being entered") duration = self.clock.time() - self.start - usage = self._logging_context.get_resource_usage() + usage = self.get_resource_usage() in_flight.unregister((self.name,), self._update_in_flight) self._logging_context.__exit__(exc_type, exc_val, exc_tb) @@ -140,6 +144,13 @@ class Measure: except ValueError: logger.warning("Failed to save metrics! Usage: %s", usage) + def get_resource_usage(self) -> ContextResourceUsage: + """Get the resources used within this Measure block + + If the Measure block is still active, returns the resource usage so far. + """ + return self._logging_context.get_resource_usage() + def _update_in_flight(self, metrics): """Gets called when processing in flight metrics """ |