summary refs log tree commit diff
path: root/synapse/metrics
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/metrics')
-rw-r--r--synapse/metrics/__init__.py110
-rw-r--r--synapse/metrics/background_process_metrics.py33
2 files changed, 105 insertions, 38 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef48984fdd..1f30179b51 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -25,7 +25,7 @@ import six
 
 import attr
 from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
 
 from twisted.internet import reactor
 
@@ -40,7 +40,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
 
 
 class RegistryProxy(object):
-
     @staticmethod
     def collect():
         for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ class LaterGauge(object):
         try:
             calls = self.caller()
         except Exception:
-            logger.exception(
-                "Exception running callback for LaterGauge(%s)",
-                self.name,
-            )
+            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
             yield g
             return
 
@@ -116,9 +112,7 @@ class InFlightGauge(object):
         # Create a class which have the sub_metrics values as attributes, which
         # default to 0 on initialization. Used to pass to registered callbacks.
         self._metrics_class = attr.make_class(
-            "_MetricsEntry",
-            attrs={x: attr.ib(0) for x in sub_metrics},
-            slots=True,
+            "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
         )
 
         # Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ class InFlightGauge(object):
 
         Note: may be called by a separate thread.
         """
-        in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+        in_flight = GaugeMetricFamily(
+            self.name + "_total", self.desc, labels=self.labels
+        )
 
         metrics_by_key = {}
 
@@ -179,7 +175,9 @@ class InFlightGauge(object):
         yield in_flight
 
         for name in self.sub_metrics:
-            gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+            gauge = GaugeMetricFamily(
+                "_".join([self.name, name]), "", labels=self.labels
+            )
             for key, metrics in six.iteritems(metrics_by_key):
                 gauge.add_metric(key, getattr(metrics, name))
             yield gauge
@@ -193,17 +191,76 @@ class InFlightGauge(object):
         all_gauges[self.name] = self
 
 
+@attr.s(hash=True)
+class BucketCollector(object):
+    """
+    Like a Histogram, but allows buckets to be point-in-time instead of
+    incrementally added to.
+
+    Args:
+        name (str): Base name of metric to be exported to Prometheus.
+        data_collector (callable -> dict): A synchronous callable that
+            returns a dict mapping bucket to number of items in the
+            bucket. If these buckets are not the same as the buckets
+            given to this class, they will be remapped into them.
+        buckets (list[float]): List of floats/ints of the buckets to
+            give to Prometheus. +Inf is ignored, if given.
+
+    """
+
+    name = attr.ib()
+    data_collector = attr.ib()
+    buckets = attr.ib()
+
+    def collect(self):
+
+        # Fetch the data -- this must be synchronous!
+        data = self.data_collector()
+
+        buckets = {}
+
+        res = []
+        for x in data.keys():
+            for i, bound in enumerate(self.buckets):
+                if x <= bound:
+                    buckets[bound] = buckets.get(bound, 0) + data[x]
+
+        for i in self.buckets:
+            res.append([str(i), buckets.get(i, 0)])
+
+        res.append(["+Inf", sum(data.values())])
+
+        metric = HistogramMetricFamily(
+            self.name, "", buckets=res, sum_value=sum([x * y for x, y in data.items()])
+        )
+        yield metric
+
+    def __attrs_post_init__(self):
+        self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
+        if self.buckets != sorted(self.buckets):
+            raise ValueError("Buckets not sorted")
+
+        self.buckets = tuple(self.buckets)
+
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering" % (self.name,))
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
+
+
 #
 # Detailed CPU metrics
 #
 
-class CPUMetrics(object):
 
+class CPUMetrics(object):
     def __init__(self):
         ticks_per_sec = 100
         try:
             # Try and get the system config
-            ticks_per_sec = os.sysconf('SC_CLK_TCK')
+            ticks_per_sec = os.sysconf("SC_CLK_TCK")
         except (ValueError, TypeError, AttributeError):
             pass
 
@@ -237,13 +294,28 @@ gc_time = Histogram(
     "python_gc_time",
     "Time taken to GC (sec)",
     ["gen"],
-    buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
-             5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+    buckets=[
+        0.0025,
+        0.005,
+        0.01,
+        0.025,
+        0.05,
+        0.10,
+        0.25,
+        0.50,
+        1.00,
+        2.50,
+        5.00,
+        7.50,
+        15.00,
+        30.00,
+        45.00,
+        60.00,
+    ],
 )
 
 
 class GCCounts(object):
-
     def collect(self):
         cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
         for n, m in enumerate(gc.get_count()):
@@ -279,9 +351,7 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
 events_processed_counter = Counter("synapse_federation_client_events_processed", "")
 
 event_processing_loop_counter = Counter(
-    "synapse_event_processing_loop_count",
-    "Event processing loop iterations",
-    ["name"],
+    "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
 )
 
 event_processing_loop_room_count = Counter(
@@ -311,7 +381,6 @@ last_ticked = time.time()
 
 
 class ReactorLastSeenMetric(object):
-
     def collect(self):
         cm = GaugeMetricFamily(
             "python_twisted_reactor_last_seen",
@@ -325,7 +394,6 @@ REGISTRY.register(ReactorLastSeenMetric())
 
 
 def runUntilCurrentTimer(func):
-
     @functools.wraps(func)
     def f(*args, **kwargs):
         now = reactor.seconds()
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 037f1c490e..167e2c068a 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -60,8 +60,10 @@ _background_process_db_txn_count = Counter(
 
 _background_process_db_txn_duration = Counter(
     "synapse_background_process_db_txn_duration_seconds",
-    ("Seconds spent by background processes waiting for database "
-     "transactions, excluding scheduling time"),
+    (
+        "Seconds spent by background processes waiting for database "
+        "transactions, excluding scheduling time"
+    ),
     ["name"],
     registry=None,
 )
@@ -94,6 +96,7 @@ class _Collector(object):
     Ensures that all of the metrics are up-to-date with any in-flight processes
     before they are returned.
     """
