summary refs log tree commit diff
path: root/synapse/metrics
diff options
Diffstat (limited to 'synapse/metrics')
3 files changed, 300 insertions, 262 deletions
diff --git a/synapse/metrics/ b/synapse/metrics/
index ceef57ad88..9e6c1b2f3b 100644
--- a/synapse/metrics/
+++ b/synapse/metrics/
@@ -12,16 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import functools
-import gc
 import itertools
 import logging
 import os
 import platform
 import threading
-import time
 from typing import (
-    Any,
@@ -34,35 +30,31 @@ from typing import (
-    cast,
 import attr
 from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
 from prometheus_client.core import (
-    CounterMetricFamily,
-from twisted.internet import reactor
-from twisted.internet.base import ReactorBase
 from twisted.python.threadpool import ThreadPool
-import synapse
+import synapse.metrics._reactor_metrics
 from synapse.metrics._exposition import (
+from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager
 from synapse.util.versionstring import get_version_string
 logger = logging.getLogger(__name__)
 METRICS_PREFIX = "/_synapse/metrics"
-running_on_pypy = platform.python_implementation() == "PyPy"
 all_gauges: "Dict[str, Union[LaterGauge, InFlightGauge]]" = {}
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
@@ -76,19 +68,17 @@ class RegistryProxy:
                 yield metric
-@attr.s(slots=True, hash=True)
+@attr.s(slots=True, hash=True, auto_attribs=True)
 class LaterGauge:
-    name = attr.ib(type=str)
-    desc = attr.ib(type=str)
-    labels = attr.ib(hash=False, type=Optional[Iterable[str]])
+    name: str
+    desc: str
+    labels: Optional[Iterable[str]] = attr.ib(hash=False)
     # callback: should either return a value (if there are no labels for this metric),
     # or dict mapping from a label tuple to a value
-    caller = attr.ib(
-        type=Callable[
-            [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
-        ]
-    )
+    caller: Callable[
+        [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
+    ]
     def collect(self) -> Iterable[Metric]:
@@ -157,7 +147,9 @@ class InFlightGauge(Generic[MetricsEntry]):
         # Create a class which have the sub_metrics values as attributes, which
         # default to 0 on initialization. Used to pass to registered callbacks.
         self._metrics_class: Type[MetricsEntry] = attr.make_class(
-            "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
+            "_MetricsEntry",
+            attrs={x: attr.ib(default=0) for x in sub_metrics},
+            slots=True,
         # Counts number of in flight blocks for a given set of label values
@@ -369,136 +361,6 @@ class CPUMetrics:
-# Python GC metrics
-gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
-gc_time = Histogram(
-    "python_gc_time",
-    "Time taken to GC (sec)",
-    ["gen"],
-    buckets=[
-        0.0025,
-        0.005,
-        0.01,
-        0.025,
-        0.05,
-        0.10,
-        0.25,
-        0.50,
-        1.00,
-        2.50,
-        5.00,
-        7.50,
-        15.00,
-        30.00,
-        45.00,
-        60.00,
-    ],
-class GCCounts:
-    def collect(self) -> Iterable[Metric]:
-        cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
-        for n, m in enumerate(gc.get_count()):
-            cm.add_metric([str(n)], m)
-        yield cm
-if not running_on_pypy:
-    REGISTRY.register(GCCounts())
-# PyPy GC / memory metrics
-class PyPyGCStats:
-    def collect(self) -> Iterable[Metric]:
-        # @stats is a pretty-printer object with __str__() returning a nice table,
-        # plus some fields that contain data from that table.
-        # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB').
-        stats = gc.get_stats(memory_pressure=False)  # type: ignore
-        # @s contains same fields as @stats, but as actual integers.
-        s = stats._s  # type: ignore
-        # also note that field naming is completely braindead
-        # and only vaguely correlates with the pretty-printed table.
-        # >>>> gc.get_stats(False)
-        # Total memory consumed:
-        #     GC used:            8.7MB (peak: 39.0MB)        # s.total_gc_memory, s.peak_memory
-        #        in arenas:            3.0MB                  # s.total_arena_memory
-        #        rawmalloced:          1.7MB                  # s.total_rawmalloced_memory
-        #        nursery:              4.0MB                  # s.nursery_size
-        #     raw assembler used: 31.0kB                      # s.jit_backend_used
-        #     -----------------------------
-        #     Total:              8.8MB                       # stats.memory_used_sum
-        #
-        #     Total memory allocated:
-        #     GC allocated:            38.7MB (peak: 41.1MB)  # s.total_allocated_memory, s.peak_allocated_memory
-        #        in arenas:            30.9MB                 # s.peak_arena_memory
-        #        rawmalloced:          4.1MB                  # s.peak_rawmalloced_memory
-        #        nursery:              4.0MB                  # s.nursery_size
-        #     raw assembler allocated: 1.0MB                  # s.jit_backend_allocated
-        #     -----------------------------
-        #     Total:                   39.7MB                 # stats.memory_allocated_sum
-        #
-        #     Total time spent in GC:  0.073                  # s.total_gc_time
-        pypy_gc_time = CounterMetricFamily(
-            "pypy_gc_time_seconds_total",
-            "Total time spent in PyPy GC",
-            labels=[],
-        )
-        pypy_gc_time.add_metric([], s.total_gc_time / 1000)
-        yield pypy_gc_time
-        pypy_mem = GaugeMetricFamily(
-            "pypy_memory_bytes",
-            "Memory tracked by PyPy allocator",
-            labels=["state", "class", "kind"],
-        )
-        # memory used by JIT assembler
-        pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used)
-        pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated)
-        # memory used by GCed objects
-        pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory)
-        pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory)
-        pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory)
-        pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory)
-        pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size)
-        pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size)
-        # totals
-        pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory)
-        pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory)
-        pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
-        pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
-        yield pypy_mem
-if running_on_pypy:
-    REGISTRY.register(PyPyGCStats())
-# Twisted reactor metrics
-tick_time = Histogram(
-    "python_twisted_reactor_tick_time",
-    "Tick time of the Twisted reactor (sec)",
-    buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
-pending_calls_metric = Histogram(
-    "python_twisted_reactor_pending_calls",
-    "Pending calls",
-    buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
 # Federation Metrics
@@ -551,8 +413,6 @@ build_info.labels(
     " ".join([platform.system(), platform.release()]),
-last_ticked = time.time()
 # 3PID send info
 threepid_send_requests = Histogram(
@@ -600,116 +460,6 @@ def register_threadpool(name: str, threadpool: ThreadPool) -> None:
-class ReactorLastSeenMetric:
-    def collect(self) -> Iterable[Metric]:
-        cm = GaugeMetricFamily(
-            "python_twisted_reactor_last_seen",
-            "Seconds since the Twisted reactor was last seen",
-        )
-        cm.add_metric([], time.time() - last_ticked)
-        yield cm
-# The minimum time in seconds between GCs for each generation, regardless of the current GC
-# thresholds and counts.
-MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
-# The time (in seconds since the epoch) of the last time we did a GC for each generation.
-_last_gc = [0.0, 0.0, 0.0]
-F = TypeVar("F", bound=Callable[..., Any])
-def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F:
-    @functools.wraps(func)
-    def f(*args: Any, **kwargs: Any) -> Any:
-        now = reactor.seconds()
-        num_pending = 0
-        # _newTimedCalls is one long list of *all* pending calls. Below loop
-        # is based off of impl of reactor.runUntilCurrent
-        for delayed_call in reactor._newTimedCalls:
-            if delayed_call.time > now:
-                break
-            if delayed_call.delayed_time > 0:
-                continue
-            num_pending += 1
-        num_pending += len(reactor.threadCallQueue)
-        start = time.time()
-        ret = func(*args, **kwargs)
-        end = time.time()
-        # record the amount of wallclock time spent running pending calls.
-        # This is a proxy for the actual amount of time between reactor polls,
-        # since about 25% of time is actually spent running things triggered by
-        # I/O events, but that is harder to capture without rewriting half the
-        # reactor.
-        tick_time.observe(end - start)
-        pending_calls_metric.observe(num_pending)
-        # Update the time we last ticked, for the metric to test whether
-        # Synapse's reactor has frozen
-        global last_ticked
-        last_ticked = end
-        if running_on_pypy:
-            return ret
-        # Check if we need to do a manual GC (since its been disabled), and do
-        # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
-        # promote an object into gen 2, and we don't want to handle the same
-        # object multiple times.
-        threshold = gc.get_threshold()
-        counts = gc.get_count()
-        for i in (2, 1, 0):
-            # We check if we need to do one based on a straightforward
-            # comparison between the threshold and count. We also do an extra
-            # check to make sure that we don't a GC too often.
-            if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
-                if i == 0:
-                    logger.debug("Collecting gc %d", i)
-                else:
-          "Collecting gc %d", i)
-                start = time.time()
-                unreachable = gc.collect(i)
-                end = time.time()
-                _last_gc[i] = end
-                gc_time.labels(i).observe(end - start)
-                gc_unreachable.labels(i).set(unreachable)
-        return ret
-    return cast(F, f)
-    # Ensure the reactor has all the attributes we expect
-    reactor.seconds  # type: ignore
-    reactor.runUntilCurrent  # type: ignore
-    reactor._newTimedCalls  # type: ignore
-    reactor.threadCallQueue  # type: ignore
-    # runUntilCurrent is called when we have pending calls. It is called once
-    # per iteratation after fd polling.
-    reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent)  # type: ignore
-    # We manually run the GC each reactor tick so that we can get some metrics
-    # about time spent doing GC,
-    if not running_on_pypy:
-        gc.disable()
-except AttributeError:
-    pass
 __all__ = [
@@ -717,4 +467,6 @@ __all__ = [
+    "install_gc_manager",
diff --git a/synapse/metrics/ b/synapse/metrics/
new file mode 100644
index 0000000000..2bc909efa0
--- /dev/null
+++ b/synapse/metrics/
@@ -0,0 +1,203 @@
+# Copyright 2015-2022 The Foundation C.I.C.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import gc
+import logging
+import platform
+import time
+from typing import Iterable
+from prometheus_client.core import (
+    CounterMetricFamily,
+    Gauge,
+    GaugeMetricFamily,
+    Histogram,
+    Metric,
+from twisted.internet import task
+"""Prometheus metrics for garbage collection"""
+logger = logging.getLogger(__name__)
+# The minimum time in seconds between GCs for each generation, regardless of the current GC
+# thresholds and counts.
+MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0)
+running_on_pypy = platform.python_implementation() == "PyPy"
+# Python GC metrics
+gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
+gc_time = Histogram(
+    "python_gc_time",
+    "Time taken to GC (sec)",
+    ["gen"],
+    buckets=[
+        0.0025,
+        0.005,
+        0.01,
+        0.025,
+        0.05,
+        0.10,
+        0.25,
+        0.50,
+        1.00,
+        2.50,
+        5.00,
+        7.50,
+        15.00,
+        30.00,
+        45.00,
+        60.00,
+    ],
+class GCCounts:
+    def collect(self) -> Iterable[Metric]:
+        cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
+        for n, m in enumerate(gc.get_count()):
+            cm.add_metric([str(n)], m)
+        yield cm
+def install_gc_manager() -> None:
+    """Disable automatic GC, and replace it with a task that runs every 100ms
+    This means that (a) we can limit how often GC runs; (b) we can get some metrics
+    about GC activity.
+    It does nothing on PyPy.
+    """
+    if running_on_pypy:
+        return
+    REGISTRY.register(GCCounts())
+    gc.disable()
+    # The time (in seconds since the epoch) of the last time we did a GC for each generation.
+    _last_gc = [0.0, 0.0, 0.0]
+    def _maybe_gc() -> None:
+        # Check if we need to do a manual GC (since its been disabled), and do
+        # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may
+        # promote an object into gen 2, and we don't want to handle the same
+        # object multiple times.
+        threshold = gc.get_threshold()
+        counts = gc.get_count()
+        end = time.time()
+        for i in (2, 1, 0):
+            # We check if we need to do one based on a straightforward
+            # comparison between the threshold and count. We also do an extra
+            # check to make sure that we don't a GC too often.
+            if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]:
+                if i == 0:
+                    logger.debug("Collecting gc %d", i)
+                else:
+          "Collecting gc %d", i)
+                start = time.time()
+                unreachable = gc.collect(i)
+                end = time.time()
+                _last_gc[i] = end
+                gc_time.labels(i).observe(end - start)
+                gc_unreachable.labels(i).set(unreachable)
+    gc_task = task.LoopingCall(_maybe_gc)
+    gc_task.start(0.1)
+# PyPy GC / memory metrics
+class PyPyGCStats:
+    def collect(self) -> Iterable[Metric]:
+        # @stats is a pretty-printer object with __str__() returning a nice table,
+        # plus some fields that contain data from that table.
+        # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB').
+        stats = gc.get_stats(memory_pressure=False)  # type: ignore
+        # @s contains same fields as @stats, but as actual integers.
+        s = stats._s  # type: ignore
+        # also note that field naming is completely braindead
+        # and only vaguely correlates with the pretty-printed table.
+        # >>>> gc.get_stats(False)
+        # Total memory consumed:
+        #     GC used:            8.7MB (peak: 39.0MB)        # s.total_gc_memory, s.peak_memory
+        #        in arenas:            3.0MB                  # s.total_arena_memory
+        #        rawmalloced:          1.7MB                  # s.total_rawmalloced_memory
+        #        nursery:              4.0MB                  # s.nursery_size
+        #     raw assembler used: 31.0kB                      # s.jit_backend_used
+        #     -----------------------------
+        #     Total:              8.8MB                       # stats.memory_used_sum
+        #
+        #     Total memory allocated:
+        #     GC allocated:            38.7MB (peak: 41.1MB)  # s.total_allocated_memory, s.peak_allocated_memory
+        #        in arenas:            30.9MB                 # s.peak_arena_memory
+        #        rawmalloced:          4.1MB                  # s.peak_rawmalloced_memory
+        #        nursery:              4.0MB                  # s.nursery_size
+        #     raw assembler allocated: 1.0MB                  # s.jit_backend_allocated
+        #     -----------------------------
+        #     Total:                   39.7MB                 # stats.memory_allocated_sum
+        #
+        #     Total time spent in GC:  0.073                  # s.total_gc_time
+        pypy_gc_time = CounterMetricFamily(
+            "pypy_gc_time_seconds_total",
+            "Total time spent in PyPy GC",
+            labels=[],
+        )
+        pypy_gc_time.add_metric([], s.total_gc_time / 1000)
+        yield pypy_gc_time
+        pypy_mem = GaugeMetricFamily(
+            "pypy_memory_bytes",
+            "Memory tracked by PyPy allocator",
+            labels=["state", "class", "kind"],
+        )
+        # memory used by JIT assembler
+        pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used)
+        pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated)
+        # memory used by GCed objects
+        pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory)
+        pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory)
+        pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory)
+        pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory)
+        pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size)
+        pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size)
+        # totals
+        pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory)
+        pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory)
+        pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
+        pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
+        yield pypy_mem
+if running_on_pypy:
+    REGISTRY.register(PyPyGCStats())
diff --git a/synapse/metrics/ b/synapse/metrics/
new file mode 100644
index 0000000000..f38f798313
--- /dev/null
+++ b/synapse/metrics/
@@ -0,0 +1,83 @@
+# Copyright 2022 The Foundation C.I.C.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import select
+import time
+from typing import Any, Iterable, List, Tuple
+from prometheus_client import Histogram, Metric
+from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from twisted.internet import reactor
+# Twisted reactor metrics
+tick_time = Histogram(
+    "python_twisted_reactor_tick_time",
+    "Tick time of the Twisted reactor (sec)",
+    buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
+class EpollWrapper:
+    """a wrapper for an epoll object which records the time between polls"""
+    def __init__(self, poller: "select.epoll"):  # type: ignore[name-defined]
+        self.last_polled = time.time()
+        self._poller = poller
+    def poll(self, *args, **kwargs) -> List[Tuple[int, int]]:  # type: ignore[no-untyped-def]
+        # record the time since poll() was last called. This gives a good proxy for
+        # how long it takes to run everything in the reactor - ie, how long anything
+        # waiting for the next tick will have to wait.
+        tick_time.observe(time.time() - self.last_polled)
+        ret = self._poller.poll(*args, **kwargs)
+        self.last_polled = time.time()
+        return ret
+    def __getattr__(self, item: str) -> Any:
+        return getattr(self._poller, item)
+class ReactorLastSeenMetric:
+    def __init__(self, epoll_wrapper: EpollWrapper):
+        self._epoll_wrapper = epoll_wrapper
+    def collect(self) -> Iterable[Metric]:
+        cm = GaugeMetricFamily(
+            "python_twisted_reactor_last_seen",
+            "Seconds since the Twisted reactor was last seen",
+        )
+        cm.add_metric([], time.time() - self._epoll_wrapper.last_polled)
+        yield cm
+    # if the reactor has a `_poller` attribute, which is an `epoll` object
+    # (ie, it's an EPollReactor), we wrap the `epoll` with a thing that will
+    # measure the time between ticks
+    from select import epoll  # type: ignore[attr-defined]
+    poller = reactor._poller  # type: ignore[attr-defined]
+except (AttributeError, ImportError):
+    pass
+    if isinstance(poller, epoll):
+        poller = EpollWrapper(poller)
+        reactor._poller = poller  # type: ignore[attr-defined]
+        REGISTRY.register(ReactorLastSeenMetric(poller))