From aec294ee0d0f2fa4ccef57085d670b8939de3669 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 14 Sep 2020 12:50:06 -0400 Subject: Use slots in attrs classes where possible (#8296) slots use less memory (and attribute access is faster) while slightly limiting the flexibility of the class attributes. This focuses on objects which are instantiated "often" and for short periods of time. --- synapse/metrics/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/metrics') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 2643380d9e..a1f7ca3449 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -59,7 +59,7 @@ class RegistryProxy: yield metric -@attr.s(hash=True) +@attr.s(slots=True, hash=True) class LaterGauge: name = attr.ib(type=str) @@ -205,7 +205,7 @@ class InFlightGauge: all_gauges[self.name] = self -@attr.s(hash=True) +@attr.s(slots=True, hash=True) class BucketCollector: """ Like a Histogram, but allows buckets to be point-in-time instead of -- cgit 1.5.1 From ceafb5a1c61f699d659b1b38577b1c2264721e28 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 30 Sep 2020 16:42:05 +0100 Subject: Drop support for ancient prometheus_client (#8426) Drop compatibility hacks for prometheus-client pre 0.4.0. Debian stretch and Fedora 31 both have newer versions, so hopefully this will be ok. --- changelog.d/8426.removal | 1 + synapse/metrics/_exposition.py | 24 ++---------------------- synapse/python_dependencies.py | 6 +++++- 3 files changed, 8 insertions(+), 23 deletions(-) create mode 100644 changelog.d/8426.removal (limited to 'synapse/metrics') diff --git a/changelog.d/8426.removal b/changelog.d/8426.removal new file mode 100644 index 0000000000..a56277fe7a --- /dev/null +++ b/changelog.d/8426.removal @@ -0,0 +1 @@ +Drop support for `prometheus_client` older than 0.4.0. diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index 4304c60d56..c6457ba450 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -24,7 +24,6 @@ expect, and the newer "best practice" version of the up-to-date official client. import math import threading -from collections import namedtuple from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn from urllib.parse import parse_qs, urlparse @@ -35,14 +34,6 @@ from twisted.web.resource import Resource from synapse.util import caches -try: - from prometheus_client.samples import Sample -except ImportError: - Sample = namedtuple( # type: ignore[no-redef] # noqa - "Sample", ["name", "labels", "value", "timestamp", "exemplar"] - ) - - CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8") @@ -93,17 +84,6 @@ def sample_line(line, name): ) -def nameify_sample(sample): - """ - If we get a prometheus_client<0.4.0 sample as a tuple, transform it into a - namedtuple which has the names we expect. - """ - if not isinstance(sample, Sample): - sample = Sample(*sample, None, None) - - return sample - - def generate_latest(registry, emit_help=False): # Trigger the cache metrics to be rescraped, which updates the common @@ -144,7 +124,7 @@ def generate_latest(registry, emit_help=False): ) ) output.append("# TYPE {0} {1}\n".format(mname, mtype)) - for sample in map(nameify_sample, metric.samples): + for sample in metric.samples: # Get rid of the OpenMetrics specific samples for suffix in ["_created", "_gsum", "_gcount"]: if sample.name.endswith(suffix): @@ -172,7 +152,7 @@ def generate_latest(registry, emit_help=False): ) ) output.append("# TYPE {0} {1}\n".format(mnewname, mtype)) - for sample in map(nameify_sample, metric.samples): + for sample in metric.samples: # Get rid of the OpenMetrics specific samples for suffix in ["_created", "_gsum", "_gcount"]: if sample.name.endswith(suffix): diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 288631477e..0ddead8a0f 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -68,7 +68,11 @@ REQUIREMENTS = [ "pymacaroons>=0.13.0", "msgpack>=0.5.2", "phonenumbers>=8.2.0", - "prometheus_client>=0.0.18,<0.9.0", + # we use GaugeHistogramMetric, which was added in prom-client 0.4.0. + # prom-client has a history of breaking backwards compatibility between + # minor versions (https://github.com/prometheus/client_python/issues/317), + # so we also pin the minor version. + "prometheus_client>=0.4.0,<0.9.0", # we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note: # Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33 # is out in November.) -- cgit 1.5.1 From 1c8ca2c54363dc09744f9618f30181f015e63ffe Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 30 Sep 2020 16:44:10 +0100 Subject: Fix _exposition.py to stop stripping samples Our hacked-up `_exposition.py` was stripping out some samples it shouldn't have been. Put them back in, to more closely match the upstream `exposition.py`. --- synapse/metrics/_exposition.py | 40 +++++++++++++++++++++++++++++----------- 1 file changed, 29 insertions(+), 11 deletions(-) (limited to 'synapse/metrics') diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index c6457ba450..734271e765 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -26,6 +26,7 @@ import math import threading from http.server import BaseHTTPRequestHandler, HTTPServer from socketserver import ThreadingMixIn +from typing import Dict, List from urllib.parse import parse_qs, urlparse from prometheus_client import REGISTRY @@ -124,16 +125,33 @@ def generate_latest(registry, emit_help=False): ) ) output.append("# TYPE {0} {1}\n".format(mname, mtype)) - for sample in metric.samples: - # Get rid of the OpenMetrics specific samples + + om_samples = {} # type: Dict[str, List[str]] + for s in metric.samples: for suffix in ["_created", "_gsum", "_gcount"]: - if sample.name.endswith(suffix): + if s.name == metric.name + suffix: + # OpenMetrics specific sample, put in a gauge at the end. + # (these come from gaugehistograms which don't get renamed, + # so no need to faff with mnewname) + om_samples.setdefault(suffix, []).append(sample_line(s, s.name)) break else: - newname = sample.name.replace(mnewname, mname) + newname = s.name.replace(mnewname, mname) if ":" in newname and newname.endswith("_total"): newname = newname[: -len("_total")] - output.append(sample_line(sample, newname)) + output.append(sample_line(s, newname)) + + for suffix, lines in sorted(om_samples.items()): + if emit_help: + output.append( + "# HELP {0}{1} {2}\n".format( + metric.name, + suffix, + metric.documentation.replace("\\", r"\\").replace("\n", r"\n"), + ) + ) + output.append("# TYPE {0}{1} gauge\n".format(metric.name, suffix)) + output.extend(lines) # Get rid of the weird colon things while we're at it if mtype == "counter": @@ -152,16 +170,16 @@ def generate_latest(registry, emit_help=False): ) ) output.append("# TYPE {0} {1}\n".format(mnewname, mtype)) - for sample in metric.samples: - # Get rid of the OpenMetrics specific samples + + for s in metric.samples: + # Get rid of the OpenMetrics specific samples (we should already have + # dealt with them above anyway.) for suffix in ["_created", "_gsum", "_gcount"]: - if sample.name.endswith(suffix): + if s.name == metric.name + suffix: break else: output.append( - sample_line( - sample, sample.name.replace(":total", "").replace(":", "_") - ) + sample_line(s, s.name.replace(":total", "").replace(":", "_")) ) return "".join(output).encode("utf-8") -- cgit 1.5.1 From 6d2d42f8fb04599713d3e6e7fc3bc4c9b7063c9a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 29 Sep 2020 22:26:28 +0100 Subject: Rewrite BucketCollector This was a bit unweildy for what I wanted: in particular, I wanted to assign each measurement straight into a bucket, rather than storing an intermediate Counter which didn't do any bucketing at all. I've replaced it with something that is hopefully a bit easier to use. (I'm not entirely sure what the difference between a HistogramMetricFamily and a GaugeHistogramMetricFamily is, but given our counters can go down as well as up the latter *sounds* more accurate?) --- synapse/metrics/__init__.py | 115 ++++++++++++++++++------------ synapse/storage/databases/main/metrics.py | 26 +++---- tests/storage/test_event_metrics.py | 19 ++--- 3 files changed, 89 insertions(+), 71 deletions(-) (limited to 'synapse/metrics') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a1f7ca3449..b8d2a8e8a9 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -15,6 +15,7 @@ import functools import gc +import itertools import logging import os import platform @@ -27,8 +28,8 @@ from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import ( REGISTRY, CounterMetricFamily, + GaugeHistogramMetricFamily, GaugeMetricFamily, - HistogramMetricFamily, ) from twisted.internet import reactor @@ -46,7 +47,7 @@ logger = logging.getLogger(__name__) METRICS_PREFIX = "/_synapse/metrics" running_on_pypy = platform.python_implementation() == "PyPy" -all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]] +all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]] HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") @@ -205,63 +206,83 @@ class InFlightGauge: all_gauges[self.name] = self -@attr.s(slots=True, hash=True) -class BucketCollector: - """ - Like a Histogram, but allows buckets to be point-in-time instead of - incrementally added to. +class GaugeBucketCollector: + """Like a Histogram, but the buckets are Gauges which are updated atomically. - Args: - name (str): Base name of metric to be exported to Prometheus. - data_collector (callable -> dict): A synchronous callable that - returns a dict mapping bucket to number of items in the - bucket. If these buckets are not the same as the buckets - given to this class, they will be remapped into them. - buckets (list[float]): List of floats/ints of the buckets to - give to Prometheus. +Inf is ignored, if given. + The data is updated by calling `update_data` with an iterable of measurements. + We assume that the data is updated less frequently than it is reported to + Prometheus, and optimise for that case. """ - name = attr.ib() - data_collector = attr.ib() - buckets = attr.ib() + __slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric") - def collect(self): + def __init__( + self, + name: str, + documentation: str, + buckets: Iterable[float], + registry=REGISTRY, + ): + """ + Args: + name: base name of metric to be exported to Prometheus. (a _bucket suffix + will be added.) + documentation: help text for the metric + buckets: The top bounds of the buckets to report + registry: metric registry to register with + """ + self._name = name + self._documentation = documentation - # Fetch the data -- this must be synchronous! - data = self.data_collector() + # the tops of the buckets + self._bucket_bounds = [float(b) for b in buckets] + if self._bucket_bounds != sorted(self._bucket_bounds): + raise ValueError("Buckets not in sorted order") - buckets = {} # type: Dict[float, int] + if self._bucket_bounds[-1] != float("inf"): + self._bucket_bounds.append(float("inf")) - res = [] - for x in data.keys(): - for i, bound in enumerate(self.buckets): - if x <= bound: - buckets[bound] = buckets.get(bound, 0) + data[x] + self._metric = self._values_to_metric([]) + registry.register(self) - for i in self.buckets: - res.append([str(i), buckets.get(i, 0)]) + def collect(self): + yield self._metric - res.append(["+Inf", sum(data.values())]) + def update_data(self, values: Iterable[float]): + """Update the data to be reported by the metric - metric = HistogramMetricFamily( - self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items()) + The existing data is cleared, and each measurement in the input is assigned + to the relevant bucket. + """ + self._metric = self._values_to_metric(values) + + def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily: + total = 0.0 + bucket_values = [0 for _ in self._bucket_bounds] + + for v in values: + # assign each value to a bucket + for i, bound in enumerate(self._bucket_bounds): + if v <= bound: + bucket_values[i] += 1 + break + + # ... and increment the sum + total += v + + # now, aggregate the bucket values so that they count the number of entries in + # that bucket or below. + accumulated_values = itertools.accumulate(bucket_values) + + return GaugeHistogramMetricFamily( + self._name, + self._documentation, + buckets=list( + zip((str(b) for b in self._bucket_bounds), accumulated_values) + ), + gsum_value=total, ) - yield metric - - def __attrs_post_init__(self): - self.buckets = [float(x) for x in self.buckets if x != "+Inf"] - if self.buckets != sorted(self.buckets): - raise ValueError("Buckets not sorted") - - self.buckets = tuple(self.buckets) - - if self.name in all_gauges.keys(): - logger.warning("%s already registered, reregistering" % (self.name,)) - REGISTRY.unregister(all_gauges.pop(self.name)) - - REGISTRY.register(self) - all_gauges[self.name] = self # diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 686052bd83..4efc093b9e 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -12,10 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import typing -from collections import Counter -from synapse.metrics import BucketCollector +from synapse.metrics import GaugeBucketCollector from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool @@ -23,6 +21,14 @@ from synapse.storage.databases.main.event_push_actions import ( EventPushActionsWorkerStore, ) +# Collect metrics on the number of forward extremities that exist. +_extremities_collecter = GaugeBucketCollector( + "synapse_forward_extremities", + "Number of rooms on the server with the given number of forward extremities" + " or fewer", + buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500], +) + class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): """Functions to pull various metrics from the DB, for e.g. phone home @@ -32,18 +38,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) - # Collect metrics on the number of forward extremities that exist. - # Counter of number of extremities to count - self._current_forward_extremities_amount = ( - Counter() - ) # type: typing.Counter[int] - - BucketCollector( - "synapse_forward_extremities", - lambda: self._current_forward_extremities_amount, - buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"], - ) - # Read the extrems every 60 minutes def read_forward_extremities(): # run as a background process to make sure that the database transactions @@ -65,7 +59,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): return txn.fetchall() res = await self.db_pool.runInteraction("read_forward_extremities", fetch) - self._current_forward_extremities_amount = Counter([x[0] for x in res]) + _extremities_collecter.update_data(x[0] for x in res) async def count_daily_messages(self): """ diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py index 949846fe33..3957471f3f 100644 --- a/tests/storage/test_event_metrics.py +++ b/tests/storage/test_event_metrics.py @@ -52,14 +52,14 @@ class ExtremStatisticsTestCase(HomeserverTestCase): self.reactor.advance(60 * 60 * 1000) self.pump(1) - items = set( + items = list( filter( lambda x: b"synapse_forward_extremities_" in x, - generate_latest(REGISTRY).split(b"\n"), + generate_latest(REGISTRY, emit_help=False).split(b"\n"), ) ) - expected = { + expected = [ b'synapse_forward_extremities_bucket{le="1.0"} 0.0', b'synapse_forward_extremities_bucket{le="2.0"} 2.0', b'synapse_forward_extremities_bucket{le="3.0"} 2.0', @@ -72,9 +72,12 @@ class ExtremStatisticsTestCase(HomeserverTestCase): b'synapse_forward_extremities_bucket{le="100.0"} 3.0', b'synapse_forward_extremities_bucket{le="200.0"} 3.0', b'synapse_forward_extremities_bucket{le="500.0"} 3.0', - b'synapse_forward_extremities_bucket{le="+Inf"} 3.0', - b"synapse_forward_extremities_count 3.0", - b"synapse_forward_extremities_sum 10.0", - } - + # per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9, + # "inf" is valid: "this includes variants such as inf" + b'synapse_forward_extremities_bucket{le="inf"} 3.0', + b"# TYPE synapse_forward_extremities_gcount gauge", + b"synapse_forward_extremities_gcount 3.0", + b"# TYPE synapse_forward_extremities_gsum gauge", + b"synapse_forward_extremities_gsum 10.0", + ] self.assertEqual(items, expected) -- cgit 1.5.1