diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 9b3f2f4b96..40c2946129 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -438,3 +438,55 @@ def _cancelled_to_timed_out_error(value, timeout):
value.trap(CancelledError)
raise DeferredTimeoutError(timeout, "Deferred")
return value
+
+
+def timeout_no_seriously(deferred, timeout, reactor):
+ """The in build twisted deferred addTimeout (and the method above)
+ completely fail to time things out under some unknown circumstances.
+
+ Lets try a different way of timing things out and maybe that will make
+ things work?!
+
+ TODO: Kill this with fire.
+ """
+
+ new_d = defer.Deferred()
+
+ timed_out = [False]
+
+ def time_it_out():
+ timed_out[0] = True
+
+ if not new_d.called:
+ new_d.errback(DeferredTimeoutError(timeout, "Deferred"))
+
+ deferred.cancel()
+
+ delayed_call = reactor.callLater(timeout, time_it_out)
+
+ def convert_cancelled(value):
+ if timed_out[0]:
+ return _cancelled_to_timed_out_error(value, timeout)
+ return value
+
+ deferred.addBoth(convert_cancelled)
+
+ def cancel_timeout(result):
+ # stop the pending call to cancel the deferred if it's been fired
+ if delayed_call.active():
+ delayed_call.cancel()
+ return result
+
+ deferred.addBoth(cancel_timeout)
+
+ def success_cb(val):
+ if not new_d.called:
+ new_d.callback(val)
+
+ def failure_cb(val):
+ if not new_d.called:
+ new_d.errback(val)
+
+ deferred.addCallbacks(success_cb, failure_cb)
+
+ return new_d
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 97f1267380..4b4ac5f6c7 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -20,6 +20,7 @@ from prometheus_client import Counter
from twisted.internet import defer
+from synapse.metrics import InFlightGauge
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
@@ -45,6 +46,13 @@ block_db_txn_duration = Counter(
block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])
+# Tracks the number of blocks currently active
+in_flight = InFlightGauge(
+ "synapse_util_metrics_block_in_flight", "",
+ labels=["block_name"],
+ sub_metrics=["real_time_max", "real_time_sum"],
+)
+
def measure_func(name):
def wrapper(func):
@@ -82,10 +90,14 @@ class Measure(object):
self.start_usage = self.start_context.get_resource_usage()
+ in_flight.register((self.name,), self._update_in_flight)
+
def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
return
+ in_flight.unregister((self.name,), self._update_in_flight)
+
duration = self.clock.time() - self.start
block_counter.labels(self.name).inc()
@@ -120,3 +132,13 @@ class Measure(object):
if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)
+
+ def _update_in_flight(self, metrics):
+ """Gets called when processing in flight metrics
+ """
+ duration = self.clock.time() - self.start
+
+ metrics.real_time_max = max(metrics.real_time_max, duration)
+ metrics.real_time_sum += duration
+
+ # TODO: Add other in flight metrics.
|