summary refs log tree commit diff
path: root/synapse/metrics
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-05-03 14:59:29 +0100
committerRichard van der Hoff <richard@matrix.org>2018-05-03 14:59:29 +0100
commit093d8c415a303fa7c8900e3fe685843b60a85eed (patch)
tree8dd5ceabb4c8ecbae18fa95f10108ba335c6bbbf /synapse/metrics
parentMake 'unexpected logging context' into warnings (diff)
parentMerge pull request #3183 from matrix-org/rav/moar_logcontext_leaks (diff)
downloadsynapse-093d8c415a303fa7c8900e3fe685843b60a85eed.tar.xz
Merge remote-tracking branch 'origin/develop' into rav/warn_on_logcontext_fail
Diffstat (limited to 'synapse/metrics')
-rw-r--r--synapse/metrics/__init__.py43
-rw-r--r--synapse/metrics/metric.py62
2 files changed, 100 insertions, 5 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 50d99d7a5c..e3b831db67 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -17,12 +17,13 @@ import logging
 import functools
 import time
 import gc
+import platform
 
 from twisted.internet import reactor
 
 from .metric import (
     CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
-    MemoryUsageMetric,
+    MemoryUsageMetric, GaugeMetric,
 )
 from .process_collector import register_process_collector
 
@@ -30,6 +31,7 @@ from .process_collector import register_process_collector
 logger = logging.getLogger(__name__)
 
 
+running_on_pypy = platform.python_implementation() == 'PyPy'
 all_metrics = []
 all_collectors = []
 
@@ -63,6 +65,13 @@ class Metrics(object):
         """
         return self._register(CounterMetric, *args, **kwargs)
 
+    def register_gauge(self, *args, **kwargs):
+        """
+        Returns:
+            GaugeMetric
+        """
+        return self._register(GaugeMetric, *args, **kwargs)
+
     def register_callback(self, *args, **kwargs):
         """
         Returns:
@@ -142,6 +151,32 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
 tick_time = reactor_metrics.register_distribution("tick_time")
 pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
 
+synapse_metrics = get_metrics_for("synapse")
+
+# Used to track where various components have processed in the event stream,
+# e.g. federation sending, appservice sending, etc.
+event_processing_positions = synapse_metrics.register_gauge(
+    "event_processing_positions", labels=["name"],
+)
+
+# Used to track the current max events stream position
+event_persisted_position = synapse_metrics.register_gauge(
+    "event_persisted_position",
+)
+
+# Used to track the received_ts of the last event processed by various
+# components
+event_processing_last_ts = synapse_metrics.register_gauge(
+    "event_processing_last_ts", labels=["name"],
+)
+
+# Used to track the lag processing events. This is the time difference
+# between the last processed event's received_ts and the time it was
+# finished being processed.
+event_processing_lag = synapse_metrics.register_gauge(
+    "event_processing_lag", labels=["name"],
+)
+
 
 def runUntilCurrentTimer(func):
 
@@ -174,6 +209,9 @@ def runUntilCurrentTimer(func):
         tick_time.inc_by(end - start)
         pending_calls_metric.inc_by(num_pending)
 
+        if running_on_pypy:
+            return ret
+
         # Check if we need to do a manual GC (since its been disabled), and do
         # one if necessary.
         threshold = gc.get_threshold()
@@ -206,6 +244,7 @@ try:
 
     # We manually run the GC each reactor tick so that we can get some metrics
     # about time spent doing GC,
-    gc.disable()
+    if not running_on_pypy:
+        gc.disable()
 except AttributeError:
     pass
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index ff5aa8c0e1..fbba94e633 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -16,6 +16,7 @@
 
 from itertools import chain
 import logging
+import re
 
 logger = logging.getLogger(__name__)
 
@@ -56,8 +57,7 @@ class BaseMetric(object):
         return not len(self.labels)
 
     def _render_labelvalue(self, value):
-        # TODO: escape backslashes, quotes and newlines
-        return '"%s"' % (value)
+        return '"%s"' % (_escape_label_value(value),)
 
     def _render_key(self, values):
         if self.is_scalar():
@@ -115,7 +115,7 @@ class CounterMetric(BaseMetric):
         # dict[list[str]]: value for each set of label values. the keys are the
         # label values, in the same order as the labels in self.labels.
         #
-        # (if the metric is a scalar, the (single) key is the empty list).
+        # (if the metric is a scalar, the (single) key is the empty tuple).
         self.counts = {}
 
         # Scalar metrics are never empty
@@ -145,6 +145,36 @@ class CounterMetric(BaseMetric):
         )
 
 
+class GaugeMetric(BaseMetric):
+    """A metric that can go up or down
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(GaugeMetric, self).__init__(*args, **kwargs)
+
+        # dict[list[str]]: value for each set of label values. the keys are the
+        # label values, in the same order as the labels in self.labels.
+        #
+        # (if the metric is a scalar, the (single) key is the empty tuple).
+        self.guages = {}
+
+    def set(self, v, *values):
+        if len(values) != self.dimension():
+            raise ValueError(
+                "Expected as many values to inc() as labels (%d)" % (self.dimension())
+            )
+
+        # TODO: should assert that the tag values are all strings
+
+        self.guages[values] = v
+
+    def render(self):
+        return flatten(
+            self._render_for_labels(k, self.guages[k])
+            for k in sorted(self.guages.keys())
+        )
+
+
 class CallbackMetric(BaseMetric):
     """A metric that returns the numeric value returned by a callback whenever
     it is rendered. Typically this is used to implement gauges that yield the
@@ -269,3 +299,29 @@ class MemoryUsageMetric(object):
             "process_psutil_rss:total %d" % sum_rss,
             "process_psutil_rss:count %d" % len_rss,
         ]
+
+
+def _escape_character(m):
+    """Replaces a single character with its escape sequence.
+
+    Args:
+        m (re.MatchObject): A match object whose first group is the single
+            character to replace
+
+    Returns:
+        str
+    """
+    c = m.group(1)
+    if c == "\\":
+        return "\\\\"
+    elif c == "\"":
+        return "\\\""
+    elif c == "\n":
+        return "\\n"
+    return c
+
+
+def _escape_label_value(value):
+    """Takes a label value and escapes quotes, newlines and backslashes
+    """
+    return re.sub(r"([\n\"\\])", _escape_character, value)