diff options
Diffstat (limited to 'synapse/metrics/__init__.py')
-rw-r--r-- | synapse/metrics/__init__.py | 234 |
1 files changed, 139 insertions, 95 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 50d99d7a5c..371c300cac 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -17,130 +17,170 @@ import logging import functools import time import gc +import os +import platform +import attr -from twisted.internet import reactor +from prometheus_client import Gauge, Histogram, Counter +from prometheus_client.core import GaugeMetricFamily, REGISTRY -from .metric import ( - CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - MemoryUsageMetric, -) -from .process_collector import register_process_collector +from twisted.internet import reactor logger = logging.getLogger(__name__) - +running_on_pypy = platform.python_implementation() == "PyPy" all_metrics = [] all_collectors = [] +all_gauges = {} + + +class RegistryProxy(object): + + def collect(self): + for metric in REGISTRY.collect(): + if not metric.name.startswith("__"): + yield metric + +@attr.s(hash=True) +class LaterGauge(object): -class Metrics(object): - """ A single Metrics object gives a (mutable) slice view of the all_metrics - dict, allowing callers to easily register new metrics that are namespaced - nicely.""" + name = attr.ib() + desc = attr.ib() + labels = attr.ib(hash=False) + caller = attr.ib() - def __init__(self, name): - self.name_prefix = name + def collect(self): - def make_subspace(self, name): - return Metrics("%s_%s" % (self.name_prefix, name)) + g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) - def register_collector(self, func): - all_collectors.append(func) + try: + calls = self.caller() + except Exception as e: + print(e) + logger.err() + yield g + + if isinstance(calls, dict): + for k, v in calls.items(): + g.add_metric(k, v) + else: + g.add_metric([], calls) - def _register(self, metric_class, name, *args, **kwargs): - full_name = "%s_%s" % (self.name_prefix, name) + yield g - metric = metric_class(full_name, *args, **kwargs) + def __attrs_post_init__(self): + self._register() - all_metrics.append(metric) - return metric + def _register(self): + if self.name in all_gauges.keys(): + logger.warning("%s already registered, reregistering" % (self.name,)) + REGISTRY.unregister(all_gauges.pop(self.name)) - def register_counter(self, *args, **kwargs): - """ - Returns: - CounterMetric - """ - return self._register(CounterMetric, *args, **kwargs) + REGISTRY.register(self) + all_gauges[self.name] = self - def register_callback(self, *args, **kwargs): - """ - Returns: - CallbackMetric - """ - return self._register(CallbackMetric, *args, **kwargs) - def register_distribution(self, *args, **kwargs): - """ - Returns: - DistributionMetric - """ - return self._register(DistributionMetric, *args, **kwargs) +# +# Detailed CPU metrics +# - def register_cache(self, *args, **kwargs): - """ - Returns: - CacheMetric - """ - return self._register(CacheMetric, *args, **kwargs) +class CPUMetrics(object): + def __init__(self): + ticks_per_sec = 100 + try: + # Try and get the system config + ticks_per_sec = os.sysconf('SC_CLK_TCK') + except (ValueError, TypeError, AttributeError): + pass -def register_memory_metrics(hs): - try: - import psutil - process = psutil.Process() - process.memory_info().rss - except (ImportError, AttributeError): - logger.warn( - "psutil is not installed or incorrect version." - " Disabling memory metrics." - ) - return - metric = MemoryUsageMetric(hs, psutil) - all_metrics.append(metric) + self.ticks_per_sec = ticks_per_sec + def collect(self): -def get_metrics_for(pkg_name): - """ Returns a Metrics instance for conveniently creating metrics - namespaced with the given name prefix. """ + with open("/proc/self/stat") as s: + line = s.read() + raw_stats = line.split(") ", 1)[1].split(" ") - # Convert a "package.name" to "package_name" because Prometheus doesn't - # let us use . in metric names - return Metrics(pkg_name.replace(".", "_")) + user = GaugeMetricFamily("process_cpu_user_seconds_total", "") + user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec) + yield user + sys = GaugeMetricFamily("process_cpu_system_seconds_total", "") + sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec) + yield sys -def render_all(): - strs = [] - for collector in all_collectors: - collector() +REGISTRY.register(CPUMetrics()) - for metric in all_metrics: - try: - strs += metric.render() - except Exception: - strs += ["# FAILED to render"] - logger.exception("Failed to render metric") +# +# Python GC metrics +# + +gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) +gc_time = Histogram( + "python_gc_time", + "Time taken to GC (sec)", + ["gen"], + buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50, + 5.00, 7.50, 15.00, 30.00, 45.00, 60.00], +) - strs.append("") # to generate a final CRLF - return "\n".join(strs) +class GCCounts(object): + def collect(self): + cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"]) + for n, m in enumerate(gc.get_count()): + cm.add_metric([str(n)], m) -register_process_collector(get_metrics_for("process")) + yield cm -python_metrics = get_metrics_for("python") +REGISTRY.register(GCCounts()) + +# +# Twisted reactor metrics +# -gc_time = python_metrics.register_distribution("gc_time", labels=["gen"]) -gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"]) -python_metrics.register_callback( - "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"] +tick_time = Histogram( + "python_twisted_reactor_tick_time", + "Tick time of the Twisted reactor (sec)", + buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5], +) +pending_calls_metric = Histogram( + "python_twisted_reactor_pending_calls", + "Pending calls", + buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], ) -reactor_metrics = get_metrics_for("python.twisted.reactor") -tick_time = reactor_metrics.register_distribution("tick_time") -pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +# +# Federation Metrics +# + +sent_edus_counter = Counter("synapse_federation_client_sent_edus", "") + +sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "") + +events_processed_counter = Counter("synapse_federation_client_events_processed", "") + +# Used to track where various components have processed in the event stream, +# e.g. federation sending, appservice sending, etc. +event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) + +# Used to track the current max events stream position +event_persisted_position = Gauge("synapse_event_persisted_position", "") + +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"]) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) def runUntilCurrentTimer(func): @@ -162,17 +202,20 @@ def runUntilCurrentTimer(func): num_pending += 1 num_pending += len(reactor.threadCallQueue) - start = time.time() * 1000 + start = time.time() ret = func(*args, **kwargs) - end = time.time() * 1000 + end = time.time() # record the amount of wallclock time spent running pending calls. # This is a proxy for the actual amount of time between reactor polls, # since about 25% of time is actually spent running things triggered by # I/O events, but that is harder to capture without rewriting half the # reactor. - tick_time.inc_by(end - start) - pending_calls_metric.inc_by(num_pending) + tick_time.observe(end - start) + pending_calls_metric.observe(num_pending) + + if running_on_pypy: + return ret # Check if we need to do a manual GC (since its been disabled), and do # one if necessary. @@ -182,12 +225,12 @@ def runUntilCurrentTimer(func): if threshold[i] < counts[i]: logger.info("Collecting gc %d", i) - start = time.time() * 1000 + start = time.time() unreachable = gc.collect(i) - end = time.time() * 1000 + end = time.time() - gc_time.inc_by(end - start, i) - gc_unreachable.inc_by(unreachable, i) + gc_time.labels(i).observe(end - start) + gc_unreachable.labels(i).set(unreachable) return ret @@ -206,6 +249,7 @@ try: # We manually run the GC each reactor tick so that we can get some metrics # about time spent doing GC, - gc.disable() + if not running_on_pypy: + gc.disable() except AttributeError: pass |