diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 8449ef82f7..13785038ad 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -17,16 +17,18 @@ import logging
import threading
from asyncio import iscoroutine
from functools import wraps
-from typing import Dict, Set
+from typing import TYPE_CHECKING, Dict, Optional, Set

-import six
-
-from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, Counter, Gauge

 from twisted.internet import defer

 from synapse.logging.context import LoggingContext, PreserveLoggingContext

+if TYPE_CHECKING:
+ import resource
+
+
logger = logging.getLogger(__name__)
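
Note: the TYPE_CHECKING guard above exists because `resource` is only needed
for the `Optional[resource._RUsage]` annotation on `start()` further down; the
quoted annotation is never evaluated at runtime, so the module is never
imported outside of type checking. A minimal sketch of the same pattern
(function and field names here are illustrative):

    from typing import TYPE_CHECKING, Optional

    if TYPE_CHECKING:
        # Imported for the annotation only; never imported at runtime.
        import resource

    def report(rusage: "Optional[resource._RUsage]") -> None:
        # The string annotation is not evaluated at runtime, so the
        # missing runtime import is harmless.
        if rusage is not None:
            print(rusage.ru_utime, rusage.ru_stime)
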
@@ -36,6 +38,12 @@ _background_process_start_count = Counter(
["name"],
 )

+_background_process_in_flight_count = Gauge(
+ "synapse_background_process_in_flight_count",
+ "Number of background processes in flight",
+ labelnames=["name"],
+)
+
# we set registry=None in all of these to stop them getting registered with
# the default registry. Instead we collect them all via the CustomCollector,
# which ensures that we can update them before they are collected.
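
Aside: the `registry=None` plus custom-collector arrangement that this comment
describes looks roughly like the following in prometheus_client terms (a
sketch with illustrative names, not the Synapse code):

    from prometheus_client import Counter
    from prometheus_client.core import REGISTRY

    # Kept out of the default registry...
    _example_counter = Counter("example_total", "An example counter", registry=None)

    class _ExampleCollector(object):
        def collect(self):
            # ...so that the collector can refresh state first, then
            # re-yield the static metric's samples into the scrape.
            yield from _example_counter.collect()

    REGISTRY.register(_ExampleCollector())
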
@@ -83,13 +91,17 @@ _background_process_db_sched_duration = Counter(
# it's much simpler to do so than to try to combine them.)
 _background_process_counts = {}  # type: Dict[str, int]

-# map from description to the currently running background processes.
+# Set of all running background processes that have been active since the
+# last time metrics were scraped (i.e. background processes that performed
+# some work since the last scrape).
#
-# it's kept as a dict of sets rather than a big set so that we can keep track
-# of process descriptions that no longer have any active processes.
-_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]]
+# We do it like this to handle the case where we have a large number of
+# background processes stacked up behind a lock or linearizer: when that
+# happens we only need to iterate over and update metrics for the processes
+# that have actually been active, and can ignore the idle ones.
+_background_processes_active_since_last_scrape = set()  # type: Set[_BackgroundProcess]

-# A lock that covers the above dicts
+# A lock that covers the above set and dict
_bg_metrics_lock = threading.Lock()
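
Note: the new Gauge above replaces the GaugeMetricFamily that `_Collector`
used to rebuild on every scrape (see the next hunk); being a plain registered
Gauge, it is simply incremented when a process starts and decremented when it
finishes. The shape of the pattern, with made-up names:

    from prometheus_client import Gauge

    in_flight = Gauge(
        "example_in_flight_count",
        "Number of example tasks in flight",
        labelnames=["name"],
    )

    def run_task(name, task):
        in_flight.labels(name).inc()
        try:
            task()
        finally:
            # Decrement even on failure, mirroring the try/finally in
            # run_as_background_process below.
            in_flight.labels(name).dec()

(prometheus_client also provides `Gauge.track_inprogress()` as a shorthand for
this inc/try/finally/dec dance.)
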
@@ -101,25 +113,16 @@ class _Collector(object):
"""
def collect(self):
- background_process_in_flight_count = GaugeMetricFamily(
- "synapse_background_process_in_flight_count",
- "Number of background processes in flight",
- labels=["name"],
- )
+        global _background_processes_active_since_last_scrape

-        # We copy the dict so that it doesn't change from underneath us.
-        # We also copy the process lists as that can also change
+        # We swap out the _background_processes_active_since_last_scrape set
+        # with an empty one so that we can safely iterate over it without
+        # holding the lock.
with _bg_metrics_lock:
- _background_processes_copy = {
- k: list(v) for k, v in six.iteritems(_background_processes)
- }
+ _background_processes_copy = _background_processes_active_since_last_scrape
+            _background_processes_active_since_last_scrape = set()

-        for desc, processes in six.iteritems(_background_processes_copy):
- background_process_in_flight_count.add_metric((desc,), len(processes))
- for process in processes:
- process.update_metrics()
-
- yield background_process_in_flight_count
+ for process in _background_processes_copy:
+            process.update_metrics()

         # now we need to run collect() over each of the static Counters, and
# yield each metric they return.
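
Note: the swap above is the standard trick for keeping the critical section
tiny: the lock is held only for two reference assignments, and the per-process
updates then run on a private snapshot that no other thread can mutate (new
activations land in the fresh set instead). A standalone sketch of the
pattern, names illustrative:

    import threading

    _lock = threading.Lock()
    _active = set()

    def mark_active(proc):
        with _lock:
            _active.add(proc)

    def collect():
        global _active
        # Swap the shared set for an empty one while holding the lock...
        with _lock:
            snapshot = _active
            _active = set()
        # ...then do the slow per-process work lock-free on the snapshot.
        for proc in snapshot:
            proc.update_metrics()
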
@@ -191,13 +194,10 @@ def run_as_background_process(desc, func, *args, **kwargs):
             _background_process_counts[desc] = count + 1

         _background_process_start_count.labels(desc).inc()
+        _background_process_in_flight_count.labels(desc).inc()

-        with LoggingContext(desc) as context:
+ with BackgroundProcessLoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
- proc = _BackgroundProcess(desc, context)
-
- with _bg_metrics_lock:
-                _background_processes.setdefault(desc, set()).add(proc)

             try:
result = func(*args, **kwargs)
@@ -214,10 +214,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
except Exception:
logger.exception("Background process '%s' threw an exception", desc)
finally:
- proc.update_metrics()
-
- with _bg_metrics_lock:
- _background_processes[desc].remove(proc)
+                _background_process_in_flight_count.labels(desc).dec()

     with PreserveLoggingContext():
return run()
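
Note: callers are unaffected by this change; kicking off a background process
still looks the same. For example (the handler name here is made up):

    from synapse.metrics.background_process_metrics import run_as_background_process

    async def _prune_old_entries():
        ...  # the actual work

    # Runs _prune_old_entries under a BackgroundProcessLoggingContext:
    # the in-flight gauge for name="prune_old_entries" is incremented up
    # front and decremented in the finally block when the work completes.
    run_as_background_process("prune_old_entries", _prune_old_entries)
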
@@ -238,3 +235,42 @@ def wrap_as_background_process(desc):
         return wrap_as_background_process_inner_2

     return wrap_as_background_process_inner
+
+
+class BackgroundProcessLoggingContext(LoggingContext):
+    """A logging context that tracks in-flight metrics for background
+    processes.
+    """
+
+ __slots__ = ["_proc"]
+
+ def __init__(self, name: str):
+ super().__init__(name)
+
+ self._proc = _BackgroundProcess(name, self)
+
+ def start(self, rusage: "Optional[resource._RUsage]"):
+ """Log context has started running (again).
+ """
+
+ super().start(rusage)
+
+        # We've become active again so we make sure we're in the set of active
+        # procs. (Note that "start" here means we've become active, as opposed
+        # to starting for the first time.)
+ with _bg_metrics_lock:
+ _background_processes_active_since_last_scrape.add(self._proc)
+
+ def __exit__(self, type, value, traceback) -> None:
+ """Log context has finished.
+ """
+
+ super().__exit__(type, value, traceback)
+
+        # The background process has finished. We explicitly remove and
+        # manually update the metrics here so that if nothing is scraping
+        # metrics the set doesn't grow without bound.
+ with _bg_metrics_lock:
+ _background_processes_active_since_last_scrape.discard(self._proc)
+
+ self._proc.update_metrics()
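
Note: the subtlety here is that `LoggingContext.start()` fires every time the
context becomes the active one (e.g. when the process resumes after yielding
to the reactor), not just on creation; that is what lets idle processes drop
out of the scrape set. A toy analogue of the add-on-activation /
discard-on-exit bookkeeping (a sketch only, not Synapse code):

    import threading

    _lock = threading.Lock()
    _active_since_scrape = set()

    class TrackingContext(object):
        """Registers itself whenever it becomes active; deregisters on exit."""

        def __enter__(self):
            self.start()
            return self

        def start(self):
            # Runs on every activation, so an idle context simply stops
            # re-appearing in the set after a scrape drains it.
            with _lock:
                _active_since_scrape.add(self)

        def __exit__(self, exc_type, exc, tb):
            # Final exit: discard so the set stays bounded even if nothing
            # ever scrapes.
            with _lock:
                _active_since_scrape.discard(self)
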