summary refs log tree commit diff
path: root/synapse/metrics/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/metrics/__init__.py')
-rw-r--r--synapse/metrics/__init__.py115
1 files changed, 68 insertions, 47 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index a1f7ca3449..b8d2a8e8a9 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -15,6 +15,7 @@
 
 import functools
 import gc
+import itertools
 import logging
 import os
 import platform
@@ -27,8 +28,8 @@ from prometheus_client import Counter, Gauge, Histogram
 from prometheus_client.core import (
     REGISTRY,
     CounterMetricFamily,
+    GaugeHistogramMetricFamily,
     GaugeMetricFamily,
-    HistogramMetricFamily,
 )
 
 from twisted.internet import reactor
@@ -46,7 +47,7 @@ logger = logging.getLogger(__name__)
 METRICS_PREFIX = "/_synapse/metrics"
 
 running_on_pypy = platform.python_implementation() == "PyPy"
-all_gauges = {}  # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
+all_gauges = {}  # type: Dict[str, Union[LaterGauge, InFlightGauge]]
 
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
 
@@ -205,63 +206,83 @@ class InFlightGauge:
         all_gauges[self.name] = self
 
 
-@attr.s(slots=True, hash=True)
-class BucketCollector:
-    """
-    Like a Histogram, but allows buckets to be point-in-time instead of
-    incrementally added to.
+class GaugeBucketCollector:
+    """Like a Histogram, but the buckets are Gauges which are updated atomically.
 
-    Args:
-        name (str): Base name of metric to be exported to Prometheus.
-        data_collector (callable -> dict): A synchronous callable that
-            returns a dict mapping bucket to number of items in the
-            bucket. If these buckets are not the same as the buckets
-            given to this class, they will be remapped into them.
-        buckets (list[float]): List of floats/ints of the buckets to
-            give to Prometheus. +Inf is ignored, if given.
+    The data is updated by calling `update_data` with an iterable of measurements.
 
+    We assume that the data is updated less frequently than it is reported to
+    Prometheus, and optimise for that case.
     """
 
-    name = attr.ib()
-    data_collector = attr.ib()
-    buckets = attr.ib()
+    __slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")
 
-    def collect(self):
+    def __init__(
+        self,
+        name: str,
+        documentation: str,
+        buckets: Iterable[float],
+        registry=REGISTRY,
+    ):
+        """
+        Args:
+            name: base name of metric to be exported to Prometheus. (a _bucket suffix
+               will be added.)
+            documentation: help text for the metric
+            buckets: The top bounds of the buckets to report
+            registry: metric registry to register with
+        """
+        self._name = name
+        self._documentation = documentation
 
-        # Fetch the data -- this must be synchronous!
-        data = self.data_collector()
+        # the tops of the buckets
+        self._bucket_bounds = [float(b) for b in buckets]
+        if self._bucket_bounds != sorted(self._bucket_bounds):
+            raise ValueError("Buckets not in sorted order")
 
-        buckets = {}  # type: Dict[float, int]
+        if self._bucket_bounds[-1] != float("inf"):
+            self._bucket_bounds.append(float("inf"))
 
-        res = []
-        for x in data.keys():
-            for i, bound in enumerate(self.buckets):
-                if x <= bound:
-                    buckets[bound] = buckets.get(bound, 0) + data[x]
+        self._metric = self._values_to_metric([])
+        registry.register(self)
 
-        for i in self.buckets:
-            res.append([str(i), buckets.get(i, 0)])
+    def collect(self):
+        yield self._metric
 
-        res.append(["+Inf", sum(data.values())])
+    def update_data(self, values: Iterable[float]):
+        """Update the data to be reported by the metric
 
-        metric = HistogramMetricFamily(
-            self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
+        The existing data is cleared, and each measurement in the input is assigned
+        to the relevant bucket.
+        """
+        self._metric = self._values_to_metric(values)
+
+    def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
+        total = 0.0
+        bucket_values = [0 for _ in self._bucket_bounds]
+
+        for v in values:
+            # assign each value to a bucket
+            for i, bound in enumerate(self._bucket_bounds):
+                if v <= bound:
+                    bucket_values[i] += 1
+                    break
+
+            # ... and increment the sum
+            total += v
+
+        # now, aggregate the bucket values so that they count the number of entries in
+        # that bucket or below.
+        accumulated_values = itertools.accumulate(bucket_values)
+
+        return GaugeHistogramMetricFamily(
+            self._name,
+            self._documentation,
+            buckets=list(
+                zip((str(b) for b in self._bucket_bounds), accumulated_values)
+            ),
+            gsum_value=total,
         )
-        yield metric
-
-    def __attrs_post_init__(self):
-        self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
-        if self.buckets != sorted(self.buckets):
-            raise ValueError("Buckets not sorted")
-
-        self.buckets = tuple(self.buckets)
-
-        if self.name in all_gauges.keys():
-            logger.warning("%s already registered, reregistering" % (self.name,))
-            REGISTRY.unregister(all_gauges.pop(self.name))
-
-        REGISTRY.register(self)
-        all_gauges[self.name] = self
 
 
 #