# Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import ctypes import ctypes.util import functools import gc import itertools import logging import os import platform import re import threading import time from typing import Callable, Dict, Iterable, Optional, Tuple, Union import attr from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import ( REGISTRY, CounterMetricFamily, GaugeHistogramMetricFamily, GaugeMetricFamily, ) from twisted.internet import reactor import synapse from synapse.metrics._exposition import ( MetricsResource, generate_latest, start_http_server, ) from synapse.util.versionstring import get_version_string logger = logging.getLogger(__name__) METRICS_PREFIX = "/_synapse/metrics" running_on_pypy = platform.python_implementation() == "PyPy" all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]] HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") class RegistryProxy: @staticmethod def collect(): for metric in REGISTRY.collect(): if not metric.name.startswith("__"): yield metric @attr.s(slots=True, hash=True) class LaterGauge: name = attr.ib(type=str) desc = attr.ib(type=str) labels = attr.ib(hash=False, type=Optional[Iterable[str]]) # callback: should either return a value (if there are no labels for this metric), # or dict mapping from a label tuple to a value caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]]) def collect(self): g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) try: calls = self.caller() except Exception: logger.exception("Exception running callback for LaterGauge(%s)", self.name) yield g return if isinstance(calls, dict): for k, v in calls.items(): g.add_metric(k, v) else: g.add_metric([], calls) yield g def __attrs_post_init__(self): self._register() def _register(self): if self.name in all_gauges.keys(): logger.warning("%s already registered, reregistering" % (self.name,)) REGISTRY.unregister(all_gauges.pop(self.name)) REGISTRY.register(self) all_gauges[self.name] = self class InFlightGauge: """Tracks number of things (e.g. requests, Measure blocks, etc) in flight at any given time. Each InFlightGauge will create a metric called `_total` that counts the number of in flight blocks, as well as a metrics for each item in the given `sub_metrics` as `_` which will get updated by the callbacks. Args: name (str) desc (str) labels (list[str]) sub_metrics (list[str]): A list of sub metrics that the callbacks will update. """ def __init__(self, name, desc, labels, sub_metrics): self.name = name self.desc = desc self.labels = labels self.sub_metrics = sub_metrics # Create a class which have the sub_metrics values as attributes, which # default to 0 on initialization. Used to pass to registered callbacks. self._metrics_class = attr.make_class( "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True ) # Counts number of in flight blocks for a given set of label values self._registrations = {} # type: Dict # Protects access to _registrations self._lock = threading.Lock() self._register_with_collector() def register(self, key, callback): """Registers that we've entered a new block with labels `key`. `callback` gets called each time the metrics are collected. The same value must also be given to `unregister`. `callback` gets called with an object that has an attribute per sub_metric, which should be updated with the necessary values. Note that the metrics object is shared between all callbacks registered with the same key. Note that `callback` may be called on a separate thread. """ with self._lock: self._registrations.setdefault(key, set()).add(callback) def unregister(self, key, callback): """Registers that we've exited a block with labels `key`.""" with self._lock: self._registrations.setdefault(key, set()).discard(callback) def collect(self): """Called by prometheus client when it reads metrics. Note: may be called by a separate thread. """ in_flight = GaugeMetricFamily( self.name + "_total", self.desc, labels=self.labels ) metrics_by_key = {} # We copy so that we don't mutate the list while iterating with self._lock: keys = list(self._registrations) for key in keys: with self._lock: callbacks = set(self._registrations[key]) in_flight.add_metric(key, len(callbacks)) metrics = self._metrics_class() metrics_by_key[key] = metrics for callback in callbacks: callback(metrics) yield in_flight for name in self.sub_metrics: gauge = GaugeMetricFamily( "_".join([self.name, name]), "", labels=self.labels ) for key, metrics in metrics_by_key.items(): gauge.add_metric(key, getattr(metrics, name)) yield gauge def _register_with_collector(self): if self.name in all_gauges.keys(): logger.warning("%s already registered, reregistering" % (self.name,)) REGISTRY.unregister(all_gauges.pop(self.name)) REGISTRY.register(self) all_gauges[self.name] = self class GaugeBucketCollector: """Like a Histogram, but the buckets are Gauges which are updated atomically. The data is updated by calling `update_data` with an iterable of measurements. We assume that the data is updated less frequently than it is reported to Prometheus, and optimise for that case. """ __slots__ = ( "_name", "_documentation", "_bucket_bounds", "_metric", ) def __init__( self, name: str, documentation: str, buckets: Iterable[float], registry=REGISTRY, ): """ Args: name: base name of metric to be exported to Prometheus. (a _bucket suffix will be added.) documentation: help text for the metric buckets: The top bounds of the buckets to report registry: metric registry to register with """ self._name = name self._documentation = documentation # the tops of the buckets self._bucket_bounds = [float(b) for b in buckets] if self._bucket_bounds != sorted(self._bucket_bounds): raise ValueError("Buckets not in sorted order") if self._bucket_bounds[-1] != float("inf"): self._bucket_bounds.append(float("inf")) # We initially set this to None. We won't report metrics until # this has been initialised after a successful data update self._metric = None # type: Optional[GaugeHistogramMetricFamily] registry.register(self) def collect(self): # Don't report metrics unless we've already collected some data if self._metric is not None: yield self._metric def update_data(self, values: Iterable[float]): """Update the data to be reported by the metric The existing data is cleared, and each measurement in the input is assigned to the relevant bucket. """ self._metric = self._values_to_metric(values) def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily: total = 0.0 bucket_values = [0 for _ in self._bucket_bounds] for v in values: # assign each value to a bucket for i, bound in enumerate(self._bucket_bounds): if v <= bound: bucket_values[i] += 1 break # ... and increment the sum total += v # now, aggregate the bucket values so that they count the number of entries in # that bucket or below. accumulated_values = itertools.accumulate(bucket_values) return GaugeHistogramMetricFamily( self._name, self._documentation, buckets=list( zip((str(b) for b in self._bucket_bounds), accumulated_values) ), gsum_value=total, ) # # Detailed CPU metrics # class CPUMetrics: def __init__(self): ticks_per_sec = 100 try: # Try and get the system config ticks_per_sec = os.sysconf("SC_CLK_TCK") except (ValueError, TypeError, AttributeError): pass self.ticks_per_sec = ticks_per_sec def collect(self): if not HAVE_PROC_SELF_STAT: return with open("/proc/self/stat") as s: line = s.read() raw_stats = line.split(") ", 1)[1].split(" ") user = GaugeMetricFamily("process_cpu_user_seconds_total", "") user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec) yield user sys = GaugeMetricFamily("process_cpu_system_seconds_total", "") sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec) yield sys REGISTRY.register(CPUMetrics()) # # Python GC metrics # gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) gc_time = Histogram( "python_gc_time", "Time taken to GC (sec)", ["gen"], buckets=[ 0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50, 5.00, 7.50, 15.00, 30.00, 45.00, 60.00, ], ) class GCCounts: def collect(self): cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): cm.add_metric([str(n)], m) yield cm if not running_on_pypy: REGISTRY.register(GCCounts()) # # PyPy GC / memory metrics # class PyPyGCStats: def collect(self): # @stats is a pretty-printer object with __str__() returning a nice table, # plus some fields that contain data from that table. # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB'). stats = gc.get_stats(memory_pressure=False) # type: ignore # @s contains same fields as @stats, but as actual integers. s = stats._s # type: ignore # also note that field naming is completely braindead # and only vaguely correlates with the pretty-printed table. # >>>> gc.get_stats(False) # Total memory consumed: # GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory # in arenas: 3.0MB # s.total_arena_memory # rawmalloced: 1.7MB # s.total_rawmalloced_memory # nursery: 4.0MB # s.nursery_size # raw assembler used: 31.0kB # s.jit_backend_used # ----------------------------- # Total: 8.8MB # stats.memory_used_sum # # Total memory allocated: # GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory # in arenas: 30.9MB # s.peak_arena_memory # rawmalloced: 4.1MB # s.peak_rawmalloced_memory # nursery: 4.0MB # s.nursery_size # raw assembler allocated: 1.0MB # s.jit_backend_allocated # ----------------------------- # Total: 39.7MB # stats.memory_allocated_sum # # Total time spent in GC: 0.073 # s.total_gc_time pypy_gc_time = CounterMetricFamily( "pypy_gc_time_seconds_total", "Total time spent in PyPy GC", labels=[], ) pypy_gc_time.add_metric([], s.total_gc_time / 1000) yield pypy_gc_time pypy_mem = GaugeMetricFamily( "pypy_memory_bytes", "Memory tracked by PyPy allocator", labels=["state", "class", "kind"], ) # memory used by JIT assembler pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used) pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated) # memory used by GCed objects pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory) pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory) pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory) pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory) pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size) pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size) # totals pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory) pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory) pypy_mem.add_metric(["used", "totals", "gc_peak"], s.peak_memory) pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory) yield pypy_mem if running_on_pypy: REGISTRY.register(PyPyGCStats()) # # Twisted reactor metrics # tick_time = Histogram( "python_twisted_reactor_tick_time", "Tick time of the Twisted reactor (sec)", buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5], ) pending_calls_metric = Histogram( "python_twisted_reactor_pending_calls", "Pending calls", buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], ) # # Federation Metrics # sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "") events_processed_counter = Counter("synapse_federation_client_events_processed", "") event_processing_loop_counter = Counter( "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"] ) event_processing_loop_room_count = Counter( "synapse_event_processing_loop_room_count", "Rooms seen per event processing loop iteration", ["name"], ) # Used to track where various components have processed in the event stream, # e.g. federation sending, appservice sending, etc. event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) # Used to track the current max events stream position event_persisted_position = Gauge("synapse_event_persisted_position", "") # Used to track the received_ts of the last event processed by various # components event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"]) # Used to track the lag processing events. This is the time difference # between the last processed event's received_ts and the time it was # finished being processed. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) event_processing_lag_by_event = Histogram( "synapse_event_processing_lag_by_event", "Time between an event being persisted and it being queued up to be sent to the relevant remote servers", ["name"], ) # Build info of the running server. build_info = Gauge( "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"] ) build_info.labels( " ".join([platform.python_implementation(), platform.python_version()]), get_version_string(synapse), " ".join([platform.system(), platform.release()]), ).set(1) last_ticked = time.time() # 3PID send info threepid_send_requests = Histogram( "synapse_threepid_send_requests_with_tries", documentation="Number of requests for a 3pid token by try count. Note if" " there is a request with try count of 4, then there would have been one" " each for 1, 2 and 3", buckets=(1, 2, 3, 4, 5, 10), labelnames=("type", "reason"), ) class ReactorLastSeenMetric: def collect(self): cm = GaugeMetricFamily( "python_twisted_reactor_last_seen", "Seconds since the Twisted reactor was last seen", ) cm.add_metric([], time.time() - last_ticked) yield cm REGISTRY.register(ReactorLastSeenMetric()) # The minimum time in seconds between GCs for each generation, regardless of the current GC # thresholds and counts. MIN_TIME_BETWEEN_GCS = [1, 10, 30] # The time in seconds of the last time we did a GC for each generation. _last_gc = [0, 0, 0] def runUntilCurrentTimer(reactor, func): @functools.wraps(func) def f(*args, **kwargs): now = reactor.seconds() num_pending = 0 # _newTimedCalls is one long list of *all* pending calls. Below loop # is based off of impl of reactor.runUntilCurrent for delayed_call in reactor._newTimedCalls: if delayed_call.time > now: break if delayed_call.delayed_time > 0: continue num_pending += 1 num_pending += len(reactor.threadCallQueue) start = time.time() ret = func(*args, **kwargs) end = time.time() # record the amount of wallclock time spent running pending calls. # This is a proxy for the actual amount of time between reactor polls, # since about 25% of time is actually spent running things triggered by # I/O events, but that is harder to capture without rewriting half the # reactor. tick_time.observe(end - start) pending_calls_metric.observe(num_pending) # Update the time we last ticked, for the metric to test whether # Synapse's reactor has frozen global last_ticked last_ticked = end if running_on_pypy: return ret # Check if we need to do a manual GC (since its been disabled), and do # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may # promote an object into gen 2, and we don't want to handle the same # object multiple times. threshold = gc.get_threshold() counts = gc.get_count() for i in (2, 1, 0): # We check if we need to do one based on a straightforward # comparison between the threshold and count. We also do an extra # check to make sure that we don't a GC too often. if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]: if i == 0: logger.debug("Collecting gc %d", i) else: logger.info("Collecting gc %d", i) start = time.time() unreachable = gc.collect(i) end = time.time() _last_gc[i] = int(end) gc_time.labels(i).observe(end - start) gc_unreachable.labels(i).set(unreachable) return ret return f def _setup_jemalloc_stats(): """Checks to see if jemalloc is loaded, and hooks up a collector to record statistics exposed by jemalloc. """ # Try to find the loaded jemalloc shared library, if any. We need to # introspect into what is loaded, rather than loading whatever is on the # path, as if we load a *different* jemalloc version things will seg fault. pid = os.getpid() # We're looking for a path at the end of the line that includes # "libjemalloc". regex = re.compile(r"/\S+/libjemalloc.*$") jemalloc_path = None with open(f"/proc/{pid}/maps") as f: for line in f.readlines(): match = regex.search(line.strip()) if match: jemalloc_path = match.group() if not jemalloc_path: # No loaded jemalloc was found. return jemalloc = ctypes.CDLL(jemalloc_path) def _mallctl( name: str, read: bool = True, write: Optional[int] = None ) -> Optional[int]: """Wrapper around `mallctl` for reading and writing integers to jemalloc. Args: name: The name of the option to read from/write to. read: Whether to try and read the value. write: The value to write, if given. Returns: The value read if `read` is True, otherwise None. Raises: An exception if `mallctl` returns a non-zero error code. """ input_var = None input_var_ref = None input_len_ref = None if read: input_var = ctypes.c_size_t(0) input_len = ctypes.c_size_t(ctypes.sizeof(input_var)) input_var_ref = ctypes.byref(input_var) input_len_ref = ctypes.byref(input_len) write_var_ref = None write_len = ctypes.c_size_t(0) if write is not None: write_var = ctypes.c_size_t(write) write_len = ctypes.c_size_t(ctypes.sizeof(write_var)) write_var_ref = ctypes.byref(write_var) # The interface is: # # int mallctl( # const char *name, # void *oldp, # size_t *oldlenp, # void *newp, # size_t newlen # ) # # Where oldp/oldlenp is a buffer where the old value will be written to # (if not null), and newp/newlen is the buffer with the new value to set # (if not null). Note that they're all references *except* newlen. result = jemalloc.mallctl( name.encode("ascii"), input_var_ref, input_len_ref, write_var_ref, write_len, ) if result != 0: raise Exception("Failed to call mallctl") if input_var is None: return None return input_var.value def _jemalloc_refresh_stats() -> None: """Request that jemalloc updates its internal statistics. This needs to be called before querying for stats, otherwise it will return stale values. """ try: _mallctl("epoch", read=False, write=1) except Exception: pass class JemallocCollector: """Metrics for internal jemalloc stats.""" def collect(self): _jemalloc_refresh_stats() g = GaugeMetricFamily( "jemalloc_stats_app_memory", "The stats reported by jemalloc", labels=["type"], ) # Read the relevant global stats from jemalloc. Note that these may # not be accurate if python is configured to use its internal small # object allocator (which is on by default, disable by setting the # env `PYTHONMALLOC=malloc`). # # See the jemalloc manpage for details about what each value means, # roughly: # - allocated ─ Total number of bytes allocated by the app # - active ─ Total number of bytes in active pages allocated by # the application, this is bigger than `allocated`. # - resident ─ Maximum number of bytes in physically resident data # pages mapped by the allocator, comprising all pages dedicated # to allocator metadata, pages backing active allocations, and # unused dirty pages. This is bigger than `active`. # - mapped ─ Total number of bytes in active extents mapped by the # allocator. # - metadata ─ Total number of bytes dedicated to jemalloc # metadata. for t in ( "allocated", "active", "resident", "mapped", "metadata", ): try: value = _mallctl(f"stats.{t}") except Exception: # There was an error fetching the value, skip. continue g.add_metric([t], value=value) yield g REGISTRY.register(JemallocCollector()) try: _setup_jemalloc_stats() except Exception: logger.info("Failed to setup collector to record jemalloc stats.") try: # Ensure the reactor has all the attributes we expect reactor.seconds # type: ignore reactor.runUntilCurrent # type: ignore reactor._newTimedCalls # type: ignore reactor.threadCallQueue # type: ignore # runUntilCurrent is called when we have pending calls. It is called once # per iteratation after fd polling. reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent) # type: ignore # We manually run the GC each reactor tick so that we can get some metrics # about time spent doing GC, if not running_on_pypy: gc.disable() except AttributeError: pass __all__ = [ "MetricsResource", "generate_latest", "start_http_server", "LaterGauge", "InFlightGauge", "BucketCollector", ]