diff options
Diffstat (limited to 'synapse/metrics/__init__.py')
-rw-r--r-- | synapse/metrics/__init__.py | 233 |
1 files changed, 119 insertions, 114 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index e3b831db67..371c300cac 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -17,165 +17,170 @@ import logging import functools import time import gc +import os import platform +import attr -from twisted.internet import reactor +from prometheus_client import Gauge, Histogram, Counter +from prometheus_client.core import GaugeMetricFamily, REGISTRY -from .metric import ( - CounterMetric, CallbackMetric, DistributionMetric, CacheMetric, - MemoryUsageMetric, GaugeMetric, -) -from .process_collector import register_process_collector +from twisted.internet import reactor logger = logging.getLogger(__name__) - -running_on_pypy = platform.python_implementation() == 'PyPy' +running_on_pypy = platform.python_implementation() == "PyPy" all_metrics = [] all_collectors = [] +all_gauges = {} -class Metrics(object): - """ A single Metrics object gives a (mutable) slice view of the all_metrics - dict, allowing callers to easily register new metrics that are namespaced - nicely.""" +class RegistryProxy(object): - def __init__(self, name): - self.name_prefix = name + def collect(self): + for metric in REGISTRY.collect(): + if not metric.name.startswith("__"): + yield metric - def make_subspace(self, name): - return Metrics("%s_%s" % (self.name_prefix, name)) - def register_collector(self, func): - all_collectors.append(func) +@attr.s(hash=True) +class LaterGauge(object): - def _register(self, metric_class, name, *args, **kwargs): - full_name = "%s_%s" % (self.name_prefix, name) + name = attr.ib() + desc = attr.ib() + labels = attr.ib(hash=False) + caller = attr.ib() - metric = metric_class(full_name, *args, **kwargs) + def collect(self): - all_metrics.append(metric) - return metric + g = GaugeMetricFamily(self.name, self.desc, labels=self.labels) - def register_counter(self, *args, **kwargs): - """ - Returns: - CounterMetric - """ - return self._register(CounterMetric, *args, **kwargs) + try: + calls = self.caller() + except Exception as e: + print(e) + logger.err() + yield g - def register_gauge(self, *args, **kwargs): - """ - Returns: - GaugeMetric - """ - return self._register(GaugeMetric, *args, **kwargs) + if isinstance(calls, dict): + for k, v in calls.items(): + g.add_metric(k, v) + else: + g.add_metric([], calls) - def register_callback(self, *args, **kwargs): - """ - Returns: - CallbackMetric - """ - return self._register(CallbackMetric, *args, **kwargs) + yield g - def register_distribution(self, *args, **kwargs): - """ - Returns: - DistributionMetric - """ - return self._register(DistributionMetric, *args, **kwargs) + def __attrs_post_init__(self): + self._register() - def register_cache(self, *args, **kwargs): - """ - Returns: - CacheMetric - """ - return self._register(CacheMetric, *args, **kwargs) + def _register(self): + if self.name in all_gauges.keys(): + logger.warning("%s already registered, reregistering" % (self.name,)) + REGISTRY.unregister(all_gauges.pop(self.name)) + REGISTRY.register(self) + all_gauges[self.name] = self -def register_memory_metrics(hs): - try: - import psutil - process = psutil.Process() - process.memory_info().rss - except (ImportError, AttributeError): - logger.warn( - "psutil is not installed or incorrect version." - " Disabling memory metrics." - ) - return - metric = MemoryUsageMetric(hs, psutil) - all_metrics.append(metric) +# +# Detailed CPU metrics +# -def get_metrics_for(pkg_name): - """ Returns a Metrics instance for conveniently creating metrics - namespaced with the given name prefix. """ +class CPUMetrics(object): - # Convert a "package.name" to "package_name" because Prometheus doesn't - # let us use . in metric names - return Metrics(pkg_name.replace(".", "_")) + def __init__(self): + ticks_per_sec = 100 + try: + # Try and get the system config + ticks_per_sec = os.sysconf('SC_CLK_TCK') + except (ValueError, TypeError, AttributeError): + pass + self.ticks_per_sec = ticks_per_sec -def render_all(): - strs = [] + def collect(self): - for collector in all_collectors: - collector() + with open("/proc/self/stat") as s: + line = s.read() + raw_stats = line.split(") ", 1)[1].split(" ") - for metric in all_metrics: - try: - strs += metric.render() - except Exception: - strs += ["# FAILED to render"] - logger.exception("Failed to render metric") + user = GaugeMetricFamily("process_cpu_user_seconds_total", "") + user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec) + yield user + + sys = GaugeMetricFamily("process_cpu_system_seconds_total", "") + sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec) + yield sys - strs.append("") # to generate a final CRLF - return "\n".join(strs) +REGISTRY.register(CPUMetrics()) + +# +# Python GC metrics +# + +gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) +gc_time = Histogram( + "python_gc_time", + "Time taken to GC (sec)", + ["gen"], + buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50, + 5.00, 7.50, 15.00, 30.00, 45.00, 60.00], +) -register_process_collector(get_metrics_for("process")) +class GCCounts(object): + def collect(self): + cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"]) + for n, m in enumerate(gc.get_count()): + cm.add_metric([str(n)], m) -python_metrics = get_metrics_for("python") + yield cm -gc_time = python_metrics.register_distribution("gc_time", labels=["gen"]) -gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"]) -python_metrics.register_callback( - "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"] + +REGISTRY.register(GCCounts()) + +# +# Twisted reactor metrics +# + +tick_time = Histogram( + "python_twisted_reactor_tick_time", + "Tick time of the Twisted reactor (sec)", + buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5], +) +pending_calls_metric = Histogram( + "python_twisted_reactor_pending_calls", + "Pending calls", + buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000], ) -reactor_metrics = get_metrics_for("python.twisted.reactor") -tick_time = reactor_metrics.register_distribution("tick_time") -pending_calls_metric = reactor_metrics.register_distribution("pending_calls") +# +# Federation Metrics +# + +sent_edus_counter = Counter("synapse_federation_client_sent_edus", "") -synapse_metrics = get_metrics_for("synapse") +sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "") + +events_processed_counter = Counter("synapse_federation_client_events_processed", "") # Used to track where various components have processed in the event stream, # e.g. federation sending, appservice sending, etc. -event_processing_positions = synapse_metrics.register_gauge( - "event_processing_positions", labels=["name"], -) +event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) # Used to track the current max events stream position -event_persisted_position = synapse_metrics.register_gauge( - "event_persisted_position", -) +event_persisted_position = Gauge("synapse_event_persisted_position", "") # Used to track the received_ts of the last event processed by various # components -event_processing_last_ts = synapse_metrics.register_gauge( - "event_processing_last_ts", labels=["name"], -) +event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"]) # Used to track the lag processing events. This is the time difference # between the last processed event's received_ts and the time it was # finished being processed. -event_processing_lag = synapse_metrics.register_gauge( - "event_processing_lag", labels=["name"], -) +event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) def runUntilCurrentTimer(func): @@ -197,17 +202,17 @@ def runUntilCurrentTimer(func): num_pending += 1 num_pending += len(reactor.threadCallQueue) - start = time.time() * 1000 + start = time.time() ret = func(*args, **kwargs) - end = time.time() * 1000 + end = time.time() # record the amount of wallclock time spent running pending calls. # This is a proxy for the actual amount of time between reactor polls, # since about 25% of time is actually spent running things triggered by # I/O events, but that is harder to capture without rewriting half the # reactor. - tick_time.inc_by(end - start) - pending_calls_metric.inc_by(num_pending) + tick_time.observe(end - start) + pending_calls_metric.observe(num_pending) if running_on_pypy: return ret @@ -220,12 +225,12 @@ def runUntilCurrentTimer(func): if threshold[i] < counts[i]: logger.info("Collecting gc %d", i) - start = time.time() * 1000 + start = time.time() unreachable = gc.collect(i) - end = time.time() * 1000 + end = time.time() - gc_time.inc_by(end - start, i) - gc_unreachable.inc_by(unreachable, i) + gc_time.labels(i).observe(end - start) + gc_unreachable.labels(i).set(unreachable) return ret |