+
     def collect(self):
         background_process_in_flight_count = GaugeMetricFamily(
             "synapse_background_process_in_flight_count",
@@ -105,14 +108,11 @@ class _Collector(object):
         # We also copy the process lists as that can also change
         with _bg_metrics_lock:
             _background_processes_copy = {
-                k: list(v)
-                for k, v in six.iteritems(_background_processes)
+                k: list(v) for k, v in six.iteritems(_background_processes)
             }
 
         for desc, processes in six.iteritems(_background_processes_copy):
-            background_process_in_flight_count.add_metric(
-                (desc,), len(processes),
-            )
+            background_process_in_flight_count.add_metric((desc,), len(processes))
             for process in processes:
                 process.update_metrics()
 
@@ -121,11 +121,11 @@ class _Collector(object):
         # now we need to run collect() over each of the static Counters, and
         # yield each metric they return.
         for m in (
-                _background_process_ru_utime,
-                _background_process_ru_stime,
-                _background_process_db_txn_count,
-                _background_process_db_txn_duration,
-                _background_process_db_sched_duration,
+            _background_process_ru_utime,
+            _background_process_ru_stime,
+            _background_process_db_txn_count,
+            _background_process_db_txn_duration,
+            _background_process_db_sched_duration,
         ):
             for r in m.collect():
                 yield r
@@ -151,14 +151,12 @@ class _BackgroundProcess(object):
 
         _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
         _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
-        _background_process_db_txn_count.labels(self.desc).inc(
-            diff.db_txn_count,
-        )
+        _background_process_db_txn_count.labels(self.desc).inc(diff.db_txn_count)
         _background_process_db_txn_duration.labels(self.desc).inc(
-            diff.db_txn_duration_sec,
+            diff.db_txn_duration_sec
         )
         _background_process_db_sched_duration.labels(self.desc).inc(
-            diff.db_sched_duration_sec,
+            diff.db_sched_duration_sec
         )
 
 
@@ -182,6 +180,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
     Returns: Deferred which returns the result of func, but note that it does not
         follow the synapse logcontext rules.
     """
+
     @defer.inlineCallbacks
     def run():
         with _bg_metrics_lock: