diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index e3b831db67..973ba6506f 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -18,14 +18,13 @@ import functools
import time
import gc
import platform
+import attr
-from twisted.internet import reactor
+from prometheus_client import Gauge, Histogram, Counter
+from prometheus_client.core import (
+ GaugeMetricFamily, CounterMetricFamily, REGISTRY)
-from .metric import (
- CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
- MemoryUsageMetric, GaugeMetric,
-)
-from .process_collector import register_process_collector
+from twisted.internet import reactor
logger = logging.getLogger(__name__)
@@ -34,149 +33,94 @@ logger = logging.getLogger(__name__)
running_on_pypy = platform.python_implementation() == 'PyPy'
all_metrics = []
all_collectors = []
+all_gauges = {}
+@attr.s(hash=True)
+class LaterGauge(object):
-class Metrics(object):
- """ A single Metrics object gives a (mutable) slice view of the all_metrics
- dict, allowing callers to easily register new metrics that are namespaced
- nicely."""
-
- def __init__(self, name):
- self.name_prefix = name
-
- def make_subspace(self, name):
- return Metrics("%s_%s" % (self.name_prefix, name))
-
- def register_collector(self, func):
- all_collectors.append(func)
-
- def _register(self, metric_class, name, *args, **kwargs):
- full_name = "%s_%s" % (self.name_prefix, name)
-
- metric = metric_class(full_name, *args, **kwargs)
-
- all_metrics.append(metric)
- return metric
-
- def register_counter(self, *args, **kwargs):
- """
- Returns:
- CounterMetric
- """
- return self._register(CounterMetric, *args, **kwargs)
-
- def register_gauge(self, *args, **kwargs):
- """
- Returns:
- GaugeMetric
- """
- return self._register(GaugeMetric, *args, **kwargs)
+ name = attr.ib()
+ desc = attr.ib()
+ labels = attr.ib(hash=False)
+ caller = attr.ib()
- def register_callback(self, *args, **kwargs):
- """
- Returns:
- CallbackMetric
- """
- return self._register(CallbackMetric, *args, **kwargs)
+ def collect(self):
- def register_distribution(self, *args, **kwargs):
- """
- Returns:
- DistributionMetric
- """
- return self._register(DistributionMetric, *args, **kwargs)
-
- def register_cache(self, *args, **kwargs):
- """
- Returns:
- CacheMetric
- """
- return self._register(CacheMetric, *args, **kwargs)
+ g = GaugeMetricFamily(self.name, self.desc, self.labels)
+ try:
+ calls = self.caller()
+ except Exception as e:
+ print(e)
+ logger.err()
+ yield g
-def register_memory_metrics(hs):
- try:
- import psutil
- process = psutil.Process()
- process.memory_info().rss
- except (ImportError, AttributeError):
- logger.warn(
- "psutil is not installed or incorrect version."
- " Disabling memory metrics."
- )
- return
- metric = MemoryUsageMetric(hs, psutil)
- all_metrics.append(metric)
+ if isinstance(calls, dict):
+ for k, v in calls.items():
+ g.add_metric(k, v)
+ else:
+ g.add_metric([], calls)
+ yield g
-def get_metrics_for(pkg_name):
- """ Returns a Metrics instance for conveniently creating metrics
- namespaced with the given name prefix. """
+ def register(self):
+ if self.name in all_gauges.keys():
+ REGISTRY.unregister(all_gauges.pop(self.name))
- # Convert a "package.name" to "package_name" because Prometheus doesn't
- # let us use . in metric names
- return Metrics(pkg_name.replace(".", "_"))
+ REGISTRY.register(self)
+ all_gauges[self.name] = self
-def render_all():
- strs = []
+#
+# Python GC metrics
+#
- for collector in all_collectors:
- collector()
+gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
+gc_time = Histogram("python_gc_time", "Time taken to GC (ms)", ["gen"], buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000])
- for metric in all_metrics:
- try:
- strs += metric.render()
- except Exception:
- strs += ["# FAILED to render"]
- logger.exception("Failed to render metric")
+class GCCounts(object):
+ def collect(self):
+ gc_counts = gc.get_count()
- strs.append("") # to generate a final CRLF
+ cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
+ for n, m in enumerate(gc.get_count()):
+ cm.add_metric([str(n)], m)
- return "\n".join(strs)
+ yield cm
+REGISTRY.register(GCCounts())
-register_process_collector(get_metrics_for("process"))
+#
+# Twisted reactor metrics
+#
+tick_time = Histogram("python_twisted_reactor_tick_time", "Tick time of the Twisted reactor (ms)", buckets=[1, 2, 5, 10, 50, 100, 250, 500, 1000, 2000])
+pending_calls_metric = Histogram("python_twisted_reactor_pending_calls", "Pending calls", buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000])
-python_metrics = get_metrics_for("python")
+#
+# Federation Metrics
+#
-gc_time = python_metrics.register_distribution("gc_time", labels=["gen"])
-gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"])
-python_metrics.register_callback(
- "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
-)
+sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
-reactor_metrics = get_metrics_for("python.twisted.reactor")
-tick_time = reactor_metrics.register_distribution("tick_time")
-pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
-synapse_metrics = get_metrics_for("synapse")
+events_processed_counter = Counter("synapse_federation_client_events_processed", "")
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
-event_processing_positions = synapse_metrics.register_gauge(
- "event_processing_positions", labels=["name"],
-)
+event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
# Used to track the current max events stream position
-event_persisted_position = synapse_metrics.register_gauge(
- "event_persisted_position",
-)
+event_persisted_position = Gauge("synapse_event_persisted_position", "")
# Used to track the received_ts of the last event processed by various
# components
-event_processing_last_ts = synapse_metrics.register_gauge(
- "event_processing_last_ts", labels=["name"],
-)
+event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
# Used to track the lag processing events. This is the time difference
# between the last processed event's received_ts and the time it was
# finished being processed.
-event_processing_lag = synapse_metrics.register_gauge(
- "event_processing_lag", labels=["name"],
-)
-
+event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
def runUntilCurrentTimer(func):
@@ -206,8 +150,8 @@ def runUntilCurrentTimer(func):
# since about 25% of time is actually spent running things triggered by
# I/O events, but that is harder to capture without rewriting half the
# reactor.
- tick_time.inc_by(end - start)
- pending_calls_metric.inc_by(num_pending)
+ tick_time.observe(end - start)
+ pending_calls_metric.observe(num_pending)
if running_on_pypy:
return ret
@@ -224,8 +168,8 @@ def runUntilCurrentTimer(func):
unreachable = gc.collect(i)
end = time.time() * 1000
- gc_time.inc_by(end - start, i)
- gc_unreachable.inc_by(unreachable, i)
+ gc_time.labels(i).observe(end - start)
+ gc_unreachable.labels(i).set(unreachable)
return ret
|