summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-09-30 19:33:27 +0100
committerGitHub <noreply@github.com>2020-09-30 19:33:27 +0100
commita0a1ba697325601e1b1edf81a722b2354680b2c6 (patch)
treecf4916637fe948f8d2b3a7783e968f06b2b55f74
parentAllow additional SSO properties to be passed to the client (#8413) (diff)
parentchangelog (diff)
downloadsynapse-a0a1ba697325601e1b1edf81a722b2354680b2c6.tar.xz
Merge pull request #8425 from matrix-org/rav/extremity_metrics
Add an improved "forward extremities" metric
-rw-r--r--changelog.d/8425.feature1
-rw-r--r--synapse/metrics/__init__.py115
-rw-r--r--synapse/metrics/_exposition.py40
-rw-r--r--synapse/storage/databases/main/metrics.py53
-rw-r--r--tests/storage/test_event_metrics.py19
5 files changed, 144 insertions, 84 deletions
diff --git a/changelog.d/8425.feature b/changelog.d/8425.feature
new file mode 100644
index 0000000000..b4ee5bb74b
--- /dev/null
+++ b/changelog.d/8425.feature
@@ -0,0 +1 @@
+Add experimental prometheus metric to track numbers of "large" rooms for state resolutiom.
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index a1f7ca3449..b8d2a8e8a9 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -15,6 +15,7 @@
 
 import functools
 import gc
+import itertools
 import logging
 import os
 import platform
@@ -27,8 +28,8 @@ from prometheus_client import Counter, Gauge, Histogram
 from prometheus_client.core import (
     REGISTRY,
     CounterMetricFamily,
+    GaugeHistogramMetricFamily,
     GaugeMetricFamily,
-    HistogramMetricFamily,
 )
 
 from twisted.internet import reactor
@@ -46,7 +47,7 @@ logger = logging.getLogger(__name__)
 METRICS_PREFIX = "/_synapse/metrics"
 
 running_on_pypy = platform.python_implementation() == "PyPy"
-all_gauges = {}  # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
+all_gauges = {}  # type: Dict[str, Union[LaterGauge, InFlightGauge]]
 
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
 
@@ -205,63 +206,83 @@ class InFlightGauge:
         all_gauges[self.name] = self
 
 
-@attr.s(slots=True, hash=True)
-class BucketCollector:
-    """
-    Like a Histogram, but allows buckets to be point-in-time instead of
-    incrementally added to.
+class GaugeBucketCollector:
+    """Like a Histogram, but the buckets are Gauges which are updated atomically.
 
-    Args:
-        name (str): Base name of metric to be exported to Prometheus.
-        data_collector (callable -> dict): A synchronous callable that
-            returns a dict mapping bucket to number of items in the
-            bucket. If these buckets are not the same as the buckets
-            given to this class, they will be remapped into them.
-        buckets (list[float]): List of floats/ints of the buckets to
-            give to Prometheus. +Inf is ignored, if given.
+    The data is updated by calling `update_data` with an iterable of measurements.
 
+    We assume that the data is updated less frequently than it is reported to
+    Prometheus, and optimise for that case.
     """
 
-    name = attr.ib()
-    data_collector = attr.ib()
-    buckets = attr.ib()
+    __slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")
 
-    def collect(self):
+    def __init__(
+        self,
+        name: str,
+        documentation: str,
+        buckets: Iterable[float],
+        registry=REGISTRY,
+    ):
+        """
+        Args:
+            name: base name of metric to be exported to Prometheus. (a _bucket suffix
+               will be added.)
+            documentation: help text for the metric
+            buckets: The top bounds of the buckets to report
+            registry: metric registry to register with
+        """
+        self._name = name
+        self._documentation = documentation
 
-        # Fetch the data -- this must be synchronous!
-        data = self.data_collector()
+        # the tops of the buckets
+        self._bucket_bounds = [float(b) for b in buckets]
+        if self._bucket_bounds != sorted(self._bucket_bounds):
+            raise ValueError("Buckets not in sorted order")
 
-        buckets = {}  # type: Dict[float, int]
+        if self._bucket_bounds[-1] != float("inf"):
+            self._bucket_bounds.append(float("inf"))
 
-        res = []
-        for x in data.keys():
-            for i, bound in enumerate(self.buckets):
-                if x <= bound:
-                    buckets[bound] = buckets.get(bound, 0) + data[x]
+        self._metric = self._values_to_metric([])
+        registry.register(self)
 
-        for i in self.buckets:
-            res.append([str(i), buckets.get(i, 0)])
+    def collect(self):
+        yield self._metric
 
-        res.append(["+Inf", sum(data.values())])
+    def update_data(self, values: Iterable[float]):
+        """Update the data to be reported by the metric
 
-        metric = HistogramMetricFamily(
-            self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
+        The existing data is cleared, and each measurement in the input is assigned
+        to the relevant bucket.
+        """
+        self._metric = self._values_to_metric(values)
+
+    def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
+        total = 0.0
+        bucket_values = [0 for _ in self._bucket_bounds]
+
+        for v in values:
+            # assign each value to a bucket
+            for i, bound in enumerate(self._bucket_bounds):
+                if v <= bound:
+                    bucket_values[i] += 1
+                    break
+
+            # ... and increment the sum
+            total += v
+
+        # now, aggregate the bucket values so that they count the number of entries in
+        # that bucket or below.
+        accumulated_values = itertools.accumulate(bucket_values)
+
+        return GaugeHistogramMetricFamily(
+            self._name,
+            self._documentation,
+            buckets=list(
+                zip((str(b) for b in self._bucket_bounds), accumulated_values)
+            ),
+            gsum_value=total,
         )
-        yield metric
-
-    def __attrs_post_init__(self):
-        self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
-        if self.buckets != sorted(self.buckets):
-            raise ValueError("Buckets not sorted")
-
-        self.buckets = tuple(self.buckets)
-
-        if self.name in all_gauges.keys():
-            logger.warning("%s already registered, reregistering" % (self.name,))
-            REGISTRY.unregister(all_gauges.pop(self.name))
-
-        REGISTRY.register(self)
-        all_gauges[self.name] = self
 
 
 #
diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
index c6457ba450..734271e765 100644
--- a/synapse/metrics/_exposition.py
+++ b/synapse/metrics/_exposition.py
@@ -26,6 +26,7 @@ import math
 import threading
 from http.server import BaseHTTPRequestHandler, HTTPServer
 from socketserver import ThreadingMixIn
+from typing import Dict, List
 from urllib.parse import parse_qs, urlparse
 
 from prometheus_client import REGISTRY
@@ -124,16 +125,33 @@ def generate_latest(registry, emit_help=False):
                 )
             )
         output.append("# TYPE {0} {1}\n".format(mname, mtype))
-        for sample in metric.samples:
-            # Get rid of the OpenMetrics specific samples
+
+        om_samples = {}  # type: Dict[str, List[str]]
+        for s in metric.samples:
             for suffix in ["_created", "_gsum", "_gcount"]:
-                if sample.name.endswith(suffix):
+                if s.name == metric.name + suffix:
+                    # OpenMetrics specific sample, put in a gauge at the end.
+                    # (these come from gaugehistograms which don't get renamed,
+                    # so no need to faff with mnewname)
+                    om_samples.setdefault(suffix, []).append(sample_line(s, s.name))
                     break
             else:
-                newname = sample.name.replace(mnewname, mname)
+                newname = s.name.replace(mnewname, mname)
                 if ":" in newname and newname.endswith("_total"):
                     newname = newname[: -len("_total")]
-                output.append(sample_line(sample, newname))
+                output.append(sample_line(s, newname))
+
+        for suffix, lines in sorted(om_samples.items()):
+            if emit_help:
+                output.append(
+                    "# HELP {0}{1} {2}\n".format(
+                        metric.name,
+                        suffix,
+                        metric.documentation.replace("\\", r"\\").replace("\n", r"\n"),
+                    )
+                )
+            output.append("# TYPE {0}{1} gauge\n".format(metric.name, suffix))
+            output.extend(lines)
 
         # Get rid of the weird colon things while we're at it
         if mtype == "counter":
@@ -152,16 +170,16 @@ def generate_latest(registry, emit_help=False):
                 )
             )
         output.append("# TYPE {0} {1}\n".format(mnewname, mtype))
-        for sample in metric.samples:
-            # Get rid of the OpenMetrics specific samples
+
+        for s in metric.samples:
+            # Get rid of the OpenMetrics specific samples (we should already have
+            # dealt with them above anyway.)
             for suffix in ["_created", "_gsum", "_gcount"]:
-                if sample.name.endswith(suffix):
+                if s.name == metric.name + suffix:
                     break
             else:
                 output.append(
-                    sample_line(
-                        sample, sample.name.replace(":total", "").replace(":", "_")
-                    )
+                    sample_line(s, s.name.replace(":total", "").replace(":", "_"))
                 )
 
     return "".join(output).encode("utf-8")
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 686052bd83..92099f95ce 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -12,10 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import typing
-from collections import Counter
 
-from synapse.metrics import BucketCollector
+from synapse.metrics import GaugeBucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool
@@ -23,6 +21,26 @@ from synapse.storage.databases.main.event_push_actions import (
     EventPushActionsWorkerStore,
 )
 
+# Collect metrics on the number of forward extremities that exist.
+_extremities_collecter = GaugeBucketCollector(
+    "synapse_forward_extremities",
+    "Number of rooms on the server with the given number of forward extremities"
+    " or fewer",
+    buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
+)
+
+# we also expose metrics on the "number of excess extremity events", which is
+# (E-1)*N, where E is the number of extremities and N is the number of state
+# events in the room. This is an approximation to the number of state events
+# we could remove from state resolution by reducing the graph to a single
+# forward extremity.
+_excess_state_events_collecter = GaugeBucketCollector(
+    "synapse_excess_extremity_events",
+    "Number of rooms on the server with the given number of excess extremity "
+    "events, or fewer",
+    buckets=[0] + [1 << n for n in range(12)],
+)
+
 
 class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
     """Functions to pull various metrics from the DB, for e.g. phone home
@@ -32,18 +50,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
 
-        # Collect metrics on the number of forward extremities that exist.
-        # Counter of number of extremities to count
-        self._current_forward_extremities_amount = (
-            Counter()
-        )  # type: typing.Counter[int]
-
-        BucketCollector(
-            "synapse_forward_extremities",
-            lambda: self._current_forward_extremities_amount,
-            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
-        )
-
         # Read the extrems every 60 minutes
         def read_forward_extremities():
             # run as a background process to make sure that the database transactions
@@ -58,14 +64,25 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
         def fetch(txn):
             txn.execute(
                 """
-                select count(*) c from event_forward_extremities
-                group by room_id
+                SELECT t1.c, t2.c
+                FROM (
+                    SELECT room_id, COUNT(*) c FROM event_forward_extremities
+                    GROUP BY room_id
+                ) t1 LEFT JOIN (
+                    SELECT room_id, COUNT(*) c FROM current_state_events
+                    GROUP BY room_id
+                ) t2 ON t1.room_id = t2.room_id
                 """
             )
             return txn.fetchall()
 
         res = await self.db_pool.runInteraction("read_forward_extremities", fetch)
-        self._current_forward_extremities_amount = Counter([x[0] for x in res])
+
+        _extremities_collecter.update_data(x[0] for x in res)
+
+        _excess_state_events_collecter.update_data(
+            (x[0] - 1) * x[1] for x in res if x[1]
+        )
 
     async def count_daily_messages(self):
         """
diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py
index 949846fe33..3957471f3f 100644
--- a/tests/storage/test_event_metrics.py
+++ b/tests/storage/test_event_metrics.py
@@ -52,14 +52,14 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
         self.reactor.advance(60 * 60 * 1000)
         self.pump(1)
 
-        items = set(
+        items = list(
             filter(
                 lambda x: b"synapse_forward_extremities_" in x,
-                generate_latest(REGISTRY).split(b"\n"),
+                generate_latest(REGISTRY, emit_help=False).split(b"\n"),
             )
         )
 
-        expected = {
+        expected = [
             b'synapse_forward_extremities_bucket{le="1.0"} 0.0',
             b'synapse_forward_extremities_bucket{le="2.0"} 2.0',
             b'synapse_forward_extremities_bucket{le="3.0"} 2.0',
@@ -72,9 +72,12 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
             b'synapse_forward_extremities_bucket{le="100.0"} 3.0',
             b'synapse_forward_extremities_bucket{le="200.0"} 3.0',
             b'synapse_forward_extremities_bucket{le="500.0"} 3.0',
-            b'synapse_forward_extremities_bucket{le="+Inf"} 3.0',
-            b"synapse_forward_extremities_count 3.0",
-            b"synapse_forward_extremities_sum 10.0",
-        }
-
+            # per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9,
+            # "inf" is valid: "this includes variants such as inf"
+            b'synapse_forward_extremities_bucket{le="inf"} 3.0',
+            b"# TYPE synapse_forward_extremities_gcount gauge",
+            b"synapse_forward_extremities_gcount 3.0",
+            b"# TYPE synapse_forward_extremities_gsum gauge",
+            b"synapse_forward_extremities_gsum 10.0",
+        ]
         self.assertEqual(items, expected)