diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/metrics/background_process_metrics.py | 104 |
1 files changed, 70 insertions, 34 deletions
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 8449ef82f7..13785038ad 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -17,16 +17,18 @@ import logging import threading from asyncio import iscoroutine from functools import wraps -from typing import Dict, Set +from typing import TYPE_CHECKING, Dict, Optional, Set -import six - -from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily +from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer from synapse.logging.context import LoggingContext, PreserveLoggingContext +if TYPE_CHECKING: + import resource + + logger = logging.getLogger(__name__) @@ -36,6 +38,12 @@ _background_process_start_count = Counter( ["name"], ) +_background_process_in_flight_count = Gauge( + "synapse_background_process_in_flight_count", + "Number of background processes in flight", + labelnames=["name"], +) + # we set registry=None in all of these to stop them getting registered with # the default registry. Instead we collect them all via the CustomCollector, # which ensures that we can update them before they are collected. @@ -83,13 +91,17 @@ _background_process_db_sched_duration = Counter( # it's much simpler to do so than to try to combine them.) _background_process_counts = {} # type: Dict[str, int] -# map from description to the currently running background processes. +# Set of all running background processes that became active active since the +# last time metrics were scraped (i.e. background processes that performed some +# work since the last scrape.) # -# it's kept as a dict of sets rather than a big set so that we can keep track -# of process descriptions that no longer have any active processes. -_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]] +# We do it like this to handle the case where we have a large number of +# background processes stacking up behind a lock or linearizer, where we then +# only need to iterate over and update metrics for the process that have +# actually been active and can ignore the idle ones. +_background_processes_active_since_last_scrape = set() # type: Set[_BackgroundProcess] -# A lock that covers the above dicts +# A lock that covers the above set and dict _bg_metrics_lock = threading.Lock() @@ -101,25 +113,16 @@ class _Collector(object): """ def collect(self): - background_process_in_flight_count = GaugeMetricFamily( - "synapse_background_process_in_flight_count", - "Number of background processes in flight", - labels=["name"], - ) + global _background_processes_active_since_last_scrape - # We copy the dict so that it doesn't change from underneath us. - # We also copy the process lists as that can also change + # We swap out the _background_processes set with an empty one so that + # we can safely iterate over the set without holding the lock. with _bg_metrics_lock: - _background_processes_copy = { - k: list(v) for k, v in six.iteritems(_background_processes) - } + _background_processes_copy = _background_processes_active_since_last_scrape + _background_processes_active_since_last_scrape = set() - for desc, processes in six.iteritems(_background_processes_copy): - background_process_in_flight_count.add_metric((desc,), len(processes)) - for process in processes: - process.update_metrics() - - yield background_process_in_flight_count + for process in _background_processes_copy: + process.update_metrics() # now we need to run collect() over each of the static Counters, and # yield each metric they return. @@ -191,13 +194,10 @@ def run_as_background_process(desc, func, *args, **kwargs): _background_process_counts[desc] = count + 1 _background_process_start_count.labels(desc).inc() + _background_process_in_flight_count.labels(desc).inc() - with LoggingContext(desc) as context: + with BackgroundProcessLoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) - proc = _BackgroundProcess(desc, context) - - with _bg_metrics_lock: - _background_processes.setdefault(desc, set()).add(proc) try: result = func(*args, **kwargs) @@ -214,10 +214,7 @@ def run_as_background_process(desc, func, *args, **kwargs): except Exception: logger.exception("Background process '%s' threw an exception", desc) finally: - proc.update_metrics() - - with _bg_metrics_lock: - _background_processes[desc].remove(proc) + _background_process_in_flight_count.labels(desc).dec() with PreserveLoggingContext(): return run() @@ -238,3 +235,42 @@ def wrap_as_background_process(desc): return wrap_as_background_process_inner_2 return wrap_as_background_process_inner + + +class BackgroundProcessLoggingContext(LoggingContext): + """A logging context that tracks in flight metrics for background + processes. + """ + + __slots__ = ["_proc"] + + def __init__(self, name: str): + super().__init__(name) + + self._proc = _BackgroundProcess(name, self) + + def start(self, rusage: "Optional[resource._RUsage]"): + """Log context has started running (again). + """ + + super().start(rusage) + + # We've become active again so we make sure we're in the list of active + # procs. (Note that "start" here means we've become active, as opposed + # to starting for the first time.) + with _bg_metrics_lock: + _background_processes_active_since_last_scrape.add(self._proc) + + def __exit__(self, type, value, traceback) -> None: + """Log context has finished. + """ + + super().__exit__(type, value, traceback) + + # The background process has finished. We explictly remove and manually + # update the metrics here so that if nothing is scraping metrics the set + # doesn't infinitely grow. + with _bg_metrics_lock: + _background_processes_active_since_last_scrape.discard(self._proc) + + self._proc.update_metrics() |