diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index bec3b13397..9cf31f96b3 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -20,13 +20,18 @@ import os
import platform
import threading
import time
-from typing import Dict, Union
+from typing import Callable, Dict, Iterable, Optional, Tuple, Union
import six
import attr
from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
+from prometheus_client.core import (
+ REGISTRY,
+ CounterMetricFamily,
+ GaugeMetricFamily,
+ HistogramMetricFamily,
+)
from twisted.internet import reactor
@@ -59,10 +64,12 @@ class RegistryProxy(object):
@attr.s(hash=True)
class LaterGauge(object):
- name = attr.ib()
- desc = attr.ib()
- labels = attr.ib(hash=False)
- caller = attr.ib()
+ name = attr.ib(type=str)
+ desc = attr.ib(type=str)
+ labels = attr.ib(hash=False, type=Optional[Iterable[str]])
+ # callback: should either return a value (if there are no labels for this metric),
+    # or a dict mapping from a tuple of label values to a value
+ caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])
def collect(self):
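For reference, the two shapes allowed by the `caller` type hint above correspond to the unlabelled and labelled uses of `LaterGauge`. A minimal sketch (the metric names and the state they read are illustrative, not part of this change):

```python
from synapse.metrics import LaterGauge

pending_items = ["a", "b"]                      # illustrative state
connection_counts = {"events": 3, "typing": 1}  # illustrative state

# No labels: the callback returns a single value.
LaterGauge(
    "myapp_pending_items", "Items waiting to be processed", [],
    lambda: len(pending_items),
)

# With labels: the callback returns a dict mapping a tuple of label values
# to the value for that label combination.
LaterGauge(
    "myapp_connections", "Open connections per stream", ["stream_name"],
    lambda: {(name,): count for name, count in connection_counts.items()},
)
```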
@@ -125,7 +132,7 @@ class InFlightGauge(object):
)
# Counts number of in flight blocks for a given set of label values
- self._registrations = {}
+ self._registrations = {} # type: Dict
# Protects access to _registrations
self._lock = threading.Lock()
@@ -226,7 +233,7 @@ class BucketCollector(object):
# Fetch the data -- this must be synchronous!
data = self.data_collector()
- buckets = {}
+ buckets = {} # type: Dict[float, int]
res = []
for x in data.keys():
@@ -240,7 +247,7 @@ class BucketCollector(object):
res.append(["+Inf", sum(data.values())])
metric = HistogramMetricFamily(
- self.name, "", buckets=res, sum_value=sum([x * y for x, y in data.items()])
+ self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
)
yield metric
@@ -336,6 +343,78 @@ class GCCounts(object):
if not running_on_pypy:
REGISTRY.register(GCCounts())
+
+#
+# PyPy GC / memory metrics
+#
+
+
+class PyPyGCStats(object):
+ def collect(self):
+
+ # @stats is a pretty-printer object with __str__() returning a nice table,
+ # plus some fields that contain data from that table.
+        # Unfortunately, those fields are pretty-printed themselves (e.g. '4.5MB').
+ stats = gc.get_stats(memory_pressure=False) # type: ignore
+        # @s contains the same fields as @stats, but as actual integers.
+ s = stats._s # type: ignore
+
+        # Also note that the field naming is confusingly inconsistent and only
+        # vaguely correlates with the pretty-printed table.
+ # >>>> gc.get_stats(False)
+ # Total memory consumed:
+ # GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory
+ # in arenas: 3.0MB # s.total_arena_memory
+ # rawmalloced: 1.7MB # s.total_rawmalloced_memory
+ # nursery: 4.0MB # s.nursery_size
+ # raw assembler used: 31.0kB # s.jit_backend_used
+ # -----------------------------
+ # Total: 8.8MB # stats.memory_used_sum
+ #
+ # Total memory allocated:
+ # GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory
+ # in arenas: 30.9MB # s.peak_arena_memory
+ # rawmalloced: 4.1MB # s.peak_rawmalloced_memory
+ # nursery: 4.0MB # s.nursery_size
+ # raw assembler allocated: 1.0MB # s.jit_backend_allocated
+ # -----------------------------
+ # Total: 39.7MB # stats.memory_allocated_sum
+ #
+ # Total time spent in GC: 0.073 # s.total_gc_time
+
+ pypy_gc_time = CounterMetricFamily(
+ "pypy_gc_time_seconds_total", "Total time spent in PyPy GC", labels=[],
+ )
+ pypy_gc_time.add_metric([], s.total_gc_time / 1000)
+ yield pypy_gc_time
+
+ pypy_mem = GaugeMetricFamily(
+ "pypy_memory_bytes",
+ "Memory tracked by PyPy allocator",
+ labels=["state", "class", "kind"],
+ )
+ # memory used by JIT assembler
+ pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used)
+ pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated)
+ # memory used by GCed objects
+ pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory)
+ pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory)
+ pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory)
+ pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory)
+ pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size)
+ pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size)
+ # totals
+ pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory)
+ pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory)
+ pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory)
+ pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory)
+ yield pypy_mem
+
+
+if running_on_pypy:
+ REGISTRY.register(PyPyGCStats())
+
+
#
# Twisted reactor metrics
#
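As a quick sanity check, the new collector can be exercised directly under PyPy (it is only registered there, and `gc.get_stats()` does not exist on CPython). A rough sketch, assuming synapse is importable under the PyPy interpreter:

```python
# PyPy only: gc.get_stats() is a PyPy-specific API.
from synapse.metrics import PyPyGCStats

for family in PyPyGCStats().collect():
    for sample in family.samples:
        print(sample)
```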
diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
index 74d9c3ecd3..ab7f948ed4 100644
--- a/synapse/metrics/_exposition.py
+++ b/synapse/metrics/_exposition.py
@@ -33,12 +33,14 @@ from prometheus_client import REGISTRY
from twisted.web.resource import Resource
+from synapse.util import caches
+
try:
from prometheus_client.samples import Sample
except ImportError:
- Sample = namedtuple(
+ Sample = namedtuple( # type: ignore[no-redef] # noqa
"Sample", ["name", "labels", "value", "timestamp", "exemplar"]
- ) # type: ignore
+ )
CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8")
@@ -103,13 +105,15 @@ def nameify_sample(sample):
def generate_latest(registry, emit_help=False):
- output = []
- for metric in registry.collect():
+    # Trigger the cache collectors to be rescraped. These update the common
+    # cache metrics but do not produce any metrics themselves.
+ for collector in caches.collectors_by_name.values():
+ collector.collect()
- if metric.name.startswith("__unused"):
- continue
+ output = []
+ for metric in registry.collect():
if not metric.samples:
# No samples, don't bother.
continue
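The collectors being poked here are assumed to follow the pattern below: each cache registers an object in `caches.collectors_by_name` whose `collect()` refreshes shared, pre-registered metrics rather than yielding metric families of its own, which is why they must be called explicitly before the registry is scraped. A rough sketch of that pattern (not the real `synapse.util.caches` implementation):

```python
from prometheus_client import Gauge

# A shared gauge, registered once with the default registry.
cache_size = Gauge("example_cache_size", "Example cache size", ["name"])

collectors_by_name = {}

class ExampleCacheCollector:
    def __init__(self, name, cache):
        self.name = name
        self.cache = cache
        collectors_by_name[name] = self

    def collect(self):
        # Refreshes the common gauge; does not yield anything itself.
        cache_size.labels(self.name).set(len(self.cache))

# generate_latest() above does roughly this before scraping the registry:
for collector in collectors_by_name.values():
    collector.collect()
```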
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index edd6b42db3..13785038ad 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -15,15 +15,20 @@
import logging
import threading
+from asyncio import iscoroutine
+from functools import wraps
+from typing import TYPE_CHECKING, Dict, Optional, Set
-import six
-
-from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, Counter, Gauge
from twisted.internet import defer
from synapse.logging.context import LoggingContext, PreserveLoggingContext
+if TYPE_CHECKING:
+ import resource
+
+
logger = logging.getLogger(__name__)
@@ -33,6 +38,12 @@ _background_process_start_count = Counter(
["name"],
)
+_background_process_in_flight_count = Gauge(
+ "synapse_background_process_in_flight_count",
+ "Number of background processes in flight",
+ labelnames=["name"],
+)
+
# we set registry=None in all of these to stop them getting registered with
# the default registry. Instead we collect them all via the CustomCollector,
# which ensures that we can update them before they are collected.
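For context, the registry=None / custom-collector pattern described in the comment above looks roughly like this (the metric, class, and state names are illustrative):

```python
from prometheus_client import Counter
from prometheus_client.core import REGISTRY

# Not registered with the default registry...
work_done = Counter("example_work_done", "Work done", ["name"], registry=None)

pending = {"indexing": 3.0}  # illustrative internal state

class ExampleCollector(object):
    def collect(self):
        # ...so it can be brought up to date here, just before it is scraped,
        # and its samples yielded through this collector.
        for name, amount in pending.items():
            work_done.labels(name).inc(amount)
        pending.clear()
        yield from work_done.collect()

REGISTRY.register(ExampleCollector())
```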
@@ -78,15 +89,19 @@ _background_process_db_sched_duration = Counter(
# map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.)
-_background_process_counts = dict() # type: dict[str, int]
+_background_process_counts = {} # type: Dict[str, int]
-# map from description to the currently running background processes.
+# Set of all running background processes that became active since the last
+# time metrics were scraped (i.e. background processes that performed some
+# work since the last scrape).
#
-# it's kept as a dict of sets rather than a big set so that we can keep track
-# of process descriptions that no longer have any active processes.
-_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
+# We do it this way to handle the case where we have a large number of
+# background processes stacking up behind a lock or linearizer: we then only
+# need to iterate over and update metrics for the processes that have actually
+# been active, and can ignore the idle ones.
+_background_processes_active_since_last_scrape = set() # type: Set[_BackgroundProcess]
-# A lock that covers the above dicts
+# A lock that covers the above set and dict
_bg_metrics_lock = threading.Lock()
@@ -98,25 +113,16 @@ class _Collector(object):
"""
def collect(self):
- background_process_in_flight_count = GaugeMetricFamily(
- "synapse_background_process_in_flight_count",
- "Number of background processes in flight",
- labels=["name"],
- )
+ global _background_processes_active_since_last_scrape
- # We copy the dict so that it doesn't change from underneath us.
- # We also copy the process lists as that can also change
+        # We swap out the _background_processes_active_since_last_scrape set
+        # with an empty one so that we can safely iterate over it without
+        # holding the lock.
with _bg_metrics_lock:
- _background_processes_copy = {
- k: list(v) for k, v in six.iteritems(_background_processes)
- }
-
- for desc, processes in six.iteritems(_background_processes_copy):
- background_process_in_flight_count.add_metric((desc,), len(processes))
- for process in processes:
- process.update_metrics()
+ _background_processes_copy = _background_processes_active_since_last_scrape
+ _background_processes_active_since_last_scrape = set()
- yield background_process_in_flight_count
+ for process in _background_processes_copy:
+ process.update_metrics()
# now we need to run collect() over each of the static Counters, and
# yield each metric they return.
@@ -173,7 +179,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
Args:
desc (str): a description for this background process type
- func: a function, which may return a Deferred
+ func: a function, which may return a Deferred or a coroutine
args: positional args for func
kwargs: keyword args for func
@@ -188,23 +194,83 @@ def run_as_background_process(desc, func, *args, **kwargs):
_background_process_counts[desc] = count + 1
_background_process_start_count.labels(desc).inc()
+ _background_process_in_flight_count.labels(desc).inc()
- with LoggingContext(desc) as context:
+ with BackgroundProcessLoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
- proc = _BackgroundProcess(desc, context)
-
- with _bg_metrics_lock:
- _background_processes.setdefault(desc, set()).add(proc)
try:
- yield func(*args, **kwargs)
+ result = func(*args, **kwargs)
+
+ # We probably don't have an ensureDeferred in our call stack to handle
+ # coroutine results, so we need to ensureDeferred here.
+ #
+ # But we need this check because ensureDeferred doesn't like being
+ # called on immediate values (as opposed to Deferreds or coroutines).
+ if iscoroutine(result):
+ result = defer.ensureDeferred(result)
+
+ return (yield result)
except Exception:
logger.exception("Background process '%s' threw an exception", desc)
finally:
- proc.update_metrics()
-
- with _bg_metrics_lock:
- _background_processes[desc].remove(proc)
+ _background_process_in_flight_count.labels(desc).dec()
with PreserveLoggingContext():
return run()
+
+
+def wrap_as_background_process(desc):
+ """Decorator that wraps a function that gets called as a background
+ process.
+
+ Equivalent of calling the function with `run_as_background_process`
+ """
+
+ def wrap_as_background_process_inner(func):
+ @wraps(func)
+ def wrap_as_background_process_inner_2(*args, **kwargs):
+ return run_as_background_process(desc, func, *args, **kwargs)
+
+ return wrap_as_background_process_inner_2
+
+ return wrap_as_background_process_inner
+
+
+class BackgroundProcessLoggingContext(LoggingContext):
+ """A logging context that tracks in flight metrics for background
+ processes.
+ """
+
+ __slots__ = ["_proc"]
+
+ def __init__(self, name: str):
+ super().__init__(name)
+
+ self._proc = _BackgroundProcess(name, self)
+
+ def start(self, rusage: "Optional[resource._RUsage]"):
+ """Log context has started running (again).
+ """
+
+ super().start(rusage)
+
+ # We've become active again so we make sure we're in the list of active
+ # procs. (Note that "start" here means we've become active, as opposed
+ # to starting for the first time.)
+ with _bg_metrics_lock:
+ _background_processes_active_since_last_scrape.add(self._proc)
+
+ def __exit__(self, type, value, traceback) -> None:
+ """Log context has finished.
+ """
+
+ super().__exit__(type, value, traceback)
+
+        # The background process has finished. We explicitly remove the process
+        # from the set and update its metrics manually here, so that the set
+        # doesn't grow indefinitely if nothing is scraping metrics.
+ with _bg_metrics_lock:
+ _background_processes_active_since_last_scrape.discard(self._proc)
+
+ self._proc.update_metrics()
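A hedged sketch of how the new decorator might be used together with a coroutine function (the function body and the `store` object are illustrative, not part of this patch):

```python
from synapse.metrics.background_process_metrics import wrap_as_background_process

@wrap_as_background_process("prune_old_entries")
async def prune_old_entries(store):
    # Runs in its own BackgroundProcessLoggingContext; the coroutine result is
    # passed through defer.ensureDeferred by run_as_background_process.
    await store.delete_entries_older_than(86400)  # hypothetical storage method

# Call sites simply invoke the function; what they get back is the Deferred
# returned by run_as_background_process:
#     prune_old_entries(store)
```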