summary refs log tree commit diff
path: root/synapse/metrics
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/metrics')
-rw-r--r--synapse/metrics/__init__.py234
-rw-r--r--synapse/metrics/metric.py271
-rw-r--r--synapse/metrics/process_collector.py122
-rw-r--r--synapse/metrics/resource.py23
4 files changed, 139 insertions, 511 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 50d99d7a5c..371c300cac 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -17,130 +17,170 @@ import logging
 import functools
 import time
 import gc
+import os
+import platform
+import attr
 
-from twisted.internet import reactor
+from prometheus_client import Gauge, Histogram, Counter
+from prometheus_client.core import GaugeMetricFamily, REGISTRY
 
-from .metric import (
-    CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
-    MemoryUsageMetric,
-)
-from .process_collector import register_process_collector
+from twisted.internet import reactor
 
 
 logger = logging.getLogger(__name__)
 
-
+running_on_pypy = platform.python_implementation() == "PyPy"
 all_metrics = []
 all_collectors = []
+all_gauges = {}
+
+
+class RegistryProxy(object):
+
+    def collect(self):
+        for metric in REGISTRY.collect():
+            if not metric.name.startswith("__"):
+                yield metric
+
 
+@attr.s(hash=True)
+class LaterGauge(object):
 
-class Metrics(object):
-    """ A single Metrics object gives a (mutable) slice view of the all_metrics
-    dict, allowing callers to easily register new metrics that are namespaced
-    nicely."""
+    name = attr.ib()
+    desc = attr.ib()
+    labels = attr.ib(hash=False)
+    caller = attr.ib()
 
-    def __init__(self, name):
-        self.name_prefix = name
+    def collect(self):
 
-    def make_subspace(self, name):
-        return Metrics("%s_%s" % (self.name_prefix, name))
+        g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
 
-    def register_collector(self, func):
-        all_collectors.append(func)
+        try:
+            calls = self.caller()
+        except Exception as e:
+            print(e)
+            logger.err()
+            yield g
+
+        if isinstance(calls, dict):
+            for k, v in calls.items():
+                g.add_metric(k, v)
+        else:
+            g.add_metric([], calls)
 
-    def _register(self, metric_class, name, *args, **kwargs):
-        full_name = "%s_%s" % (self.name_prefix, name)
+        yield g
 
-        metric = metric_class(full_name, *args, **kwargs)
+    def __attrs_post_init__(self):
+        self._register()
 
-        all_metrics.append(metric)
-        return metric
+    def _register(self):
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering" % (self.name,))
+            REGISTRY.unregister(all_gauges.pop(self.name))
 
-    def register_counter(self, *args, **kwargs):
-        """
-        Returns:
-            CounterMetric
-        """
-        return self._register(CounterMetric, *args, **kwargs)
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
 
-    def register_callback(self, *args, **kwargs):
-        """
-        Returns:
-            CallbackMetric
-        """
-        return self._register(CallbackMetric, *args, **kwargs)
 
-    def register_distribution(self, *args, **kwargs):
-        """
-        Returns:
-            DistributionMetric
-        """
-        return self._register(DistributionMetric, *args, **kwargs)
+#
+# Detailed CPU metrics
+#
 
-    def register_cache(self, *args, **kwargs):
-        """
-        Returns:
-            CacheMetric
-        """
-        return self._register(CacheMetric, *args, **kwargs)
+class CPUMetrics(object):
 
+    def __init__(self):
+        ticks_per_sec = 100
+        try:
+            # Try and get the system config
+            ticks_per_sec = os.sysconf('SC_CLK_TCK')
+        except (ValueError, TypeError, AttributeError):
+            pass
 
-def register_memory_metrics(hs):
-    try:
-        import psutil
-        process = psutil.Process()
-        process.memory_info().rss
-    except (ImportError, AttributeError):
-        logger.warn(
-            "psutil is not installed or incorrect version."
-            " Disabling memory metrics."
-        )
-        return
-    metric = MemoryUsageMetric(hs, psutil)
-    all_metrics.append(metric)
+        self.ticks_per_sec = ticks_per_sec
 
+    def collect(self):
 
-def get_metrics_for(pkg_name):
-    """ Returns a Metrics instance for conveniently creating metrics
-    namespaced with the given name prefix. """
+        with open("/proc/self/stat") as s:
+            line = s.read()
+            raw_stats = line.split(") ", 1)[1].split(" ")
 
-    # Convert a "package.name" to "package_name" because Prometheus doesn't
-    # let us use . in metric names
-    return Metrics(pkg_name.replace(".", "_"))
+            user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
+            user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
+            yield user
 
+            sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
+            sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
+            yield sys
 
-def render_all():
-    strs = []
 
-    for collector in all_collectors:
-        collector()
+REGISTRY.register(CPUMetrics())
 
-    for metric in all_metrics:
-        try:
-            strs += metric.render()
-        except Exception:
-            strs += ["# FAILED to render"]
-            logger.exception("Failed to render metric")
+#
+# Python GC metrics
+#
+
+gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
+gc_time = Histogram(
+    "python_gc_time",
+    "Time taken to GC (sec)",
+    ["gen"],
+    buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
+             5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+)
 
-    strs.append("")  # to generate a final CRLF
 
-    return "\n".join(strs)
+class GCCounts(object):
 
+    def collect(self):
+        cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
+        for n, m in enumerate(gc.get_count()):
+            cm.add_metric([str(n)], m)
 
-register_process_collector(get_metrics_for("process"))
+        yield cm
 
 
-python_metrics = get_metrics_for("python")
+REGISTRY.register(GCCounts())
+
+#
+# Twisted reactor metrics
+#
 
