summary refs log tree commit diff
path: root/synapse/metrics/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/metrics/__init__.py')
-rw-r--r--synapse/metrics/__init__.py182
1 files changed, 63 insertions, 119 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index e3b831db67..973ba6506f 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -18,14 +18,13 @@ import functools
 import time
 import gc
 import platform
+import attr
 
-from twisted.internet import reactor
+from prometheus_client import Gauge, Histogram, Counter
+from prometheus_client.core import (
+    GaugeMetricFamily, CounterMetricFamily, REGISTRY)
 
-from .metric import (
-    CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
-    MemoryUsageMetric, GaugeMetric,
-)
-from .process_collector import register_process_collector
+from twisted.internet import reactor
 
 
 logger = logging.getLogger(__name__)
@@ -34,149 +33,94 @@ logger = logging.getLogger(__name__)
 running_on_pypy = platform.python_implementation() == 'PyPy'
 all_metrics = []
 all_collectors = []
+all_gauges = {}
 
+@attr.s(hash=True)
+class LaterGauge(object):
 
-class Metrics(object):
-    """ A single Metrics object gives a (mutable) slice view of the all_metrics
-    dict, allowing callers to easily register new metrics that are namespaced
-    nicely."""
-
-    def __init__(self, name):
-        self.name_prefix = name
-
-    def make_subspace(self, name):
-        return Metrics("%s_%s" % (self.name_prefix, name))
-
-    def register_collector(self, func):
-        all_collectors.append(func)
-
-    def _register(self, metric_class, name, *args, **kwargs):
-        full_name = "%s_%s" % (self.name_prefix, name)
-
-        metric = metric_class(full_name, *args, **kwargs)
-
-        all_metrics.append(metric)
-        return metric
-
-    def register_counter(self, *args, **kwargs):
-        """
-        Returns:
-            CounterMetric
-        """
-        return self._register(CounterMetric, *args, **kwargs)
-
-    def register_gauge(self, *args, **kwargs):
-        """
-        Returns:
-            GaugeMetric
-        """
-        return self._register(GaugeMetric, *args, **kwargs)
+    name = attr.ib()
+    desc = attr.ib()
+    labels = attr.ib(hash=False)
+    caller = attr.ib()
 
-    def register_callback(self, *args, **kwargs):
-        """
-        Returns:
-            CallbackMetric
-        """
-        return self._register(CallbackMetric, *args, **kwargs)
+    def collect(self):
 
-    def register_distribution(self, *args, **kwargs):
-        """
-        Returns:
-            DistributionMetric
-        """
-        return self._register(DistributionMetric, *args, **kwargs)
-
-    def register_cache(self, *args, **kwargs):
-        """
-        Returns:
-            CacheMetric
-        """
-        return self._register(CacheMetric, *args, **kwargs)
+        g = GaugeMetricFamily(self.name, self.desc, self.labels)
 
+        try:
+            calls = self.caller()
+        except Exception as e:
+            print(e)
+            logger.err()
+            yield g
 
-def register_memory_metrics(hs):
-    try:
-        import psutil
-        process = psutil.Process()
-        process.memory_info().rss
-    except (ImportError, AttributeError):
-        logger.warn(
-            "psutil is not installed or incorrect version."
-            " Disabling memory metrics."
-        )
-        return
-    metric = MemoryUsageMetric(hs, psutil)
-    all_metrics.append(metric)
+        if isinstance(calls, dict):
+            for k, v in calls.items():
+                g.add_metric(k, v)
+        else:
+            g.add_metric([], calls)
 
+        yield g
 
-def get_metrics_for(pkg_name):
-    """ Returns a Metrics instance for conveniently creating metrics
-    namespaced with the given name prefix. """
+    def register(self):
+        if self.name in all_gauges.keys():
+            REGISTRY.unregister(all_gauges.pop(self.name))
 
-    # Convert a "package.name" to "package_name" because Prometheus doesn't
-    # let us use . in metric names
-    return Metrics(pkg_name.replace(".", "_"))
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
 
 
-def render_all():
-    strs = []
+#
+# Python GC metrics
+#
 
-    for collector in all_collectors:
-        collector()
+gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
+gc_time = Histogram("python_gc_time", "Time taken to GC (ms)", ["gen"], buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000])
 
-    for metric in all_metrics:
-        try:
-            strs += metric.render()
-        except Exception:
-            strs += ["# FAILED to render"]
-            logger.exception("Failed to render metric")
+class GCCounts(object):
+    def collect(self):
+        gc_counts = gc.get_count()
 
-    strs.append("")  # to generate a final CRLF
+        cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
+        for n, m in enumerate(gc.get_count()):
+            cm.add_metric([str(n)], m)
 
-    return "\n".join(strs)
+        yield cm
 
+REGISTRY.register(GCCounts())
 
-register_process_collector(get_metrics_for("process"))
+#
+# Twisted reactor metrics
+#
 
+tick_time = Histogram("python_twisted_reactor_tick_time", "Tick time of the Twisted reactor (ms)", buckets=[1, 2, 5, 10, 50, 100, 250, 500, 1000, 2000])
+pending_calls_metric = Histogram("python_twisted_reactor_pending_calls", "Pending calls", buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000])
 
-python_metrics = get_metrics_for("python")
+#
+# Federation Metrics
+#
 
-gc_time = python_metrics.register_distribution("gc_time", labels=["gen"])
-gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"])
-python_metrics.register_callback(
-    "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
-)
+sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
 
-reactor_metrics = get_metrics_for("python.twisted.reactor")
-tick_time = reactor_metrics.register_distribution("tick_time")
-pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
 
-synapse_metrics = get_metrics_for("synapse")
+events_processed_counter = Counter("synapse_federation_client_events_processed", "")
 
 # Used to track where various components have processed in the event stream,
 # e.g. federation sending, appservice sending, etc.
-event_processing_positions = synapse_metrics.register_gauge(
-    "event_processing_positions", labels=["name"],
-)
+event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
 
 # Used to track the current max events stream position
-event_persisted_position = synapse_metrics.register_gauge(
-    "event_persisted_position",
-)
+event_persisted_position = Gauge("synapse_event_persisted_position", "")
 
 # Used to track the received_ts of the last event processed by various
 # components
-event_processing_last_ts = synapse_metrics.register_gauge(
-    "event_processing_last_ts", labels=["name"],
-)
+event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
 
 # Used to track the lag processing events. This is the time difference
 # between the last processed event's received_ts and the time it was
 # finished being processed.
-event_processing_lag = synapse_metrics.register_gauge(
-    "event_processing_lag", labels=["name"],
-)
-
+event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
 def runUntilCurrentTimer(func):
 
@@ -206,8 +150,8 @@ def runUntilCurrentTimer(func):
         # since about 25% of time is actually spent running things triggered by
         # I/O events, but that is harder to capture without rewriting half the
         # reactor.
-        tick_time.inc_by(end - start)
-        pending_calls_metric.inc_by(num_pending)
+        tick_time.observe(end - start)
+        pending_calls_metric.observe(num_pending)
 
         if running_on_pypy:
             return ret
@@ -224,8 +168,8 @@ def runUntilCurrentTimer(func):
                 unreachable = gc.collect(i)
                 end = time.time() * 1000
 
-                gc_time.inc_by(end - start, i)
-                gc_unreachable.inc_by(unreachable, i)
+                gc_time.labels(i).observe(end - start)
+                gc_unreachable.labels(i).set(unreachable)
 
         return ret