diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 9b3f2f4b96..ec7b2c9672 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -374,29 +374,25 @@ class ReadWriteLock(object):
defer.returnValue(_ctx_manager())
-class DeferredTimeoutError(Exception):
- """
- This error is raised by default when a L{Deferred} times out.
- """
-
+def _cancelled_to_timed_out_error(value, timeout):
+ if isinstance(value, failure.Failure):
+ value.trap(CancelledError)
+ raise defer.TimeoutError(timeout, "Deferred")
+ return value
-def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
- """
- Add a timeout to a deferred by scheduling it to be cancelled after
- timeout seconds.
- This is essentially a backport of deferred.addTimeout, which was introduced
- in twisted 16.5.
+def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
+ """The in built twisted `Deferred.addTimeout` fails to time out deferreds
+ that have a canceller that throws exceptions. This method creates a new
+ deferred that wraps and times out the given deferred, correctly handling
+ the case where the given deferred's canceller throws.
- If the deferred gets timed out, it errbacks with a DeferredTimeoutError,
- unless a cancelable function was passed to its initialization or unless
- a different on_timeout_cancel callable is provided.
+ NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred
Args:
- deferred (defer.Deferred): deferred to be timed out
- timeout (Number): seconds to time out after
- reactor (twisted.internet.reactor): the Twisted reactor to use
-
+ deferred (Deferred)
+ timeout (float): Timeout in seconds
+ reactor (twisted.internet.reactor): The twisted reactor to use
on_timeout_cancel (callable): A callable which is called immediately
after the deferred times out, and not if this deferred is
otherwise cancelled before the timeout.
@@ -406,13 +402,26 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
the timeout.
The default callable (if none is provided) will translate a
- CancelledError Failure into a DeferredTimeoutError.
+ CancelledError Failure into a defer.TimeoutError.
+
+ Returns:
+ Deferred
"""
+
+ new_d = defer.Deferred()
+
timed_out = [False]
def time_it_out():
timed_out[0] = True
- deferred.cancel()
+
+ try:
+ deferred.cancel()
+ except: # noqa: E722, if we throw any exception it'll break time outs
+ logger.exception("Canceller failed during timeout")
+
+ if not new_d.called:
+ new_d.errback(defer.TimeoutError(timeout, "Deferred"))
delayed_call = reactor.callLater(timeout, time_it_out)
@@ -432,9 +441,14 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
deferred.addBoth(cancel_timeout)
+ def success_cb(val):
+ if not new_d.called:
+ new_d.callback(val)
-def _cancelled_to_timed_out_error(value, timeout):
- if isinstance(value, failure.Failure):
- value.trap(CancelledError)
- raise DeferredTimeoutError(timeout, "Deferred")
- return value
+ def failure_cb(val):
+ if not new_d.called:
+ new_d.errback(val)
+
+ deferred.addCallbacks(success_cb, failure_cb)
+
+ return new_d
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 7b065b195e..f37d5bec08 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import os
import six
@@ -20,6 +21,8 @@ from six.moves import intern
from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily
+logger = logging.getLogger(__name__)
+
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5))
@@ -76,16 +79,20 @@ def register_cache(cache_type, cache_name, cache):
return []
def collect(self):
- if cache_type == "response_cache":
- response_cache_size.labels(cache_name).set(len(cache))
- response_cache_hits.labels(cache_name).set(self.hits)
- response_cache_evicted.labels(cache_name).set(self.evicted_size)
- response_cache_total.labels(cache_name).set(self.hits + self.misses)
- else:
- cache_size.labels(cache_name).set(len(cache))
- cache_hits.labels(cache_name).set(self.hits)
- cache_evicted.labels(cache_name).set(self.evicted_size)
- cache_total.labels(cache_name).set(self.hits + self.misses)
+ try:
+ if cache_type == "response_cache":
+ response_cache_size.labels(cache_name).set(len(cache))
+ response_cache_hits.labels(cache_name).set(self.hits)
+ response_cache_evicted.labels(cache_name).set(self.evicted_size)
+ response_cache_total.labels(cache_name).set(self.hits + self.misses)
+ else:
+ cache_size.labels(cache_name).set(len(cache))
+ cache_hits.labels(cache_name).set(self.hits)
+ cache_evicted.labels(cache_name).set(self.evicted_size)
+ cache_total.labels(cache_name).set(self.hits + self.misses)
+ except Exception as e:
+ logger.warn("Error calculating metrics for %s: %s", cache_name, e)
+ raise
yield GaugeMetricFamily("__unused", "")
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index ce85b2ae11..f369780277 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,12 +16,17 @@
import logging
from collections import OrderedDict
+from six import iteritems, itervalues
+
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
+SENTINEL = object()
+
+
class ExpiringCache(object):
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
reset_expiry_on_get=False, iterable=False):
@@ -54,11 +59,8 @@ class ExpiringCache(object):
self.iterable = iterable
- self._size_estimate = 0
-
self.metrics = register_cache("expiring", cache_name, self)
- def start(self):
if not self._expiry_ms:
# Don't bother starting the loop if things never expire
return
@@ -75,16 +77,11 @@ class ExpiringCache(object):
now = self._clock.time_msec()
self._cache[key] = _CacheEntry(now, value)
- if self.iterable:
- self._size_estimate += len(value)
-
# Evict if there are now too many items
while self._max_len and len(self) > self._max_len:
_key, value = self._cache.popitem(last=False)
if self.iterable:
- removed_len = len(value.value)
- self.metrics.inc_evictions(removed_len)
- self._size_estimate -= removed_len
+ self.metrics.inc_evictions(len(value.value))
else:
self.metrics.inc_evictions()
@@ -101,6 +98,21 @@ class ExpiringCache(object):
return entry.value
+ def pop(self, key, default=SENTINEL):
+ """Removes and returns the value with the given key from the cache.
+
+ If the key isn't in the cache then `default` will be returned if
+ specified, otherwise `KeyError` will get raised.
+
+ Identical functionality to `dict.pop(..)`.
+ """
+
+ value = self._cache.pop(key, default)
+ if value is SENTINEL:
+ raise KeyError(key)
+
+ return value
+
def __contains__(self, key):
return key in self._cache
@@ -128,14 +140,16 @@ class ExpiringCache(object):
keys_to_delete = set()
- for key, cache_entry in self._cache.items():
+ for key, cache_entry in iteritems(self._cache):
if now - cache_entry.time > self._expiry_ms:
keys_to_delete.add(key)
for k in keys_to_delete:
value = self._cache.pop(k)
if self.iterable:
- self._size_estimate -= len(value.value)
+ self.metrics.inc_evictions(len(value.value))
+ else:
+ self.metrics.inc_evictions()
logger.debug(
"[%s] _prune_cache before: %d, after len: %d",
@@ -144,12 +158,14 @@ class ExpiringCache(object):
def __len__(self):
if self.iterable:
- return self._size_estimate
+ return sum(len(entry.value) for entry in itervalues(self._cache))
else:
return len(self._cache)
class _CacheEntry(object):
+ __slots__ = ["time", "value"]
+
def __init__(self, time, value):
self.time = time
self.value = value
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a0c2d37610..89224b26cc 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -200,7 +200,7 @@ class LoggingContext(object):
sentinel = Sentinel()
- def __init__(self, name=None, parent_context=None):
+ def __init__(self, name=None, parent_context=None, request=None):
self.previous_context = LoggingContext.current_context()
self.name = name
@@ -218,6 +218,13 @@ class LoggingContext(object):
self.parent_context = parent_context
+ if self.parent_context is not None:
+ self.parent_context.copy_to(self)
+
+ if request is not None:
+ # the request param overrides the request from the parent context
+ self.request = request
+
def __str__(self):
return "%s@%x" % (self.name, id(self))
@@ -256,9 +263,6 @@ class LoggingContext(object):
)
self.alive = True
- if self.parent_context is not None:
- self.parent_context.copy_to(self)
-
return self
def __exit__(self, type, value, traceback):
@@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
)
+def nested_logging_context(suffix, parent_context=None):
+ """Creates a new logging context as a child of another.
+
+ The nested logging context will have a 'request' made up of the parent context's
+ request, plus the given suffix.
+
+ CPU/db usage stats will be added to the parent context's on exit.
+
+ Normal usage looks like:
+
+ with nested_logging_context(suffix):
+ # ... do stuff
+
+ Args:
+ suffix (str): suffix to add to the parent context's 'request'.
+ parent_context (LoggingContext|None): parent context. Will use the current context
+ if None.
+
+ Returns:
+ LoggingContext: new logging context.
+ """
+ if parent_context is None:
+ parent_context = LoggingContext.current_context()
+ return LoggingContext(
+ parent_context=parent_context,
+ request=parent_context.request + "-" + suffix,
+ )
+
+
def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
def g(*args, **kwargs):
diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py
index 14be3c7396..8d0f2a8918 100644
--- a/synapse/util/manhole.py
+++ b/synapse/util/manhole.py
@@ -19,22 +19,40 @@ from twisted.conch.ssh.keys import Key
from twisted.cred import checkers, portal
PUBLIC_KEY = (
- "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az"
- "64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJS"
- "kbh/C+BR3utDS555mV"
+ "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDHhGATaW4KhE23+7nrH4jFx3yLq9OjaEs5"
+ "XALqeK+7385NlLja3DE/DO9mGhnd9+bAy39EKT3sTV6+WXQ4yD0TvEEyUEMtjWkSEm6U32+C"
+ "DaS3TW/vPBUMeJQwq+Ydcif1UlnpXrDDTamD0AU9VaEvHq+3HAkipqn0TGpKON6aqk4vauDx"
+ "oXSsV5TXBVrxP/y7HpMOpU4GUWsaaacBTKKNnUaQB4UflvydaPJUuwdaCUJGTMjbhWrjVfK+"
+ "jslseSPxU6XvrkZMyCr4znxvuDxjMk1RGIdO7v+rbBMLEgqtSMNqJbYeVCnj2CFgc3fcTcld"
+ "X2uOJDrJb/WRlHulthCh"
)
PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY-----
-MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW
-4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw
-vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb
-Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1
-xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8
-PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2
-gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu
-DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML
-pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP
-EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg==
+MIIEpQIBAAKCAQEAx4RgE2luCoRNt/u56x+Ixcd8i6vTo2hLOVwC6nivu9/OTZS4
+2twxPwzvZhoZ3ffmwMt/RCk97E1evll0OMg9E7xBMlBDLY1pEhJulN9vgg2kt01v
+7zwVDHiUMKvmHXIn9VJZ6V6ww02pg9AFPVWhLx6vtxwJIqap9ExqSjjemqpOL2rg
+8aF0rFeU1wVa8T/8ux6TDqVOBlFrGmmnAUyijZ1GkAeFH5b8nWjyVLsHWglCRkzI
+24Vq41Xyvo7JbHkj8VOl765GTMgq+M58b7g8YzJNURiHTu7/q2wTCxIKrUjDaiW2
+HlQp49ghYHN33E3JXV9rjiQ6yW/1kZR7pbYQoQIDAQABAoIBAQC8KJ0q8Wzzwh5B
+esa1dQHZ8+4DEsL/Amae66VcVwD0X3cCN1W2IZ7X5W0Ij2kBqr8V51RYhcR+S+Ek
+BtzSiBUBvbKGrqcMGKaUgomDIMzai99hd0gvCCyZnEW1OQhFkNkaRNXCfqiZJ27M
+fqvSUiU2eOwh9fCvmxoA6Of8o3FbzcJ+1GMcobWRllDtLmj6lgVbDzuA+0jC5daB
+9Tj1pBzu3wn3ufxiS+gBnJ+7NcXH3E73lqCcPa2ufbZ1haxfiGCnRIhFXuQDgxFX
+vKdEfDgtvas6r1ahGbc+b/q8E8fZT7cABuIU4yfOORK+MhpyWbvoyyzuVGKj3PKt
+KSPJu5CZAoGBAOkoJfAVyYteqKcmGTanGqQnAY43CaYf6GdSPX/jg+JmKZg0zqMC
+jWZUtPb93i+jnOInbrnuHOiHAxI8wmhEPed28H2lC/LU8PzlqFkZXKFZ4vLOhhRB
+/HeHCFIDosPFlohWi3b+GAjD7sXgnIuGmnXWe2ea/TS3yersifDEoKKjAoGBANsQ
+gJX2cJv1c3jhdgcs8vAt5zIOKcCLTOr/QPmVf/kxjNgndswcKHwsxE/voTO9q+TF
+v/6yCSTxAdjuKz1oIYWgi/dZo82bBKWxNRpgrGviU3/zwxiHlyIXUhzQu78q3VS/
+7S1XVbc7qMV++XkYKHPVD+nVG/gGzFxumX7MLXfrAoGBAJit9cn2OnjNj9uFE1W6
+r7N254ndeLAUjPe73xH0RtTm2a4WRopwjW/JYIetTuYbWgyujc+robqTTuuOZjAp
+H/CG7o0Ym251CypQqaFO/l2aowclPp/dZhpPjp9GSjuxFBZLtiBB3DNBOwbRQzIK
+/vLTdRQvZkgzYkI4i0vjNt3JAoGBANP8HSKBLymMlShlrSx2b8TB9tc2Y2riohVJ
+2ttqs0M2kt/dGJWdrgOz4mikL+983Olt/0P9juHDoxEEMK2kpcPEv40lnmBpYU7h
+s8yJvnBLvJe2EJYdJ8AipyAhUX1FgpbvfxmASP8eaUxsegeXvBWTGWojAoS6N2o+
+0KSl+l3vAoGAFqm0gO9f/Q1Se60YQd4l2PZeMnJFv0slpgHHUwegmd6wJhOD7zJ1
+CkZcXwiv7Nog7AI9qKJEUXLjoqL+vJskBzSOqU3tcd670YQMi1aXSXJqYE202K7o
+EddTrx3TNpr1D5m/f+6mnXWrc8u9y1+GNx9yz889xMjIBTBI9KqaaOs=
-----END RSA PRIVATE KEY-----"""
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 97f1267380..4b4ac5f6c7 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -20,6 +20,7 @@ from prometheus_client import Counter
from twisted.internet import defer
+from synapse.metrics import InFlightGauge
from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
@@ -45,6 +46,13 @@ block_db_txn_duration = Counter(
block_db_sched_duration = Counter(
"synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"])
+# Tracks the number of blocks currently active
+in_flight = InFlightGauge(
+ "synapse_util_metrics_block_in_flight", "",
+ labels=["block_name"],
+ sub_metrics=["real_time_max", "real_time_sum"],
+)
+
def measure_func(name):
def wrapper(func):
@@ -82,10 +90,14 @@ class Measure(object):
self.start_usage = self.start_context.get_resource_usage()
+ in_flight.register((self.name,), self._update_in_flight)
+
def __exit__(self, exc_type, exc_val, exc_tb):
if isinstance(exc_type, Exception) or not self.start_context:
return
+ in_flight.unregister((self.name,), self._update_in_flight)
+
duration = self.clock.time() - self.start
block_counter.labels(self.name).inc()
@@ -120,3 +132,13 @@ class Measure(object):
if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)
+
+ def _update_in_flight(self, metrics):
+ """Gets called when processing in flight metrics
+ """
+ duration = self.clock.time() - self.start
+
+ metrics.real_time_max = max(metrics.real_time_max, duration)
+ metrics.real_time_sum += duration
+
+ # TODO: Add other in flight metrics.
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8a3a06fd74..26cce7d197 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -188,7 +188,7 @@ class RetryDestinationLimiter(object):
else:
self.retry_interval = self.min_retry_interval
- logger.debug(
+ logger.info(
"Connection to %s was unsuccessful (%s(%s)); backoff now %i",
self.destination, exc_type, exc_val, self.retry_interval
)
|