-gc_time = python_metrics.register_distribution("gc_time", labels=["gen"])
-gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"])
-python_metrics.register_callback(
-    "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
+tick_time = Histogram(
+    "python_twisted_reactor_tick_time",
+    "Tick time of the Twisted reactor (sec)",
+    buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
+)
+pending_calls_metric = Histogram(
+    "python_twisted_reactor_pending_calls",
+    "Pending calls",
+    buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
 )
 
-reactor_metrics = get_metrics_for("python.twisted.reactor")
-tick_time = reactor_metrics.register_distribution("tick_time")
-pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+#
+# Federation Metrics
+#
+
+sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
+
+sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
+
+events_processed_counter = Counter("synapse_federation_client_events_processed", "")
+
+# Used to track where various components have processed in the event stream,
+# e.g. federation sending, appservice sending, etc.
+event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
+
+# Used to track the current max events stream position
+event_persisted_position = Gauge("synapse_event_persisted_position", "")
+
+# Used to track the received_ts of the last event processed by various
+# components
+event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
+
+# Used to track the lag processing events. This is the time difference
+# between the last processed event's received_ts and the time it was
+# finished being processed.
+event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
 
 def runUntilCurrentTimer(func):
@@ -162,17 +202,20 @@ def runUntilCurrentTimer(func):
             num_pending += 1
 
         num_pending += len(reactor.threadCallQueue)
-        start = time.time() * 1000
+        start = time.time()
         ret = func(*args, **kwargs)
-        end = time.time() * 1000
+        end = time.time()
 
         # record the amount of wallclock time spent running pending calls.
         # This is a proxy for the actual amount of time between reactor polls,
         # since about 25% of time is actually spent running things triggered by
         # I/O events, but that is harder to capture without rewriting half the
         # reactor.
-        tick_time.inc_by(end - start)
-        pending_calls_metric.inc_by(num_pending)
+        tick_time.observe(end - start)
+        pending_calls_metric.observe(num_pending)
+
+        if running_on_pypy:
+            return ret
 
         # Check if we need to do a manual GC (since its been disabled), and do
         # one if necessary.
@@ -182,12 +225,12 @@ def runUntilCurrentTimer(func):
             if threshold[i] < counts[i]:
                 logger.info("Collecting gc %d", i)
 
-                start = time.time() * 1000
+                start = time.time()
                 unreachable = gc.collect(i)
-                end = time.time() * 1000
+                end = time.time()
 
-                gc_time.inc_by(end - start, i)
-                gc_unreachable.inc_by(unreachable, i)
+                gc_time.labels(i).observe(end - start)
+                gc_unreachable.labels(i).set(unreachable)
 
         return ret
 
@@ -206,6 +249,7 @@ try:
 
     # We manually run the GC each reactor tick so that we can get some metrics
     # about time spent doing GC,
-    gc.disable()
+    if not running_on_pypy:
+        gc.disable()
 except AttributeError:
     pass
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
deleted file mode 100644
index ff5aa8c0e1..0000000000
--- a/synapse/metrics/metric.py
+++ /dev/null
@@ -1,271 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from itertools import chain
-import logging
-
-logger = logging.getLogger(__name__)
-
-
-def flatten(items):
-    """Flatten a list of lists
-
-    Args:
-        items: iterable[iterable[X]]
-
-    Returns:
-        list[X]: flattened list
-    """
-    return list(chain.from_iterable(items))
-
-
-class BaseMetric(object):
-    """Base class for metrics which report a single value per label set
-    """
-
-    def __init__(self, name, labels=[], alternative_names=[]):
-        """
-        Args:
-            name (str): principal name for this metric
-            labels (list(str)): names of the labels which will be reported
-                for this metric
-            alternative_names (iterable(str)): list of alternative names for
-                 this metric. This can be useful to provide a migration path
-                when renaming metrics.
-        """
-        self._names = [name] + list(alternative_names)
-        self.labels = labels  # OK not to clone as we never write it
-
-    def dimension(self):
-        return len(self.labels)
-
-    def is_scalar(self):
-        return not len(self.labels)
-
-    def _render_labelvalue(self, value):
-        # TODO: escape backslashes, quotes and newlines
-        return '"%s"' % (value)
-
-    def _render_key(self, values):
-        if self.is_scalar():
-            return ""
-        return "{%s}" % (
-            ",".join(["%s=%s" % (k, self._render_labelvalue(v))
-                      for k, v in zip(self.labels, values)])
-        )
-
-    def _render_for_labels(self, label_values, value):
-        """Render this metric for a single set of labels
-
-        Args:
-            label_values (list[str]): values for each of the labels
-            value: value of the metric at with these labels
-
-        Returns:
-            iterable[str]: rendered metric
-        """
-        rendered_labels = self._render_key(label_values)
-        return (
-            "%s%s %.12g" % (name, rendered_labels, value)
-            for name in self._names
-        )
-
-    def render(self):
-        """Render this metric
-
-        Each metric is rendered as:
-
-            name{label1="val1",label2="val2"} value
-
-        https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details
-
-        Returns:
-            iterable[str]: rendered metrics
-        """
-        raise NotImplementedError()
-
-
-class CounterMetric(BaseMetric):
-    """The simplest kind of metric; one that stores a monotonically-increasing
-    value that counts events or running totals.
-
-    Example use cases for Counters:
-    - Number of requests processed
-    - Number of items that were inserted into a queue
-    - Total amount of data that a system has processed
-    Counters can only go up (and be reset when the process restarts).
-    """
-
-    def __init__(self, *args, **kwargs):
-        super(CounterMetric, self).__init__(*args, **kwargs)
-
-        # dict[list[str]]: value for each set of label values. the keys are the
-        # label values, in the same order as the labels in self.labels.
-        #
-        # (if the metric is a scalar, the (single) key is the empty list).
-        self.counts = {}
-
-        # Scalar metrics are never empty
-        if self.is_scalar():
-            self.counts[()] = 0.
-
-    def inc_by(self, incr, *values):
-        if len(values) != self.dimension():
-            raise ValueError(
-                "Expected as many values to inc() as labels (%d)" % (self.dimension())
-            )
-
-        # TODO: should assert that the tag values are all strings
-
-        if values not in self.counts:
-            self.counts[values] = incr
-        else:
-            self.counts[values] += incr
-
-    def inc(self, *values):
-        self.inc_by(1, *values)
-
-    def render(self):
-        return flatten(
-            self._render_for_labels(k, self.counts[k])
-            for k in sorted(self.counts.keys())
-        )
-
-
-class CallbackMetric(BaseMetric):
-    """A metric that returns the numeric value returned by a callback whenever
-    it is rendered. Typically this is used to implement gauges that yield the
-    size or other state of some in-memory object by actively querying it."""
-
-    def __init__(self, name, callback, labels=[]):
-        super(CallbackMetric, self).__init__(name, labels=labels)
-
-        self.callback = callback
-
-    def render(self):
-        try:
-            value = self.callback()
-        except Exception:
-            logger.exception("Failed to render %s", self.name)
-            return ["# FAILED to render " + self.name]
-
-        if self.is_scalar():
-            return list(self._render_for_labels([], value))
-
-        return flatten(
-            self._render_for_labels(k, value[k])
-            for k in sorted(value.keys())
-        )
-
-
-class DistributionMetric(object):
-    """A combination of an event counter and an accumulator, which counts
-    both the number of events and accumulates the total value. Typically this
-    could be used to keep track of method-running times, or other distributions
-    of values that occur in discrete occurances.
-
-    TODO(paul): Try to export some heatmap-style stats?
-    """
-
-    def __init__(self, name, *args, **kwargs):
-        self.counts = CounterMetric(name + ":count", **kwargs)
-        self.totals = CounterMetric(name + ":total", **kwargs)
-
-    def inc_by(self, inc, *values):
-        self.counts.inc(*values)
-        self.totals.inc_by(inc, *values)
-
-    def render(self):
-        return self.counts.render() + self.totals.render()
-
-
-class CacheMetric(object):
-    __slots__ = (
-        "name", "cache_name", "hits", "misses", "evicted_size", "size_callback",
-    )
-
-    def __init__(self, name, size_callback, cache_name):
-        self.name = name
-        self.cache_name = cache_name
-
-        self.hits = 0
-        self.misses = 0
-        self.evicted_size = 0
-
-        self.size_callback = size_callback
-
-    def inc_hits(self):
-        self.hits += 1
-
-    def inc_misses(self):
-        self.misses += 1
-
-    def inc_evictions(self, size=1):
-        self.evicted_size += size
-
-    def render(self):
-        size = self.size_callback()
-        hits = self.hits
-        total = self.misses + self.hits
-
-        return [
-            """%s:hits{name="%s"} %d""" % (self.name, self.cache_name, hits),
-            """%s:total{name="%s"} %d""" % (self.name, self.cache_name, total),
-            """%s:size{name="%s"} %d""" % (self.name, self.cache_name, size),
-            """%s:evicted_size{name="%s"} %d""" % (
-                self.name, self.cache_name, self.evicted_size
-            ),
-        ]
-
-
-class MemoryUsageMetric(object):
-    """Keeps track of the current memory usage, using psutil.
-
-    The class will keep the current min/max/sum/counts of rss over the last
-    WINDOW_SIZE_SEC, by polling UPDATE_HZ times per second
-    """
-
-    UPDATE_HZ = 2  # number of times to get memory per second
-    WINDOW_SIZE_SEC = 30  # the size of the window in seconds
-
-    def __init__(self, hs, psutil):
-        clock = hs.get_clock()
-        self.memory_snapshots = []
-
-        self.process = psutil.Process()
-
-        clock.looping_call(self._update_curr_values, 1000 / self.UPDATE_HZ)
-
-    def _update_curr_values(self):
-        max_size = self.UPDATE_HZ * self.WINDOW_SIZE_SEC
-        self.memory_snapshots.append(self.process.memory_info().rss)
-        self.memory_snapshots[:] = self.memory_snapshots[-max_size:]
-
-    def render(self):
-        if not self.memory_snapshots:
-            return []
-
-        max_rss = max(self.memory_snapshots)
-        min_rss = min(self.memory_snapshots)
-        sum_rss = sum(self.memory_snapshots)
-        len_rss = len(self.memory_snapshots)
-
-        return [
-            "process_psutil_rss:max %d" % max_rss,
-            "process_psutil_rss:min %d" % min_rss,
-            "process_psutil_rss:total %d" % sum_rss,
-            "process_psutil_rss:count %d" % len_rss,
-        ]
diff --git a/synapse/metrics/process_collector.py b/synapse/metrics/process_collector.py
deleted file mode 100644
index 6fec3de399..0000000000
--- a/synapse/metrics/process_collector.py
+++ /dev/null
@@ -1,122 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-
-
-TICKS_PER_SEC = 100
-BYTES_PER_PAGE = 4096
-
-HAVE_PROC_STAT = os.path.exists("/proc/stat")
-HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
-HAVE_PROC_SELF_LIMITS = os.path.exists("/proc/self/limits")
-HAVE_PROC_SELF_FD = os.path.exists("/proc/self/fd")
-
-# Field indexes from /proc/self/stat, taken from the proc(5) manpage
-STAT_FIELDS = {
-    "utime": 14,
-    "stime": 15,
-    "starttime": 22,
-    "vsize": 23,
-    "rss": 24,
-}
-
-
-stats = {}
-
-# In order to report process_start_time_seconds we need to know the
-# machine's boot time, because the value in /proc/self/stat is relative to
-# this
-boot_time = None
-if HAVE_PROC_STAT:
-    with open("/proc/stat") as _procstat:
-        for line in _procstat:
-            if line.startswith("btime "):
-                boot_time = int(line.split()[1])
-
-
-def update_resource_metrics():
-    if HAVE_PROC_SELF_STAT:
-        global stats
-        with open("/proc/self/stat") as s:
-            line = s.read()
-            # line is PID (command) more stats go here ...
-            raw_stats = line.split(") ", 1)[1].split(" ")
-
-            for (name, index) in STAT_FIELDS.iteritems():
-                # subtract 3 from the index, because proc(5) is 1-based, and
-                # we've lost the first two fields in PID and COMMAND above
-                stats[name] = int(raw_stats[index - 3])
-
-
-def _count_fds():
-    # Not every OS will have a /proc/self/fd directory
-    if not HAVE_PROC_SELF_FD:
-        return 0
-
-    return len(os.listdir("/proc/self/fd"))
-
-
-def register_process_collector(process_metrics):
-    process_metrics.register_collector(update_resource_metrics)
-
-    if HAVE_PROC_SELF_STAT:
-        process_metrics.register_callback(
-            "cpu_user_seconds_total",
-            lambda: float(stats["utime"]) / TICKS_PER_SEC
-        )
-        process_metrics.register_callback(
-            "cpu_system_seconds_total",
-            lambda: float(stats["stime"]) / TICKS_PER_SEC
-        )
-        process_metrics.register_callback(
-            "cpu_seconds_total",
-            lambda: (float(stats["utime"] + stats["stime"])) / TICKS_PER_SEC
-        )
-
-        process_metrics.register_callback(
-            "virtual_memory_bytes",
-            lambda: int(stats["vsize"])
-        )
-        process_metrics.register_callback(
-            "resident_memory_bytes",
-            lambda: int(stats["rss"]) * BYTES_PER_PAGE
-        )
-
-        process_metrics.register_callback(
-            "start_time_seconds",
-            lambda: boot_time + int(stats["starttime"]) / TICKS_PER_SEC
-        )
-
-    if HAVE_PROC_SELF_FD:
-        process_metrics.register_callback(
-            "open_fds",
-            lambda: _count_fds()
-        )
-
-    if HAVE_PROC_SELF_LIMITS:
-        def _get_max_fds():
-            with open("/proc/self/limits") as limits:
-                for line in limits:
-                    if not line.startswith("Max open files "):
-                        continue
-                    # Line is  Max open files  $SOFT  $HARD
-                    return int(line.split()[3])
-            return None
-
-        process_metrics.register_callback(
-            "max_fds",
-            lambda: _get_max_fds()
-        )
diff --git a/synapse/metrics/resource.py b/synapse/metrics/resource.py
index 870f400600..7996e6ab66 100644
--- a/synapse/metrics/resource.py
+++ b/synapse/metrics/resource.py
@@ -13,27 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.web.resource import Resource
-
-import synapse.metrics
-
-
 METRICS_PREFIX = "/_synapse/metrics"
-
-
-class MetricsResource(Resource):
-    isLeaf = True
-
-    def __init__(self, hs):
-        Resource.__init__(self)  # Resource is old-style, so no super()
-
-        self.hs = hs
-
-    def render_GET(self, request):
-        response = synapse.metrics.render_all()
-
-        request.setHeader("Content-Type", "text/plain")
-        request.setHeader("Content-Length", str(len(response)))
-
-        # Encode as UTF-8 (default)
-        return response.encode()