path: root/synapse/metrics/background_process_metrics.py
author     Andrew Morgan <andrew@amorgan.xyz>  2020-06-24 12:07:41 +0100
committer  Andrew Morgan <andrew@amorgan.xyz>  2020-06-24 12:07:41 +0100
commit     a7d49db74fdc303bcd295db501644d54846f1fd5 (patch)
tree       ec564c03c6b642fb7ea9d830a26156bfd44f0460 /synapse/metrics/background_process_metrics.py
parent     Prevent M_USER_IN_USE from being raised by registration methods until after e... (diff)
parent     1.15.0 (diff)
download   synapse-a7d49db74fdc303bcd295db501644d54846f1fd5.tar.xz
Merge branch 'release-v1.15.0' of github.com:matrix-org/synapse into dinsic-release-v1.15.x
* 'release-v1.15.0' of github.com:matrix-org/synapse: (55 commits)
  1.15.0
  Fix some attributions
  Update CHANGES.md
  1.15.0rc1
  Revert "1.15.0rc1"
  1.15.0rc1
  Fix bug in account data replication stream. (#7656)
  Convert the registration handler to async/await. (#7649)
  Accept device information at the login fallback endpoint. (#7629)
  Convert user directory handler and related classes to async/await. (#7640)
  Add an option to disable autojoin for guest accounts (#6637)
  Clarifications to the admin api documentation (#7647)
  Update to the stable SSO prefix for UI Auth. (#7630)
  Fix type information on `assert_*_is_admin` methods (#7645)
  Remove some unused constants. (#7644)
  Typo fixes.
  Allow new users to be registered via the admin API even if the monthly active user limit has been reached (#7263)
  Add device management to admin API (#7481)
  Attempt to fix PhoneHomeStatsTestCase.test_performance_100 being flaky. (#7634)
  Support CS API v0.6.0 (#6585)
  ...
Diffstat (limited to 'synapse/metrics/background_process_metrics.py')
-rw-r--r--  synapse/metrics/background_process_metrics.py | 104
1 file changed, 70 insertions(+), 34 deletions(-)
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 8449ef82f7..13785038ad 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -17,16 +17,18 @@ import logging
 import threading
 from asyncio import iscoroutine
 from functools import wraps
-from typing import Dict, Set
+from typing import TYPE_CHECKING, Dict, Optional, Set
 
-import six
-
-from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, Counter, Gauge
 
 from twisted.internet import defer
 
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
 
+if TYPE_CHECKING:
+    import resource
+
+
 logger = logging.getLogger(__name__)
 
 
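The new TYPE_CHECKING guard above is there because `resource` is only needed for the "Optional[resource._RUsage]" annotation added further down: the module is imported when mypy runs, never at runtime, which is why the annotation has to be written as a string. A minimal sketch of the same pattern (the function below is hypothetical, not part of this change):

from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
    import resource  # evaluated by the type checker only, never at runtime


def report_rusage(rusage: "Optional[resource._RUsage]") -> None:
    # The annotation is a string because `resource` is not imported at runtime.
    if rusage is not None:
        print(rusage.ru_utime, rusage.ru_stime)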
@@ -36,6 +38,12 @@ _background_process_start_count = Counter(
     ["name"],
 )
 
+_background_process_in_flight_count = Gauge(
+    "synapse_background_process_in_flight_count",
+    "Number of background processes in flight",
+    labelnames=["name"],
+)
+
 # we set registry=None in all of these to stop them getting registered with
 # the default registry. Instead we collect them all via the CustomCollector,
 # which ensures that we can update them before they are collected.
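For context on the comment above: metrics created with registry=None never reach the default REGISTRY on their own; a custom collector re-emits them, which gives it a chance to refresh their values just before each scrape. A minimal sketch of that pattern, with hypothetical metric and collector names:

from prometheus_client.core import REGISTRY, Counter

_example_total = Counter(
    "example_background_work_total",
    "Hypothetical counter collected via a custom collector",
    ["name"],
    registry=None,  # keep it out of the default registry
)


class _ExampleCollector(object):
    def collect(self):
        # Refresh _example_total from live state here, then re-yield its
        # samples so that they show up in the scrape.
        for metric in _example_total.collect():
            yield metric


REGISTRY.register(_ExampleCollector())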
@@ -83,13 +91,17 @@ _background_process_db_sched_duration = Counter(
 # it's much simpler to do so than to try to combine them.)
 _background_process_counts = {}  # type: Dict[str, int]
 
-# map from description to the currently running background processes.
+# Set of all running background processes that became active since the
+# last time metrics were scraped (i.e. background processes that performed
+# some work since the last scrape).
 #
-# it's kept as a dict of sets rather than a big set so that we can keep track
-# of process descriptions that no longer have any active processes.
-_background_processes = {}  # type: Dict[str, Set[_BackgroundProcess]]
+# We do it like this to handle the case where a large number of background
+# processes stack up behind a lock or linearizer: we then only need to
+# iterate over and update metrics for the processes that have actually been
+# active, and can ignore the idle ones.
+_background_processes_active_since_last_scrape = set()  # type: Set[_BackgroundProcess]
 
-# A lock that covers the above dicts
+# A lock that covers the above set and dict
 _bg_metrics_lock = threading.Lock()
 
 
@@ -101,25 +113,16 @@ class _Collector(object):
     """
 
     def collect(self):
-        background_process_in_flight_count = GaugeMetricFamily(
-            "synapse_background_process_in_flight_count",
-            "Number of background processes in flight",
-            labels=["name"],
-        )
+        global _background_processes_active_since_last_scrape
 
-        # We copy the dict so that it doesn't change from underneath us.
-        # We also copy the process lists as that can also change
+        # We swap out the _background_processes_active_since_last_scrape set
+        # with an empty one so that we can safely iterate over it without
+        # holding the lock.
         with _bg_metrics_lock:
-            _background_processes_copy = {
-                k: list(v) for k, v in six.iteritems(_background_processes)
-            }
+            _background_processes_copy = _background_processes_active_since_last_scrape
+            _background_processes_active_since_last_scrape = set()
 
-        for desc, processes in six.iteritems(_background_processes_copy):
-            background_process_in_flight_count.add_metric((desc,), len(processes))
-            for process in processes:
-                process.update_metrics()
-
-        yield background_process_in_flight_count
+        for process in _background_processes_copy:
+            process.update_metrics()
 
         # now we need to run collect() over each of the static Counters, and
         # yield each metric they return.
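The rewritten collect() above replaces copy-everything-under-the-lock with an atomic swap: the collector takes ownership of the whole set and leaves a fresh empty one behind, so the lock is held for two assignments rather than for a copy proportional to the number of processes. The same pattern in isolation (names hypothetical):

import threading

_pending = set()  # type: set
_pending_lock = threading.Lock()


def drain_pending():
    global _pending
    # Swap the shared set for an empty one while holding the lock; the
    # (possibly slow) iteration below then runs on our private copy.
    with _pending_lock:
        drained, _pending = _pending, set()
    for item in drained:
        print(item)  # stand-in for per-item work such as update_metrics()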
@@ -191,13 +194,10 @@ def run_as_background_process(desc, func, *args, **kwargs):
             _background_process_counts[desc] = count + 1
 
         _background_process_start_count.labels(desc).inc()
+        _background_process_in_flight_count.labels(desc).inc()
 
-        with LoggingContext(desc) as context:
+        with BackgroundProcessLoggingContext(desc) as context:
             context.request = "%s-%i" % (desc, count)
-            proc = _BackgroundProcess(desc, context)
-
-            with _bg_metrics_lock:
-                _background_processes.setdefault(desc, set()).add(proc)
 
             try:
                 result = func(*args, **kwargs)
@@ -214,10 +214,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
             except Exception:
                 logger.exception("Background process '%s' threw an exception", desc)
             finally:
-                proc.update_metrics()
-
-                with _bg_metrics_lock:
-                    _background_processes[desc].remove(proc)
+                _background_process_in_flight_count.labels(desc).dec()
 
     with PreserveLoggingContext():
         return run()
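A usage sketch for the function changed above (the coroutine name and body are hypothetical): run_as_background_process wraps the call in the new BackgroundProcessLoggingContext, bumps the start and in-flight counters, and returns a Deferred that fire-and-forget callers can simply ignore.

async def _prune_old_entries():
    ...  # periodic maintenance work


run_as_background_process("prune_old_entries", _prune_old_entries)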
@@ -238,3 +235,42 @@ def wrap_as_background_process(desc):
         return wrap_as_background_process_inner_2
 
     return wrap_as_background_process_inner
+
+
+class BackgroundProcessLoggingContext(LoggingContext):
+    """A logging context that tracks in flight metrics for background
+    processes.
+    """
+
+    __slots__ = ["_proc"]
+
+    def __init__(self, name: str):
+        super().__init__(name)
+
+        self._proc = _BackgroundProcess(name, self)
+
+    def start(self, rusage: "Optional[resource._RUsage]"):
+        """Log context has started running (again).
+        """
+
+        super().start(rusage)
+
+        # We've become active again so we make sure we're in the set of active
+        # procs. (Note that "start" here means we've become active, as opposed
+        # to starting for the first time.)
+        with _bg_metrics_lock:
+            _background_processes_active_since_last_scrape.add(self._proc)
+
+    def __exit__(self, type, value, traceback) -> None:
+        """Log context has finished.
+        """
+
+        super().__exit__(type, value, traceback)
+
+        # The background process has finished. We explicitly remove it from
+        # the set and update the metrics here, so that if nothing is scraping
+        # metrics the set doesn't grow without bound.
+        with _bg_metrics_lock:
+            _background_processes_active_since_last_scrape.discard(self._proc)
+
+        self._proc.update_metrics()
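Finally, a usage sketch for the wrap_as_background_process decorator left untouched above (class and method names are hypothetical): each call to the decorated method runs via run_as_background_process under the given description, so it is counted by the new in-flight gauge and tracked by BackgroundProcessLoggingContext.

class ExampleHandler(object):
    @wrap_as_background_process("update_caches")
    async def update_caches(self):
        ...  # runs as a background process named "update_caches"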