diff options
Diffstat (limited to 'synapse/metrics')
-rw-r--r-- | synapse/metrics/__init__.py | 97 | ||||
-rw-r--r-- | synapse/metrics/_exposition.py | 16 | ||||
-rw-r--r-- | synapse/metrics/background_process_metrics.py | 138 |
3 files changed, 200 insertions, 51 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index bec3b13397..9cf31f96b3 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -20,13 +20,18 @@ import os import platform import threading import time -from typing import Dict, Union +from typing import Callable, Dict, Iterable, Optional, Tuple, Union import six import attr from prometheus_client import Counter, Gauge, Histogram -from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily +from prometheus_client.core import ( + REGISTRY, + CounterMetricFamily, + GaugeMetricFamily, + HistogramMetricFamily, +) from twisted.internet import reactor @@ -59,10 +64,12 @@ class RegistryProxy(object): @attr.s(hash=True) class LaterGauge(object): - name = attr.ib() - desc = attr.ib() - labels = attr.ib(hash=False) - caller = attr.ib() + name = attr.ib(type=str) + desc = attr.ib(type=str) + labels = attr.ib(hash=False, type=Optional[Iterable[str]]) + # callback: should either return a value (if there are no labels for this metric), + # or dict mapping from a label tuple to a value + caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]]) def collect(self): @@ -125,7 +132,7 @@ class InFlightGauge(object): ) # Counts number of in flight blocks for a given set of label values - self._registrations = {} + self._registrations = {} # type: Dict # Protects access to _registrations self._lock = threading.Lock() @@ -226,7 +233,7 @@ class BucketCollector(object): # Fetch the data -- this must be synchronous! data = self.data_collector() - buckets = {} + buckets = {} # type: Dict[float, int] res = [] for x in data.keys(): @@ -240,7 +247,7 @@ class BucketCollector(object): res.append(["+Inf", sum(data.values())]) metric = HistogramMetricFamily( - self.name, "", buckets=res, sum_value=sum([x * y for x, y in data.items()]) + self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items()) ) yield metric @@ -336,6 +343,78 @@ class GCCounts(object): if not running_on_pypy: REGISTRY.register(GCCounts()) + +# +# PyPy GC / memory metrics +# + + +class PyPyGCStats(object): + def collect(self): + + # @stats is a pretty-printer object with __str__() returning a nice table, + # plus some fields that contain data from that table. + # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB'). + stats = gc.get_stats(memory_pressure=False) # type: ignore + # @s contains same fields as @stats, but as actual integers. + s = stats._s # type: ignore + + # also note that field naming is completely braindead + # and only vaguely correlates with the pretty-printed table. + # >>>> gc.get_stats(False) + # Total memory consumed: + # GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory + # in arenas: 3.0MB # s.total_arena_memory + # rawmalloced: 1.7MB # s.total_rawmalloced_memory + # nursery: 4.0MB # s.nursery_size + # raw assembler used: 31.0kB # s.jit_backend_used + # ----------------------------- + # Total: 8.8MB # stats.memory_used_sum + # + # Total memory allocated: + # GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory + # in arenas: 30.9MB # s.peak_arena_memory + # rawmalloced: 4.1MB # s.peak_rawmalloced_memory + # nursery: 4.0MB # s.nursery_size + # raw assembler allocated: 1.0MB # s.jit_backend_allocated + # ----------------------------- + # Total: 39.7MB # stats.memory_allocated_sum + # + # Total time spent in GC: 0.073 # s.total_gc_time + + pypy_gc_time = CounterMetricFamily( + "pypy_gc_time_seconds_total", "Total time spent in PyPy GC", labels=[], + ) + pypy_gc_time.add_metric([], s.total_gc_time / 1000) + yield pypy_gc_time + + pypy_mem = GaugeMetricFamily( + "pypy_memory_bytes", + "Memory tracked by PyPy allocator", + labels=["state", "class", "kind"], + ) + # memory used by JIT assembler + pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used) + pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated) + # memory used by GCed objects + pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory) + pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory) + pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory) + pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory) + pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size) + pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size) + # totals + pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory) + pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory) + pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory) + pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory) + yield pypy_mem + + +if running_on_pypy: + REGISTRY.register(PyPyGCStats()) + + # # Twisted reactor metrics # diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index 74d9c3ecd3..ab7f948ed4 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -33,12 +33,14 @@ from prometheus_client import REGISTRY from twisted.web.resource import Resource +from synapse.util import caches + try: from prometheus_client.samples import Sample except ImportError: - Sample = namedtuple( + Sample = namedtuple( # type: ignore[no-redef] # noqa "Sample", ["name", "labels", "value", "timestamp", "exemplar"] - ) # type: ignore + ) CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8") @@ -103,13 +105,15 @@ def nameify_sample(sample): def generate_latest(registry, emit_help=False): - output = [] - for metric in registry.collect(): + # Trigger the cache metrics to be rescraped, which updates the common + # metrics but do not produce metrics themselves + for collector in caches.collectors_by_name.values(): + collector.collect() - if metric.name.startswith("__unused"): - continue + output = [] + for metric in registry.collect(): if not metric.samples: # No samples, don't bother. continue diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index edd6b42db3..13785038ad 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -15,15 +15,20 @@ import logging import threading +from asyncio import iscoroutine +from functools import wraps +from typing import TYPE_CHECKING, Dict, Optional, Set -import six - -from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily +from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer from synapse.logging.context import LoggingContext, PreserveLoggingContext +if TYPE_CHECKING: + import resource + + logger = logging.getLogger(__name__) @@ -33,6 +38,12 @@ _background_process_start_count = Counter( ["name"], ) +_background_process_in_flight_count = Gauge( + "synapse_background_process_in_flight_count", + "Number of background processes in flight", + labelnames=["name"], +) + # we set registry=None in all of these to stop them getting registered with # the default registry. Instead we collect them all via the CustomCollector, # which ensures that we can update them before they are collected. @@ -78,15 +89,19 @@ _background_process_db_sched_duration = Counter( # map from description to a counter, so that we can name our logcontexts # incrementally. (It actually duplicates _background_process_start_count, but # it's much simpler to do so than to try to combine them.) -_background_process_counts = dict() # type: dict[str, int] +_background_process_counts = {} # type: Dict[str, int] -# map from description to the currently running background processes. +# Set of all running background processes that became active active since the +# last time metrics were scraped (i.e. background processes that performed some +# work since the last scrape.) # -# it's kept as a dict of sets rather than a big set so that we can keep track -# of process descriptions that no longer have any active processes. -_background_processes = dict() # type: dict[str, set[_BackgroundProcess]] +# We do it like this to handle the case where we have a large number of +# background processes stacking up behind a lock or linearizer, where we then +# only need to iterate over and update metrics for the process that have +# actually been active and can ignore the idle ones. +_background_processes_active_since_last_scrape = set() # type: Set[_BackgroundProcess] -# A lock that covers the above dicts +# A lock that covers the above set and dict _bg_metrics_lock = threading.Lock() @@ -98,25 +113,16 @@ class _Collector(object): """ def collect(self): - background_process_in_flight_count = GaugeMetricFamily( - "synapse_background_process_in_flight_count", - "Number of background processes in flight", - labels=["name"], - ) + global _background_processes_active_since_last_scrape - # We copy the dict so that it doesn't change from underneath us. - # We also copy the process lists as that can also change + # We swap out the _background_processes set with an empty one so that + # we can safely iterate over the set without holding the lock. with _bg_metrics_lock: - _background_processes_copy = { - k: list(v) for k, v in six.iteritems(_background_processes) - } - - for desc, processes in six.iteritems(_background_processes_copy): - background_process_in_flight_count.add_metric((desc,), len(processes)) - for process in processes: - process.update_metrics() + _background_processes_copy = _background_processes_active_since_last_scrape + _background_processes_active_since_last_scrape = set() - yield background_process_in_flight_count + for process in _background_processes_copy: + process.update_metrics() # now we need to run collect() over each of the static Counters, and # yield each metric they return. @@ -173,7 +179,7 @@ def run_as_background_process(desc, func, *args, **kwargs): Args: desc (str): a description for this background process type - func: a function, which may return a Deferred + func: a function, which may return a Deferred or a coroutine args: positional args for func kwargs: keyword args for func @@ -188,23 +194,83 @@ def run_as_background_process(desc, func, *args, **kwargs): _background_process_counts[desc] = count + 1 _background_process_start_count.labels(desc).inc() + _background_process_in_flight_count.labels(desc).inc() - with LoggingContext(desc) as context: + with BackgroundProcessLoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) - proc = _BackgroundProcess(desc, context) - - with _bg_metrics_lock: - _background_processes.setdefault(desc, set()).add(proc) try: - yield func(*args, **kwargs) + result = func(*args, **kwargs) + + # We probably don't have an ensureDeferred in our call stack to handle + # coroutine results, so we need to ensureDeferred here. + # + # But we need this check because ensureDeferred doesn't like being + # called on immediate values (as opposed to Deferreds or coroutines). + if iscoroutine(result): + result = defer.ensureDeferred(result) + + return (yield result) except Exception: logger.exception("Background process '%s' threw an exception", desc) finally: - proc.update_metrics() - - with _bg_metrics_lock: - _background_processes[desc].remove(proc) + _background_process_in_flight_count.labels(desc).dec() with PreserveLoggingContext(): return run() + + +def wrap_as_background_process(desc): + """Decorator that wraps a function that gets called as a background + process. + + Equivalent of calling the function with `run_as_background_process` + """ + + def wrap_as_background_process_inner(func): + @wraps(func) + def wrap_as_background_process_inner_2(*args, **kwargs): + return run_as_background_process(desc, func, *args, **kwargs) + + return wrap_as_background_process_inner_2 + + return wrap_as_background_process_inner + + +class BackgroundProcessLoggingContext(LoggingContext): + """A logging context that tracks in flight metrics for background + processes. + """ + + __slots__ = ["_proc"] + + def __init__(self, name: str): + super().__init__(name) + + self._proc = _BackgroundProcess(name, self) + + def start(self, rusage: "Optional[resource._RUsage]"): + """Log context has started running (again). + """ + + super().start(rusage) + + # We've become active again so we make sure we're in the list of active + # procs. (Note that "start" here means we've become active, as opposed + # to starting for the first time.) + with _bg_metrics_lock: + _background_processes_active_since_last_scrape.add(self._proc) + + def __exit__(self, type, value, traceback) -> None: + """Log context has finished. + """ + + super().__exit__(type, value, traceback) + + # The background process has finished. We explictly remove and manually + # update the metrics here so that if nothing is scraping metrics the set + # doesn't infinitely grow. + with _bg_metrics_lock: + _background_processes_active_since_last_scrape.discard(self._proc) + + self._proc.update_metrics() |