summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-09-30 10:37:52 +0100
committerGitHub <noreply@github.com>2020-09-30 10:37:52 +0100
commitc429dfc300492ec9477b5169754eddb4e40b5ea2 (patch)
treec0330c26d607362a7ea6e57e22ad93ae1013ea79 /synapse
parentVarious clean ups to room stream tokens. (#8423) (diff)
parentchangelog (diff)
downloadsynapse-c429dfc300492ec9477b5169754eddb4e40b5ea2.tar.xz
Merge pull request #8420 from matrix-org/rav/state_res_stats
Report metrics on expensive rooms for state res
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/federation.py13
-rw-r--r--synapse/state/__init__.py233
-rw-r--r--synapse/util/metrics.py31
3 files changed, 198 insertions, 79 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0073e7c996..1a8144405a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,7 +21,7 @@ import itertools
 import logging
 from collections.abc import Container
 from http import HTTPStatus
-from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Tuple, Union
 
 import attr
 from signedjson.key import decode_verify_key_bytes
@@ -69,7 +69,7 @@ from synapse.replication.http.federation import (
     ReplicationFederationSendEventsRestServlet,
     ReplicationStoreRoomOnInviteRestServlet,
 )
-from synapse.state import StateResolutionStore, resolve_events_with_store
+from synapse.state import StateResolutionStore
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.types import (
     JsonDict,
@@ -85,6 +85,9 @@ from synapse.util.retryutils import NotRetryingDestination
 from synapse.util.stringutils import shortstr
 from synapse.visibility import filter_events_for_server
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -116,7 +119,7 @@ class FederationHandler(BaseHandler):
         rooms.
     """
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.hs = hs
@@ -126,6 +129,7 @@ class FederationHandler(BaseHandler):
         self.state_store = self.storage.state
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
+        self._state_resolution_handler = hs.get_state_resolution_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
         self.action_generator = hs.get_action_generator()
@@ -381,8 +385,7 @@ class FederationHandler(BaseHandler):
                                 event_map[x.event_id] = x
 
                     room_version = await self.store.get_room_version_id(room_id)
-                    state_map = await resolve_events_with_store(
-                        self.clock,
+                    state_map = await self._state_resolution_handler.resolve_events_with_store(
                         room_id,
                         room_version,
                         state_maps,
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 5a5ea39e01..31082bb16a 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -13,42 +13,46 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import heapq
 import logging
-from collections import namedtuple
+from collections import defaultdict, namedtuple
 from typing import (
+    Any,
     Awaitable,
+    Callable,
+    DefaultDict,
     Dict,
     Iterable,
     List,
     Optional,
     Sequence,
     Set,
+    Tuple,
     Union,
     overload,
 )
 
 import attr
 from frozendict import frozendict
-from prometheus_client import Histogram
+from prometheus_client import Counter, Histogram
 from typing_extensions import Literal
 
 from synapse.api.constants import EventTypes
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.logging.context import ContextResourceUsage
 from synapse.logging.utils import log_function
 from synapse.state import v1, v2
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.roommember import ProfileInfo
 from synapse.types import Collection, StateMap
-from synapse.util import Clock
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import Measure, measure_func
 
 logger = logging.getLogger(__name__)
-
+metrics_logger = logging.getLogger("synapse.state.metrics")
 
 # Metrics for number of state groups involved in a resolution.
 state_groups_histogram = Histogram(
@@ -448,19 +452,44 @@ class StateHandler:
 
         state_map = {ev.event_id: ev for st in state_sets for ev in st}
 
-        with Measure(self.clock, "state._resolve_events"):
-            new_state = await resolve_events_with_store(
-                self.clock,
-                event.room_id,
-                room_version,
-                state_set_ids,
-                event_map=state_map,
-                state_res_store=StateResolutionStore(self.store),
-            )
+        new_state = await self._state_resolution_handler.resolve_events_with_store(
+            event.room_id,
+            room_version,
+            state_set_ids,
+            event_map=state_map,
+            state_res_store=StateResolutionStore(self.store),
+        )
 
         return {key: state_map[ev_id] for key, ev_id in new_state.items()}
 
 
+@attr.s(slots=True)
+class _StateResMetrics:
+    """Keeps track of some usage metrics about state res."""
+
+    # System and User CPU time, in seconds
+    cpu_time = attr.ib(type=float, default=0.0)
+
+    # time spent on database transactions (excluding scheduling time). This roughly
+    # corresponds to the amount of work done on the db server, excluding event fetches.
+    db_time = attr.ib(type=float, default=0.0)
+
+    # number of events fetched from the db.
+    db_events = attr.ib(type=int, default=0)
+
+
+_biggest_room_by_cpu_counter = Counter(
+    "synapse_state_res_cpu_for_biggest_room_seconds",
+    "CPU time spent performing state resolution for the single most expensive "
+    "room for state resolution",
+)
+_biggest_room_by_db_counter = Counter(
+    "synapse_state_res_db_for_biggest_room_seconds",
+    "Database time spent performing state resolution for the single most "
+    "expensive room for state resolution",
+)
+
+
 class StateResolutionHandler:
     """Responsible for doing state conflict resolution.
 
@@ -483,6 +512,17 @@ class StateResolutionHandler:
             reset_expiry_on_get=True,
         )
 
+        #
+        # stuff for tracking time spent on state-res by room
+        #
+
+        # tracks the amount of work done on state res per room
+        self._state_res_metrics = defaultdict(
+            _StateResMetrics
+        )  # type: DefaultDict[str, _StateResMetrics]
+
+        self.clock.looping_call(self._report_metrics, 120 * 1000)
+
     @log_function
     async def resolve_state_groups(
         self,
@@ -530,15 +570,13 @@ class StateResolutionHandler:
 
             state_groups_histogram.observe(len(state_groups_ids))
 
-            with Measure(self.clock, "state._resolve_events"):
-                new_state = await resolve_events_with_store(
-                    self.clock,
-                    room_id,
-                    room_version,
-                    list(state_groups_ids.values()),
-                    event_map=event_map,
-                    state_res_store=state_res_store,
-                )
+            new_state = await self.resolve_events_with_store(
+                room_id,
+                room_version,
+                list(state_groups_ids.values()),
+                event_map=event_map,
+                state_res_store=state_res_store,
+            )
 
             # if the new state matches any of the input state groups, we can
             # use that state group again. Otherwise we will generate a state_id
@@ -552,6 +590,114 @@ class StateResolutionHandler:
 
             return cache
 
+    async def resolve_events_with_store(
+        self,
+        room_id: str,
+        room_version: str,
+        state_sets: Sequence[StateMap[str]],
+        event_map: Optional[Dict[str, EventBase]],
+        state_res_store: "StateResolutionStore",
+    ) -> StateMap[str]:
+        """
+        Args:
+            room_id: the room we are working in
+
+            room_version: Version of the room
+
+            state_sets: List of dicts of (type, state_key) -> event_id,
+                which are the different state groups to resolve.
+
+            event_map:
+                a dict from event_id to event, for any events that we happen to
+                have in flight (eg, those currently being persisted). This will be
+                used as a starting point fof finding the state we need; any missing
+                events will be requested via state_map_factory.
+
+                If None, all events will be fetched via state_res_store.
+
+            state_res_store: a place to fetch events from
+
+        Returns:
+            a map from (type, state_key) to event_id.
+        """
+        try:
+            with Measure(self.clock, "state._resolve_events") as m:
+                v = KNOWN_ROOM_VERSIONS[room_version]
+                if v.state_res == StateResolutionVersions.V1:
+                    return await v1.resolve_events_with_store(
+                        room_id, state_sets, event_map, state_res_store.get_events
+                    )
+                else:
+                    return await v2.resolve_events_with_store(
+                        self.clock,
+                        room_id,
+                        room_version,
+                        state_sets,
+                        event_map,
+                        state_res_store,
+                    )
+        finally:
+            self._record_state_res_metrics(room_id, m.get_resource_usage())
+
+    def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage):
+        room_metrics = self._state_res_metrics[room_id]
+        room_metrics.cpu_time += rusage.ru_utime + rusage.ru_stime
+        room_metrics.db_time += rusage.db_txn_duration_sec
+        room_metrics.db_events += rusage.evt_db_fetch_count
+
+    def _report_metrics(self):
+        if not self._state_res_metrics:
+            # no state res has happened since the last iteration: don't bother logging.
+            return
+
+        self._report_biggest(
+            lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter,
+        )
+
+        self._report_biggest(
+            lambda i: i.db_time, "DB time", _biggest_room_by_db_counter,
+        )
+
+        self._state_res_metrics.clear()
+
+    def _report_biggest(
+        self,
+        extract_key: Callable[[_StateResMetrics], Any],
+        metric_name: str,
+        prometheus_counter_metric: Counter,
+    ) -> None:
+        """Report metrics on the biggest rooms for state res
+
+        Args:
+            extract_key: a callable which, given a _StateResMetrics, extracts a single
+                metric to sort by.
+            metric_name: the name of the metric we have extracted, for the log line
+            prometheus_counter_metric: a prometheus metric recording the sum of the
+                the extracted metric
+        """
+        n_to_log = 10
+        if not metrics_logger.isEnabledFor(logging.DEBUG):
+            # only need the most expensive if we don't have debug logging, which
+            # allows nlargest() to degrade to max()
+            n_to_log = 1
+
+        items = self._state_res_metrics.items()
+
+        # log the N biggest rooms
+        biggest = heapq.nlargest(
+            n_to_log, items, key=lambda i: extract_key(i[1])
+        )  # type: List[Tuple[str, _StateResMetrics]]
+        metrics_logger.debug(
+            "%i biggest rooms for state-res by %s: %s",
+            len(biggest),
+            metric_name,
+            ["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest],
+        )
+
+        # report info on the single biggest to prometheus
+        _, biggest_metrics = biggest[0]
+        prometheus_counter_metric.inc(extract_key(biggest_metrics))
+
 
 def _make_state_cache_entry(
     new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]]
@@ -605,47 +751,6 @@ def _make_state_cache_entry(
     )
 
 
-def resolve_events_with_store(
-    clock: Clock,
-    room_id: str,
-    room_version: str,
-    state_sets: Sequence[StateMap[str]],
-    event_map: Optional[Dict[str, EventBase]],
-    state_res_store: "StateResolutionStore",
-) -> Awaitable[StateMap[str]]:
-    """
-    Args:
-        room_id: the room we are working in
-
-        room_version: Version of the room
-
-        state_sets: List of dicts of (type, state_key) -> event_id,
-            which are the different state groups to resolve.
-
-        event_map:
-            a dict from event_id to event, for any events that we happen to
-            have in flight (eg, those currently being persisted). This will be
-            used as a starting point fof finding the state we need; any missing
-            events will be requested via state_map_factory.
-
-            If None, all events will be fetched via state_res_store.
-
-        state_res_store: a place to fetch events from
-
-    Returns:
-        a map from (type, state_key) to event_id.
-    """
-    v = KNOWN_ROOM_VERSIONS[room_version]
-    if v.state_res == StateResolutionVersions.V1:
-        return v1.resolve_events_with_store(
-            room_id, state_sets, event_map, state_res_store.get_events
-        )
-    else:
-        return v2.resolve_events_with_store(
-            clock, room_id, room_version, state_sets, event_map, state_res_store
-        )
-
-
 @attr.s(slots=True)
 class StateResolutionStore:
     """Interface that allows state resolution algorithms to access the database
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 6e57c1ee72..ffdea0de8d 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -19,7 +19,11 @@ from typing import Any, Callable, Optional, TypeVar, cast
 
 from prometheus_client import Counter
 
-from synapse.logging.context import LoggingContext, current_context
+from synapse.logging.context import (
+    ContextResourceUsage,
+    LoggingContext,
+    current_context,
+)
 from synapse.metrics import InFlightGauge
 
 logger = logging.getLogger(__name__)
@@ -104,27 +108,27 @@ class Measure:
     def __init__(self, clock, name):
         self.clock = clock
         self.name = name
-        self._logging_context = None
+        parent_context = current_context()
+        self._logging_context = LoggingContext(
+            "Measure[%s]" % (self.name,), parent_context
+        )
         self.start = None
 
-    def __enter__(self):
-        if self._logging_context:
+    def __enter__(self) -> "Measure":
+        if self.start is not None:
             raise RuntimeError("Measure() objects cannot be re-used")
 
         self.start = self.clock.time()
-        parent_context = current_context()
-        self._logging_context = LoggingContext(
-            "Measure[%s]" % (self.name,), parent_context
-        )
         self._logging_context.__enter__()
         in_flight.register((self.name,), self._update_in_flight)
+        return self
 
     def __exit__(self, exc_type, exc_val, exc_tb):
-        if not self._logging_context:
+        if self.start is None:
             raise RuntimeError("Measure() block exited without being entered")
 
         duration = self.clock.time() - self.start
-        usage = self._logging_context.get_resource_usage()
+        usage = self.get_resource_usage()
 
         in_flight.unregister((self.name,), self._update_in_flight)
         self._logging_context.__exit__(exc_type, exc_val, exc_tb)
@@ -140,6 +144,13 @@ class Measure:
         except ValueError:
             logger.warning("Failed to save metrics! Usage: %s", usage)
 
+    def get_resource_usage(self) -> ContextResourceUsage:
+        """Get the resources used within this Measure block
+
+        If the Measure block is still active, returns the resource usage so far.
+        """
+        return self._logging_context.get_resource_usage()
+
     def _update_in_flight(self, metrics):
         """Gets called when processing in flight metrics
         """