Diffstat (limited to 'synapse/metrics/__init__.py')
-rw-r--r-- | synapse/metrics/__init__.py | 260
1 file changed, 177 insertions(+), 83 deletions(-)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 2265e6e8d6..a9158fc066 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -13,118 +13,198 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
 import functools
-import time
 import gc
+import logging
+import os
+import platform
+import time
 
-from twisted.internet import reactor
-
-from .metric import (
-    CounterMetric, CallbackMetric, DistributionMetric, CacheMetric,
-    MemoryUsageMetric,
-)
-from .process_collector import register_process_collector
+import attr
+from prometheus_client import Counter, Gauge, Histogram
+from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from twisted.internet import reactor
 
 logger = logging.getLogger(__name__)
 
-
+running_on_pypy = platform.python_implementation() == "PyPy"
 all_metrics = []
 all_collectors = []
+all_gauges = {}
 
+HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
 
-class Metrics(object):
-    """ A single Metrics object gives a (mutable) slice view of the all_metrics
-    dict, allowing callers to easily register new metrics that are namespaced
-    nicely."""
 
-    def __init__(self, name):
-        self.name_prefix = name
+class RegistryProxy(object):
 
-    def make_subspace(self, name):
-        return Metrics("%s_%s" % (self.name_prefix, name))
+    @staticmethod
+    def collect():
+        for metric in REGISTRY.collect():
+            if not metric.name.startswith("__"):
+                yield metric
 
-    def register_collector(self, func):
-        all_collectors.append(func)
 
-    def _register(self, metric_class, name, *args, **kwargs):
-        full_name = "%s_%s" % (self.name_prefix, name)
+@attr.s(hash=True)
+class LaterGauge(object):
 
-        metric = metric_class(full_name, *args, **kwargs)
+    name = attr.ib()
+    desc = attr.ib()
+    labels = attr.ib(hash=False)
+    caller = attr.ib()
 
-        all_metrics.append(metric)
-        return metric
+    def collect(self):
 
-    def register_counter(self, *args, **kwargs):
-        return self._register(CounterMetric, *args, **kwargs)
+        g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
 
-    def register_callback(self, *args, **kwargs):
-        return self._register(CallbackMetric, *args, **kwargs)
+        try:
+            calls = self.caller()
+        except Exception:
+            logger.exception(
+                "Exception running callback for LaterGauge(%s)",
+                self.name,
+            )
+            yield g
+            return
 
-    def register_distribution(self, *args, **kwargs):
-        return self._register(DistributionMetric, *args, **kwargs)
+        if isinstance(calls, dict):
+            for k, v in calls.items():
+                g.add_metric(k, v)
+        else:
+            g.add_metric([], calls)
 
-    def register_cache(self, *args, **kwargs):
-        return self._register(CacheMetric, *args, **kwargs)
+        yield g
 
+    def __attrs_post_init__(self):
+        self._register()
 
-def register_memory_metrics(hs):
-    try:
-        import psutil
-        process = psutil.Process()
-        process.memory_info().rss
-    except (ImportError, AttributeError):
-        logger.warn(
-            "psutil is not installed or incorrect version."
-            " Disabling memory metrics."
-        )
-        return
-    metric = MemoryUsageMetric(hs, psutil)
-    all_metrics.append(metric)
+    def _register(self):
+        if self.name in all_gauges.keys():
+            logger.warning("%s already registered, reregistering" % (self.name,))
+            REGISTRY.unregister(all_gauges.pop(self.name))
+        REGISTRY.register(self)
+        all_gauges[self.name] = self
 
 
-def get_metrics_for(pkg_name):
-    """ Returns a Metrics instance for conveniently creating metrics
-    namespaced with the given name prefix. """
-    # Convert a "package.name" to "package_name" because Prometheus doesn't
-    # let us use . in metric names
-    return Metrics(pkg_name.replace(".", "_"))
+#
+# Detailed CPU metrics
+#
 
+class CPUMetrics(object):
 
-def render_all():
-    strs = []
+    def __init__(self):
+        ticks_per_sec = 100
+        try:
+            # Try and get the system config
+            ticks_per_sec = os.sysconf('SC_CLK_TCK')
+        except (ValueError, TypeError, AttributeError):
+            pass
 
-    for collector in all_collectors:
-        collector()
+        self.ticks_per_sec = ticks_per_sec
 
-    for metric in all_metrics:
-        try:
-            strs += metric.render()
-        except Exception:
-            strs += ["# FAILED to render"]
-            logger.exception("Failed to render metric")
+    def collect(self):
+        if not HAVE_PROC_SELF_STAT:
+            return
 
-    strs.append("")  # to generate a final CRLF
+        with open("/proc/self/stat") as s:
+            line = s.read()
+            raw_stats = line.split(") ", 1)[1].split(" ")
 
-    return "\n".join(strs)
+            user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
+            user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
+            yield user
 
+            sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
+            sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
+            yield sys
 
-register_process_collector(get_metrics_for("process"))
 
+REGISTRY.register(CPUMetrics())
 
-python_metrics = get_metrics_for("python")
+#
+# Python GC metrics
+#
 
-gc_time = python_metrics.register_distribution("gc_time", labels=["gen"])
-gc_unreachable = python_metrics.register_counter("gc_unreachable_total", labels=["gen"])
-python_metrics.register_callback(
-    "gc_counts", lambda: {(i,): v for i, v in enumerate(gc.get_count())}, labels=["gen"]
+gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
+gc_time = Histogram(
+    "python_gc_time",
+    "Time taken to GC (sec)",
+    ["gen"],
+    buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
+             5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
 )
 
-reactor_metrics = get_metrics_for("python.twisted.reactor")
-tick_time = reactor_metrics.register_distribution("tick_time")
-pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+
+class GCCounts(object):
+
+    def collect(self):
+        cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
+        for n, m in enumerate(gc.get_count()):
+            cm.add_metric([str(n)], m)
+
+        yield cm
+
+
+if not running_on_pypy:
+    REGISTRY.register(GCCounts())
+
+#
+# Twisted reactor metrics
+#
+
+tick_time = Histogram(
+    "python_twisted_reactor_tick_time",
+    "Tick time of the Twisted reactor (sec)",
+    buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
+)
+pending_calls_metric = Histogram(
+    "python_twisted_reactor_pending_calls",
+    "Pending calls",
+    buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
+)
+
+#
+# Federation Metrics
+#
+
+sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
+
+sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
+
+events_processed_counter = Counter("synapse_federation_client_events_processed", "")
+
+# Used to track where various components have processed in the event stream,
+# e.g. federation sending, appservice sending, etc.
+event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
+
+# Used to track the current max events stream position
+event_persisted_position = Gauge("synapse_event_persisted_position", "")
+
+# Used to track the received_ts of the last event processed by various
+# components
+event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
+
+# Used to track the lag processing events. This is the time difference
+# between the last processed event's received_ts and the time it was
+# finished being processed.
+event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
+
+last_ticked = time.time()
+
+
+class ReactorLastSeenMetric(object):
+
+    def collect(self):
+        cm = GaugeMetricFamily(
+            "python_twisted_reactor_last_seen",
+            "Seconds since the Twisted reactor was last seen",
+        )
+        cm.add_metric([], time.time() - last_ticked)
+        yield cm
+
+
+REGISTRY.register(ReactorLastSeenMetric())
 
 
 def runUntilCurrentTimer(func):
@@ -146,12 +226,25 @@ def runUntilCurrentTimer(func):
             num_pending += 1
 
         num_pending += len(reactor.threadCallQueue)
-
-        start = time.time() * 1000
+        start = time.time()
         ret = func(*args, **kwargs)
-        end = time.time() * 1000
-        tick_time.inc_by(end - start)
-        pending_calls_metric.inc_by(num_pending)
+        end = time.time()
+
+        # record the amount of wallclock time spent running pending calls.
+        # This is a proxy for the actual amount of time between reactor polls,
+        # since about 25% of time is actually spent running things triggered by
+        # I/O events, but that is harder to capture without rewriting half the
+        # reactor.
+        tick_time.observe(end - start)
+        pending_calls_metric.observe(num_pending)
+
+        # Update the time we last ticked, for the metric to test whether
+        # Synapse's reactor has frozen
+        global last_ticked
+        last_ticked = end
+
+        if running_on_pypy:
+            return ret
 
         # Check if we need to do a manual GC (since its been disabled), and do
         # one if necessary.
@@ -161,12 +254,12 @@ def runUntilCurrentTimer(func):
             if threshold[i] < counts[i]:
                 logger.info("Collecting gc %d", i)
 
-                start = time.time() * 1000
+                start = time.time()
                 unreachable = gc.collect(i)
-                end = time.time() * 1000
+                end = time.time()
 
-                gc_time.inc_by(end - start, i)
-                gc_unreachable.inc_by(unreachable, i)
+                gc_time.labels(i).observe(end - start)
+                gc_unreachable.labels(i).set(unreachable)
 
         return ret
@@ -185,6 +278,7 @@ try:
 
     # We manually run the GC each reactor tick so that we can get some metrics
     # about time spent doing GC,
-    gc.disable()
+    if not running_on_pypy:
+        gc.disable()
 except AttributeError:
     pass
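Usage note (not part of the commit): the sketch below shows how the two new building blocks might fit together — a LaterGauge whose value comes from a callback invoked at scrape time, and a scrape rendered through RegistryProxy, whose collect() filters out the "__"-prefixed internal metrics as in the diff above. It assumes the patched module is importable as synapse.metrics; the queue and metric name are hypothetical, but the constructor argument order (name, desc, labels, caller) follows the attrs fields of LaterGauge.

    from prometheus_client import generate_latest

    from synapse.metrics import LaterGauge, RegistryProxy

    # Hypothetical application state whose depth we want to export.
    pending_queue = []

    # Instantiation is registration: __attrs_post_init__() calls _register(),
    # which adds this collector to the global REGISTRY. The lambda runs only
    # when a scrape happens, so the exported value is always current.
    LaterGauge(
        "myapp_pending_queue_depth",   # hypothetical metric name
        "Number of items waiting in the queue",
        [],                            # no labels: the callback returns a scalar
        lambda: len(pending_queue),
    )

    # generate_latest() only needs an object with a collect() method, and
    # RegistryProxy.collect() is a staticmethod, so the class itself can
    # stand in for a registry when rendering the text exposition format.
    print(generate_latest(RegistryProxy).decode("utf-8"))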