diff options
Diffstat (limited to 'synapse/state')
-rw-r--r-- | synapse/state/__init__.py | 233 |
1 files changed, 169 insertions, 64 deletions
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 5a5ea39e01..31082bb16a 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -13,42 +13,46 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import heapq import logging -from collections import namedtuple +from collections import defaultdict, namedtuple from typing import ( + Any, Awaitable, + Callable, + DefaultDict, Dict, Iterable, List, Optional, Sequence, Set, + Tuple, Union, overload, ) import attr from frozendict import frozendict -from prometheus_client import Histogram +from prometheus_client import Counter, Histogram from typing_extensions import Literal from synapse.api.constants import EventTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.logging.context import ContextResourceUsage from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.roommember import ProfileInfo from synapse.types import Collection, StateMap -from synapse.util import Clock from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) - +metrics_logger = logging.getLogger("synapse.state.metrics") # Metrics for number of state groups involved in a resolution. state_groups_histogram = Histogram( @@ -448,19 +452,44 @@ class StateHandler: state_map = {ev.event_id: ev for st in state_sets for ev in st} - with Measure(self.clock, "state._resolve_events"): - new_state = await resolve_events_with_store( - self.clock, - event.room_id, - room_version, - state_set_ids, - event_map=state_map, - state_res_store=StateResolutionStore(self.store), - ) + new_state = await self._state_resolution_handler.resolve_events_with_store( + event.room_id, + room_version, + state_set_ids, + event_map=state_map, + state_res_store=StateResolutionStore(self.store), + ) return {key: state_map[ev_id] for key, ev_id in new_state.items()} +@attr.s(slots=True) +class _StateResMetrics: + """Keeps track of some usage metrics about state res.""" + + # System and User CPU time, in seconds + cpu_time = attr.ib(type=float, default=0.0) + + # time spent on database transactions (excluding scheduling time). This roughly + # corresponds to the amount of work done on the db server, excluding event fetches. + db_time = attr.ib(type=float, default=0.0) + + # number of events fetched from the db. + db_events = attr.ib(type=int, default=0) + + +_biggest_room_by_cpu_counter = Counter( + "synapse_state_res_cpu_for_biggest_room_seconds", + "CPU time spent performing state resolution for the single most expensive " + "room for state resolution", +) +_biggest_room_by_db_counter = Counter( + "synapse_state_res_db_for_biggest_room_seconds", + "Database time spent performing state resolution for the single most " + "expensive room for state resolution", +) + + class StateResolutionHandler: """Responsible for doing state conflict resolution. @@ -483,6 +512,17 @@ class StateResolutionHandler: reset_expiry_on_get=True, ) + # + # stuff for tracking time spent on state-res by room + # + + # tracks the amount of work done on state res per room + self._state_res_metrics = defaultdict( + _StateResMetrics + ) # type: DefaultDict[str, _StateResMetrics] + + self.clock.looping_call(self._report_metrics, 120 * 1000) + @log_function async def resolve_state_groups( self, @@ -530,15 +570,13 @@ class StateResolutionHandler: state_groups_histogram.observe(len(state_groups_ids)) - with Measure(self.clock, "state._resolve_events"): - new_state = await resolve_events_with_store( - self.clock, - room_id, - room_version, - list(state_groups_ids.values()), - event_map=event_map, - state_res_store=state_res_store, - ) + new_state = await self.resolve_events_with_store( + room_id, + room_version, + list(state_groups_ids.values()), + event_map=event_map, + state_res_store=state_res_store, + ) # if the new state matches any of the input state groups, we can # use that state group again. Otherwise we will generate a state_id @@ -552,6 +590,114 @@ class StateResolutionHandler: return cache + async def resolve_events_with_store( + self, + room_id: str, + room_version: str, + state_sets: Sequence[StateMap[str]], + event_map: Optional[Dict[str, EventBase]], + state_res_store: "StateResolutionStore", + ) -> StateMap[str]: + """ + Args: + room_id: the room we are working in + + room_version: Version of the room + + state_sets: List of dicts of (type, state_key) -> event_id, + which are the different state groups to resolve. + + event_map: + a dict from event_id to event, for any events that we happen to + have in flight (eg, those currently being persisted). This will be + used as a starting point fof finding the state we need; any missing + events will be requested via state_map_factory. + + If None, all events will be fetched via state_res_store. + + state_res_store: a place to fetch events from + + Returns: + a map from (type, state_key) to event_id. + """ + try: + with Measure(self.clock, "state._resolve_events") as m: + v = KNOWN_ROOM_VERSIONS[room_version] + if v.state_res == StateResolutionVersions.V1: + return await v1.resolve_events_with_store( + room_id, state_sets, event_map, state_res_store.get_events + ) + else: + return await v2.resolve_events_with_store( + self.clock, + room_id, + room_version, + state_sets, + event_map, + state_res_store, + ) + finally: + self._record_state_res_metrics(room_id, m.get_resource_usage()) + + def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage): + room_metrics = self._state_res_metrics[room_id] + room_metrics.cpu_time += rusage.ru_utime + rusage.ru_stime + room_metrics.db_time += rusage.db_txn_duration_sec + room_metrics.db_events += rusage.evt_db_fetch_count + + def _report_metrics(self): + if not self._state_res_metrics: + # no state res has happened since the last iteration: don't bother logging. + return + + self._report_biggest( + lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter, + ) + + self._report_biggest( + lambda i: i.db_time, "DB time", _biggest_room_by_db_counter, + ) + + self._state_res_metrics.clear() + + def _report_biggest( + self, + extract_key: Callable[[_StateResMetrics], Any], + metric_name: str, + prometheus_counter_metric: Counter, + ) -> None: + """Report metrics on the biggest rooms for state res + + Args: + extract_key: a callable which, given a _StateResMetrics, extracts a single + metric to sort by. + metric_name: the name of the metric we have extracted, for the log line + prometheus_counter_metric: a prometheus metric recording the sum of the + the extracted metric + """ + n_to_log = 10 + if not metrics_logger.isEnabledFor(logging.DEBUG): + # only need the most expensive if we don't have debug logging, which + # allows nlargest() to degrade to max() + n_to_log = 1 + + items = self._state_res_metrics.items() + + # log the N biggest rooms + biggest = heapq.nlargest( + n_to_log, items, key=lambda i: extract_key(i[1]) + ) # type: List[Tuple[str, _StateResMetrics]] + metrics_logger.debug( + "%i biggest rooms for state-res by %s: %s", + len(biggest), + metric_name, + ["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest], + ) + + # report info on the single biggest to prometheus + _, biggest_metrics = biggest[0] + prometheus_counter_metric.inc(extract_key(biggest_metrics)) + def _make_state_cache_entry( new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]] @@ -605,47 +751,6 @@ def _make_state_cache_entry( ) -def resolve_events_with_store( - clock: Clock, - room_id: str, - room_version: str, - state_sets: Sequence[StateMap[str]], - event_map: Optional[Dict[str, EventBase]], - state_res_store: "StateResolutionStore", -) -> Awaitable[StateMap[str]]: - """ - Args: - room_id: the room we are working in - - room_version: Version of the room - - state_sets: List of dicts of (type, state_key) -> event_id, - which are the different state groups to resolve. - - event_map: - a dict from event_id to event, for any events that we happen to - have in flight (eg, those currently being persisted). This will be - used as a starting point fof finding the state we need; any missing - events will be requested via state_map_factory. - - If None, all events will be fetched via state_res_store. - - state_res_store: a place to fetch events from - - Returns: - a map from (type, state_key) to event_id. - """ - v = KNOWN_ROOM_VERSIONS[room_version] - if v.state_res == StateResolutionVersions.V1: - return v1.resolve_events_with_store( - room_id, state_sets, event_map, state_res_store.get_events - ) - else: - return v2.resolve_events_with_store( - clock, room_id, room_version, state_sets, event_map, state_res_store - ) - - @attr.s(slots=True) class StateResolutionStore: """Interface that allows state resolution algorithms to access the database |