diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0073e7c996..1a8144405a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,7 +21,7 @@ import itertools
import logging
from collections.abc import Container
from http import HTTPStatus
-from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Tuple, Union
import attr
from signedjson.key import decode_verify_key_bytes
@@ -69,7 +69,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
-from synapse.state import StateResolutionStore, resolve_events_with_store
+from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
JsonDict,
@@ -85,6 +85,9 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
@@ -116,7 +119,7 @@ class FederationHandler(BaseHandler):
rooms.
"""
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
@@ -126,6 +129,7 @@ class FederationHandler(BaseHandler):
self.state_store = self.storage.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
+ self._state_resolution_handler = hs.get_state_resolution_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
@@ -381,8 +385,7 @@ class FederationHandler(BaseHandler):
event_map[x.event_id] = x
room_version = await self.store.get_room_version_id(room_id)
- state_map = await resolve_events_with_store(
- self.clock,
+ state_map = await self._state_resolution_handler.resolve_events_with_store(
room_id,
room_version,
state_maps,
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 5a5ea39e01..31082bb16a 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -13,42 +13,46 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import heapq
import logging
-from collections import namedtuple
+from collections import defaultdict, namedtuple
from typing import (
+ Any,
Awaitable,
+ Callable,
+ DefaultDict,
Dict,
Iterable,
List,
Optional,
Sequence,
Set,
+ Tuple,
Union,
overload,
)
import attr
from frozendict import frozendict
-from prometheus_client import Histogram
+from prometheus_client import Counter, Histogram
from typing_extensions import Literal
from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.logging.context import ContextResourceUsage
from synapse.logging.utils import log_function
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.roommember import ProfileInfo
from synapse.types import Collection, StateMap
-from synapse.util import Clock
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
-
+metrics_logger = logging.getLogger("synapse.state.metrics")
# Metrics for number of state groups involved in a resolution.
state_groups_histogram = Histogram(
@@ -448,19 +452,44 @@ class StateHandler:
state_map = {ev.event_id: ev for st in state_sets for ev in st}
- with Measure(self.clock, "state._resolve_events"):
- new_state = await resolve_events_with_store(
- self.clock,
- event.room_id,
- room_version,
- state_set_ids,
- event_map=state_map,
- state_res_store=StateResolutionStore(self.store),
- )
+ new_state = await self._state_resolution_handler.resolve_events_with_store(
+ event.room_id,
+ room_version,
+ state_set_ids,
+ event_map=state_map,
+ state_res_store=StateResolutionStore(self.store),
+ )
return {key: state_map[ev_id] for key, ev_id in new_state.items()}
+@attr.s(slots=True)
+class _StateResMetrics:
+ """Keeps track of some usage metrics about state res."""
+
+ # System and User CPU time, in seconds
+ cpu_time = attr.ib(type=float, default=0.0)
+
+ # time spent on database transactions (excluding scheduling time). This roughly
+ # corresponds to the amount of work done on the db server, excluding event fetches.
+ db_time = attr.ib(type=float, default=0.0)
+
+ # number of events fetched from the db.
+ db_events = attr.ib(type=int, default=0)
+
+
+_biggest_room_by_cpu_counter = Counter(
+ "synapse_state_res_cpu_for_biggest_room_seconds",
+ "CPU time spent performing state resolution for the single most expensive "
+ "room for state resolution",
+)
+_biggest_room_by_db_counter = Counter(
+ "synapse_state_res_db_for_biggest_room_seconds",
+ "Database time spent performing state resolution for the single most "
+ "expensive room for state resolution",
+)
+
+
class StateResolutionHandler:
"""Responsible for doing state conflict resolution.
@@ -483,6 +512,17 @@ class StateResolutionHandler:
reset_expiry_on_get=True,
)
+ #
+ # stuff for tracking time spent on state-res by room
+ #
+
+ # tracks the amount of work done on state res per room
+ self._state_res_metrics = defaultdict(
+ _StateResMetrics
+ ) # type: DefaultDict[str, _StateResMetrics]
+
+ self.clock.looping_call(self._report_metrics, 120 * 1000)
+
@log_function
async def resolve_state_groups(
self,
@@ -530,15 +570,13 @@ class StateResolutionHandler:
state_groups_histogram.observe(len(state_groups_ids))
- with Measure(self.clock, "state._resolve_events"):
- new_state = await resolve_events_with_store(
- self.clock,
- room_id,
- room_version,
- list(state_groups_ids.values()),
- event_map=event_map,
- state_res_store=state_res_store,
- )
+ new_state = await self.resolve_events_with_store(
+ room_id,
+ room_version,
+ list(state_groups_ids.values()),
+ event_map=event_map,
+ state_res_store=state_res_store,
+ )
# if the new state matches any of the input state groups, we can
# use that state group again. Otherwise we will generate a state_id
@@ -552,6 +590,114 @@ class StateResolutionHandler:
return cache
+ async def resolve_events_with_store(
+ self,
+ room_id: str,
+ room_version: str,
+ state_sets: Sequence[StateMap[str]],
+ event_map: Optional[Dict[str, EventBase]],
+ state_res_store: "StateResolutionStore",
+ ) -> StateMap[str]:
+ """
+ Args:
+ room_id: the room we are working in
+
+ room_version: Version of the room
+
+ state_sets: List of dicts of (type, state_key) -> event_id,
+ which are the different state groups to resolve.
+
+ event_map:
+ a dict from event_id to event, for any events that we happen to
+ have in flight (eg, those currently being persisted). This will be
+ used as a starting point fof finding the state we need; any missing
+ events will be requested via state_map_factory.
+
+ If None, all events will be fetched via state_res_store.
+
+ state_res_store: a place to fetch events from
+
+ Returns:
+ a map from (type, state_key) to event_id.
+ """
+ try:
+ with Measure(self.clock, "state._resolve_events") as m:
+ v = KNOWN_ROOM_VERSIONS[room_version]
+ if v.state_res == StateResolutionVersions.V1:
+ return await v1.resolve_events_with_store(
+ room_id, state_sets, event_map, state_res_store.get_events
+ )
+ else:
+ return await v2.resolve_events_with_store(
+ self.clock,
+ room_id,
+ room_version,
+ state_sets,
+ event_map,
+ state_res_store,
+ )
+ finally:
+ self._record_state_res_metrics(room_id, m.get_resource_usage())
+
+ def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage):
+ room_metrics = self._state_res_metrics[room_id]
+ room_metrics.cpu_time += rusage.ru_utime + rusage.ru_stime
+ room_metrics.db_time += rusage.db_txn_duration_sec
+ room_metrics.db_events += rusage.evt_db_fetch_count
+
+ def _report_metrics(self):
+ if not self._state_res_metrics:
+ # no state res has happened since the last iteration: don't bother logging.
+ return
+
+ self._report_biggest(
+ lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter,
+ )
+
+ self._report_biggest(
+ lambda i: i.db_time, "DB time", _biggest_room_by_db_counter,
+ )
+
+ self._state_res_metrics.clear()
+
+ def _report_biggest(
+ self,
+ extract_key: Callable[[_StateResMetrics], Any],
+ metric_name: str,
+ prometheus_counter_metric: Counter,
+ ) -> None:
+ """Report metrics on the biggest rooms for state res
+
+ Args:
+ extract_key: a callable which, given a _StateResMetrics, extracts a single
+ metric to sort by.
+ metric_name: the name of the metric we have extracted, for the log line
+ prometheus_counter_metric: a prometheus metric recording the sum of the
+ the extracted metric
+ """
+ n_to_log = 10
+ if not metrics_logger.isEnabledFor(logging.DEBUG):
+ # only need the most expensive if we don't have debug logging, which
+ # allows nlargest() to degrade to max()
+ n_to_log = 1
+
+ items = self._state_res_metrics.items()
+
+ # log the N biggest rooms
+ biggest = heapq.nlargest(
+ n_to_log, items, key=lambda i: extract_key(i[1])
+ ) # type: List[Tuple[str, _StateResMetrics]]
+ metrics_logger.debug(
+ "%i biggest rooms for state-res by %s: %s",
+ len(biggest),
+ metric_name,
+ ["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest],
+ )
+
+ # report info on the single biggest to prometheus
+ _, biggest_metrics = biggest[0]
+ prometheus_counter_metric.inc(extract_key(biggest_metrics))
+
def _make_state_cache_entry(
new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]]
@@ -605,47 +751,6 @@ def _make_state_cache_entry(
)
-def resolve_events_with_store(
- clock: Clock,
- room_id: str,
- room_version: str,
- state_sets: Sequence[StateMap[str]],
- event_map: Optional[Dict[str, EventBase]],
- state_res_store: "StateResolutionStore",
-) -> Awaitable[StateMap[str]]:
- """
- Args:
- room_id: the room we are working in
-
- room_version: Version of the room
-
- state_sets: List of dicts of (type, state_key) -> event_id,
- which are the different state groups to resolve.
-
- event_map:
- a dict from event_id to event, for any events that we happen to
- have in flight (eg, those currently being persisted). This will be
- used as a starting point fof finding the state we need; any missing
- events will be requested via state_map_factory.
-
- If None, all events will be fetched via state_res_store.
-
- state_res_store: a place to fetch events from
-
- Returns:
- a map from (type, state_key) to event_id.
- """
- v = KNOWN_ROOM_VERSIONS[room_version]
- if v.state_res == StateResolutionVersions.V1:
- return v1.resolve_events_with_store(
- room_id, state_sets, event_map, state_res_store.get_events
- )
- else:
- return v2.resolve_events_with_store(
- clock, room_id, room_version, state_sets, event_map, state_res_store
- )
-
-
@attr.s(slots=True)
class StateResolutionStore:
"""Interface that allows state resolution algorithms to access the database
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 6e57c1ee72..ffdea0de8d 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -19,7 +19,11 @@ from typing import Any, Callable, Optional, TypeVar, cast
from prometheus_client import Counter
-from synapse.logging.context import LoggingContext, current_context
+from synapse.logging.context import (
+ ContextResourceUsage,
+ LoggingContext,
+ current_context,
+)
from synapse.metrics import InFlightGauge
logger = logging.getLogger(__name__)
@@ -104,27 +108,27 @@ class Measure:
def __init__(self, clock, name):
self.clock = clock
self.name = name
- self._logging_context = None
+ parent_context = current_context()
+ self._logging_context = LoggingContext(
+ "Measure[%s]" % (self.name,), parent_context
+ )
self.start = None
- def __enter__(self):
- if self._logging_context:
+ def __enter__(self) -> "Measure":
+ if self.start is not None:
raise RuntimeError("Measure() objects cannot be re-used")
self.start = self.clock.time()
- parent_context = current_context()
- self._logging_context = LoggingContext(
- "Measure[%s]" % (self.name,), parent_context
- )
self._logging_context.__enter__()
in_flight.register((self.name,), self._update_in_flight)
+ return self
def __exit__(self, exc_type, exc_val, exc_tb):
- if not self._logging_context:
+ if self.start is None:
raise RuntimeError("Measure() block exited without being entered")
duration = self.clock.time() - self.start
- usage = self._logging_context.get_resource_usage()
+ usage = self.get_resource_usage()
in_flight.unregister((self.name,), self._update_in_flight)
self._logging_context.__exit__(exc_type, exc_val, exc_tb)
@@ -140,6 +144,13 @@ class Measure:
except ValueError:
logger.warning("Failed to save metrics! Usage: %s", usage)
+ def get_resource_usage(self) -> ContextResourceUsage:
+ """Get the resources used within this Measure block
+
+ If the Measure block is still active, returns the resource usage so far.
+ """
+ return self._logging_context.get_resource_usage()
+
def _update_in_flight(self, metrics):
"""Gets called when processing in flight metrics
"""
|