diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index a1f7ca3449..b8d2a8e8a9 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -15,6 +15,7 @@
import functools
import gc
+import itertools
import logging
import os
import platform
@@ -27,8 +28,8 @@ from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
+ GaugeHistogramMetricFamily,
GaugeMetricFamily,
- HistogramMetricFamily,
)
from twisted.internet import reactor
@@ -46,7 +47,7 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
running_on_pypy = platform.python_implementation() == "PyPy"
-all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
+all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]]
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
@@ -205,63 +206,83 @@ class InFlightGauge:
all_gauges[self.name] = self
-@attr.s(slots=True, hash=True)
-class BucketCollector:
- """
- Like a Histogram, but allows buckets to be point-in-time instead of
- incrementally added to.
+class GaugeBucketCollector:
+ """Like a Histogram, but the buckets are Gauges which are updated atomically.
- Args:
- name (str): Base name of metric to be exported to Prometheus.
- data_collector (callable -> dict): A synchronous callable that
- returns a dict mapping bucket to number of items in the
- bucket. If these buckets are not the same as the buckets
- given to this class, they will be remapped into them.
- buckets (list[float]): List of floats/ints of the buckets to
- give to Prometheus. +Inf is ignored, if given.
+ The data is updated by calling `update_data` with an iterable of measurements.
+ We assume that the data is updated less frequently than it is reported to
+ Prometheus, and optimise for that case.
"""
- name = attr.ib()
- data_collector = attr.ib()
- buckets = attr.ib()
+ __slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")
- def collect(self):
+ def __init__(
+ self,
+ name: str,
+ documentation: str,
+ buckets: Iterable[float],
+ registry=REGISTRY,
+ ):
+ """
+ Args:
+ name: base name of metric to be exported to Prometheus. (a _bucket suffix
+ will be added.)
+ documentation: help text for the metric
+ buckets: The top bounds of the buckets to report
+ registry: metric registry to register with
+ """
+ self._name = name
+ self._documentation = documentation
- # Fetch the data -- this must be synchronous!
- data = self.data_collector()
+ # the tops of the buckets
+ self._bucket_bounds = [float(b) for b in buckets]
+ if self._bucket_bounds != sorted(self._bucket_bounds):
+ raise ValueError("Buckets not in sorted order")
- buckets = {} # type: Dict[float, int]
+ if self._bucket_bounds[-1] != float("inf"):
+ self._bucket_bounds.append(float("inf"))
- res = []
- for x in data.keys():
- for i, bound in enumerate(self.buckets):
- if x <= bound:
- buckets[bound] = buckets.get(bound, 0) + data[x]
+ self._metric = self._values_to_metric([])
+ registry.register(self)
- for i in self.buckets:
- res.append([str(i), buckets.get(i, 0)])
+ def collect(self):
+ yield self._metric
- res.append(["+Inf", sum(data.values())])
+ def update_data(self, values: Iterable[float]):
+ """Update the data to be reported by the metric
- metric = HistogramMetricFamily(
- self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
+ The existing data is cleared, and each measurement in the input is assigned
+ to the relevant bucket.
+ """
+ self._metric = self._values_to_metric(values)
+
+ def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
+ total = 0.0
+ bucket_values = [0 for _ in self._bucket_bounds]
+
+ for v in values:
+ # assign each value to a bucket
+ for i, bound in enumerate(self._bucket_bounds):
+ if v <= bound:
+ bucket_values[i] += 1
+ break
+
+ # ... and increment the sum
+ total += v
+
+ # now, aggregate the bucket values so that they count the number of entries in
+ # that bucket or below.
+ accumulated_values = itertools.accumulate(bucket_values)
+
+ return GaugeHistogramMetricFamily(
+ self._name,
+ self._documentation,
+ buckets=list(
+ zip((str(b) for b in self._bucket_bounds), accumulated_values)
+ ),
+ gsum_value=total,
)
- yield metric
-
- def __attrs_post_init__(self):
- self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
- if self.buckets != sorted(self.buckets):
- raise ValueError("Buckets not sorted")
-
- self.buckets = tuple(self.buckets)
-
- if self.name in all_gauges.keys():
- logger.warning("%s already registered, reregistering" % (self.name,))
- REGISTRY.unregister(all_gauges.pop(self.name))
-
- REGISTRY.register(self)
- all_gauges[self.name] = self
#
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 686052bd83..4efc093b9e 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -12,10 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import typing
-from collections import Counter
-from synapse.metrics import BucketCollector
+from synapse.metrics import GaugeBucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
@@ -23,6 +21,14 @@ from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
)
+# Collect metrics on the number of forward extremities that exist.
+_extremities_collecter = GaugeBucketCollector(
+ "synapse_forward_extremities",
+ "Number of rooms on the server with the given number of forward extremities"
+ " or fewer",
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
+)
+
class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
"""Functions to pull various metrics from the DB, for e.g. phone home
@@ -32,18 +38,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
- # Collect metrics on the number of forward extremities that exist.
- # Counter of number of extremities to count
- self._current_forward_extremities_amount = (
- Counter()
- ) # type: typing.Counter[int]
-
- BucketCollector(
- "synapse_forward_extremities",
- lambda: self._current_forward_extremities_amount,
- buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
- )
-
# Read the extrems every 60 minutes
def read_forward_extremities():
# run as a background process to make sure that the database transactions
@@ -65,7 +59,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
return txn.fetchall()
res = await self.db_pool.runInteraction("read_forward_extremities", fetch)
- self._current_forward_extremities_amount = Counter([x[0] for x in res])
+ _extremities_collecter.update_data(x[0] for x in res)
async def count_daily_messages(self):
"""
diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py
index 949846fe33..3957471f3f 100644
--- a/tests/storage/test_event_metrics.py
+++ b/tests/storage/test_event_metrics.py
@@ -52,14 +52,14 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
self.reactor.advance(60 * 60 * 1000)
self.pump(1)
- items = set(
+ items = list(
filter(
lambda x: b"synapse_forward_extremities_" in x,
- generate_latest(REGISTRY).split(b"\n"),
+ generate_latest(REGISTRY, emit_help=False).split(b"\n"),
)
)
- expected = {
+ expected = [
b'synapse_forward_extremities_bucket{le="1.0"} 0.0',
b'synapse_forward_extremities_bucket{le="2.0"} 2.0',
b'synapse_forward_extremities_bucket{le="3.0"} 2.0',
@@ -72,9 +72,12 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
b'synapse_forward_extremities_bucket{le="100.0"} 3.0',
b'synapse_forward_extremities_bucket{le="200.0"} 3.0',
b'synapse_forward_extremities_bucket{le="500.0"} 3.0',
- b'synapse_forward_extremities_bucket{le="+Inf"} 3.0',
- b"synapse_forward_extremities_count 3.0",
- b"synapse_forward_extremities_sum 10.0",
- }
-
+ # per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9,
+ # "inf" is valid: "this includes variants such as inf"
+ b'synapse_forward_extremities_bucket{le="inf"} 3.0',
+ b"# TYPE synapse_forward_extremities_gcount gauge",
+ b"synapse_forward_extremities_gcount 3.0",
+ b"# TYPE synapse_forward_extremities_gsum gauge",
+ b"synapse_forward_extremities_gsum 10.0",
+ ]
self.assertEqual(items, expected)
|