summary refs log tree commit diff
path: root/synapse/metrics
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/metrics')
-rw-r--r--synapse/metrics/__init__.py112
1 files changed, 92 insertions, 20 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef48984fdd..539c353528 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -25,7 +25,7 @@ import six
 
 import attr
 from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
 
 from twisted.internet import reactor
 
@@ -40,7 +40,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
 
 
 class RegistryProxy(object):
-
     @staticmethod
     def collect():
         for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ class LaterGauge(object):
         try:
             calls = self.caller()
         except Exception:
-            logger.exception(
-                "Exception running callback for LaterGauge(%s)",
-                self.name,
-            )
+            logger.exception("Exception running callback for LaterGauge(%s)", self.name)
             yield g
             return
 
@@ -116,9 +112,7 @@ class InFlightGauge(object):
         # Create a class which have the sub_metrics values as attributes, which
         # default to 0 on initialization. Used to pass to registered callbacks.
         self._metrics_class = attr.make_class(
-            "_MetricsEntry",
-            attrs={x: attr.ib(0) for x in sub_metrics},
-            slots=True,
+            "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
         )
 
         # Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ class InFlightGauge(object):
 
         Note: may be called by a separate thread.
         """
-        in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+        in_flight = GaugeMetricFamily(
+            self.name + "_total", self.desc, labels=self.labels
+        )
 
         metrics_by_key = {}
 
@@ -179,7 +175,9 @@ class InFlightGauge(object):
         yield in_flight
 
         for name in self.sub_metrics:
-            gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+            gauge = GaugeMetricFamily(
+                "_".join([self.name, name]), "", labels=self.labels
+            )
             for key, metrics in six.iteritems(metrics_by_key):
                 gauge.add_metric(key, getattr(metrics, name))
             yield gauge
@@ -193,12 +191,75 @@ class InFlightGauge(object):
         all_gauges[self.name] = self
 
 
+@attr.s(hash=True)
+class BucketCollector(object):
+    """
+    Like a Histogram, but allows buckets to be point-in-time instead of
+    incrementally added to.
+
+    Args:
+        name (str): Base name of metric to be exported to Prometheus.
+        data_collector (callable -> dict): A synchronous callable that
+            returns a dict mapping bucket to number of items in the
+            bucket. If these buckets are not the same as the buckets
+            given to this class, they will be remapped into them.
+        buckets (list[float]): List of floats/ints of the buckets to
+            give to Prometheus. +Inf is ignored, if given.
+
+    """
+
+    name = attr.ib()
+    data_collector = attr.ib()
+    buckets = attr.ib()
+
+    def collect(self):
+
+        # Fetch the data -- this must be synchronous!
+        data = self.data_collector()
+
+        buckets = {}
+
+        res = []
+        for x in data.keys():
+            for i, bound in enumerate(self.buckets):
+                if x <= bound:
+                    buckets[bound] = buckets.get(bound, 0) + data[x]
+                    break
+
+        for i in self.buckets:
+            res.append([i, buckets.get(i, 0)])
+
+        res.append(["+Inf", sum(data.values())])
+
+        metric = HistogramMetricFamily(
+            self.name,
+            "",
+            buckets=res,
+            sum_value=sum([x * y for x, y in data.items()]),
+        )
+        yield metric
+
+    def __attrs_post_init__(self):
+        self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
+        if self.buckets != sorted(self.buckets):
+            raise ValueError("Buckets not sorted")
+
+        self.buckets = tuple(self.buckets)
+
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering" % (self.name,))
+            REGISTRY.unregister(all_gauges.pop(self.name))
+
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
+
+
 #
 # Detailed CPU metrics
 #
 
-class CPUMetrics(object):
 
+class CPUMetrics(object):
     def __init__(self):
         ticks_per_sec = 100
         try:
@@ -237,13 +298,28 @@ gc_time = Histogram(
     "python_gc_time",
     "Time taken to GC (sec)",
     ["gen"],
-    buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
-             5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+    buckets=[
+        0.0025,
+        0.005,
+        0.01,
+        0.025,
+        0.05,
+        0.10,
+        0.25,
+        0.50,
+        1.00,
+        2.50,
+        5.00,
+        7.50,
+        15.00,
+        30.00,
+        45.00,
+        60.00,
+    ],
 )
 
 
 class GCCounts(object):
-
     def collect(self):
         cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
         for n, m in enumerate(gc.get_count()):
@@ -279,9 +355,7 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
 events_processed_counter = Counter("synapse_federation_client_events_processed", "")
 
 event_processing_loop_counter = Counter(
-    "synapse_event_processing_loop_count",
-    "Event processing loop iterations",
-    ["name"],
+    "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
 )
 
 event_processing_loop_room_count = Counter(
@@ -311,7 +385,6 @@ last_ticked = time.time()
 
 
 class ReactorLastSeenMetric(object):
-
     def collect(self):
         cm = GaugeMetricFamily(
             "python_twisted_reactor_last_seen",
@@ -325,7 +398,6 @@ REGISTRY.register(ReactorLastSeenMetric())
 
 
 def runUntilCurrentTimer(func):
-
     @functools.wraps(func)
     def f(*args, **kwargs):
         now = reactor.seconds()