1 files changed, 51 insertions, 0 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 9233ea3da9..d7bcad8a8a 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -18,8 +18,12 @@ from __future__ import absolute_import
import logging
from resource import getrusage, getpagesize, RUSAGE_SELF
+import functools
import os
import stat
+import time
+
+from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
@@ -144,3 +148,50 @@ def _process_fds():
return counts
get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"])
+
+reactor_metrics = get_metrics_for("reactor")
+tick_time = reactor_metrics.register_distribution("tick_time")
+pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+
+
+def runUntilCurrentTimer(func):
+
+ @functools.wraps(func)
+ def f(*args, **kwargs):
+ now = reactor.seconds()
+ num_pending = 0
+
+ # _newTimedCalls is one long list of *all* pending calls. Below loop
+ # is based off of impl of reactor.runUntilCurrent
+ for delayed_call in reactor._newTimedCalls:
+ if delayed_call.time > now:
+ break
+
+ if delayed_call.delayed_time > 0:
+ continue
+
+ num_pending += 1
+
+ num_pending += len(reactor.threadCallQueue)
+
+ start = time.time() * 1000
+ ret = func(*args, **kwargs)
+ end = time.time() * 1000
+ tick_time.inc_by(end - start)
+ pending_calls_metric.inc_by(num_pending)
+ return ret
+
+ return f
+
+
+try:
+ # Ensure the reactor has all the attributes we expect
+ reactor.runUntilCurrent
+ reactor._newTimedCalls
+ reactor.threadCallQueue
+
+ # runUntilCurrent is called when we have pending calls. It is called once
+ # per iteratation after fd polling.
+ reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
+except AttributeError:
+ pass
|