diff options
author | David Baker <dave@matrix.org> | 2018-10-09 10:05:02 +0100 |
---|---|---|
committer | David Baker <dave@matrix.org> | 2018-10-09 10:05:02 +0100 |
commit | dc045ef20222bfbe8dcb5dae297e741509cce8d1 (patch) | |
tree | ee03ab45ce9791a06c12d15c01d3412cd101330a /synapse/metrics | |
parent | Apparently this blank line is Very Important (diff) | |
parent | Merge pull request #4017 from matrix-org/rav/optimise_filter_events_for_server (diff) | |
download | synapse-dc045ef20222bfbe8dcb5dae297e741509cce8d1.tar.xz |
Merge remote-tracking branch 'origin/develop' into dbkr/e2e_backups
Diffstat (limited to 'synapse/metrics')
-rw-r--r-- | synapse/metrics/__init__.py | 108 | ||||
-rw-r--r-- | synapse/metrics/background_process_metrics.py | 14 |
2 files changed, 119 insertions, 3 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 550f8443f7..59900aa5d1 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,8 +18,11 @@ import gc import logging import os import platform +import threading import time +import six + import attr from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import REGISTRY, GaugeMetricFamily @@ -68,7 +71,7 @@ class LaterGauge(object): return if isinstance(calls, dict): - for k, v in calls.items(): + for k, v in six.iteritems(calls): g.add_metric(k, v) else: g.add_metric([], calls) @@ -87,6 +90,109 @@ class LaterGauge(object): all_gauges[self.name] = self +class InFlightGauge(object): + """Tracks number of things (e.g. requests, Measure blocks, etc) in flight + at any given time. + + Each InFlightGauge will create a metric called `<name>_total` that counts + the number of in flight blocks, as well as a metrics for each item in the + given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the + callbacks. + + Args: + name (str) + desc (str) + labels (list[str]) + sub_metrics (list[str]): A list of sub metrics that the callbacks + will update. + """ + + def __init__(self, name, desc, labels, sub_metrics): + self.name = name + self.desc = desc + self.labels = labels + self.sub_metrics = sub_metrics + + # Create a class which have the sub_metrics values as attributes, which + # default to 0 on initialization. Used to pass to registered callbacks. + self._metrics_class = attr.make_class( + "_MetricsEntry", + attrs={x: attr.ib(0) for x in sub_metrics}, + slots=True, + ) + + # Counts number of in flight blocks for a given set of label values + self._registrations = {} + + # Protects access to _registrations + self._lock = threading.Lock() + + self._register_with_collector() + + def register(self, key, callback): + """Registers that we've entered a new block with labels `key`. + + `callback` gets called each time the metrics are collected. The same + value must also be given to `unregister`. + + `callback` gets called with an object that has an attribute per + sub_metric, which should be updated with the necessary values. Note that + the metrics object is shared between all callbacks registered with the + same key. + + Note that `callback` may be called on a separate thread. + """ + with self._lock: + self._registrations.setdefault(key, set()).add(callback) + + def unregister(self, key, callback): + """Registers that we've exited a block with labels `key`. + """ + + with self._lock: + self._registrations.setdefault(key, set()).discard(callback) + + def collect(self): + """Called by prometheus client when it reads metrics. + + Note: may be called by a separate thread. + """ + in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels) + + metrics_by_key = {} + + # We copy so that we don't mutate the list while iterating + with self._lock: + keys = list(self._registrations) + + for key in keys: + with self._lock: + callbacks = set(self._registrations[key]) + + in_flight.add_metric(key, len(callbacks)) + + metrics = self._metrics_class() + metrics_by_key[key] = metrics + for callback in callbacks: + callback(metrics) + + yield in_flight + + for name in self.sub_metrics: + gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels) + for key, metrics in six.iteritems(metrics_by_key): + gauge.add_metric(key, getattr(metrics, name)) + yield gauge + + def _register_with_collector(self): + if self.name in all_gauges.keys(): + logger.warning("%s already registered, reregistering" % (self.name,)) + REGISTRY.unregister(all_gauges.pop(self.name)) + + REGISTRY.register(self) + all_gauges[self.name] = self + + # # Detailed CPU metrics # diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 167167be0a..037f1c490e 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import threading import six @@ -23,6 +24,9 @@ from twisted.internet import defer from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +logger = logging.getLogger(__name__) + + _background_process_start_count = Counter( "synapse_background_process_start_count", "Number of background processes started", @@ -97,9 +101,13 @@ class _Collector(object): labels=["name"], ) - # We copy the dict so that it doesn't change from underneath us + # We copy the dict so that it doesn't change from underneath us. + # We also copy the process lists as that can also change with _bg_metrics_lock: - _background_processes_copy = dict(_background_processes) + _background_processes_copy = { + k: list(v) + for k, v in six.iteritems(_background_processes) + } for desc, processes in six.iteritems(_background_processes_copy): background_process_in_flight_count.add_metric( @@ -191,6 +199,8 @@ def run_as_background_process(desc, func, *args, **kwargs): try: yield func(*args, **kwargs) + except Exception: + logger.exception("Background process '%s' threw an exception", desc) finally: proc.update_metrics() |