From 9e05c8d3090a956054b5b39347359655236caaef Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Sep 2018 10:42:10 +0100 Subject: Change the manhole SSH key to have more bits Newer versions of openssh client refuse to connect to the old key due to its length. --- synapse/util/manhole.py | 44 +++++++++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 13 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/manhole.py b/synapse/util/manhole.py index 14be3c7396..8d0f2a8918 100644 --- a/synapse/util/manhole.py +++ b/synapse/util/manhole.py @@ -19,22 +19,40 @@ from twisted.conch.ssh.keys import Key from twisted.cred import checkers, portal PUBLIC_KEY = ( - "ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAGEArzJx8OYOnJmzf4tfBEvLi8DVPrJ3/c9k2I/Az" - "64fxjHf9imyRJbixtQhlH9lfNjUIx+4LmrJH5QNRsFporcHDKOTwTTYLh5KmRpslkYHRivcJS" - "kbh/C+BR3utDS555mV" + "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDHhGATaW4KhE23+7nrH4jFx3yLq9OjaEs5" + "XALqeK+7385NlLja3DE/DO9mGhnd9+bAy39EKT3sTV6+WXQ4yD0TvEEyUEMtjWkSEm6U32+C" + "DaS3TW/vPBUMeJQwq+Ydcif1UlnpXrDDTamD0AU9VaEvHq+3HAkipqn0TGpKON6aqk4vauDx" + "oXSsV5TXBVrxP/y7HpMOpU4GUWsaaacBTKKNnUaQB4UflvydaPJUuwdaCUJGTMjbhWrjVfK+" + "jslseSPxU6XvrkZMyCr4znxvuDxjMk1RGIdO7v+rbBMLEgqtSMNqJbYeVCnj2CFgc3fcTcld" + "X2uOJDrJb/WRlHulthCh" ) PRIVATE_KEY = """-----BEGIN RSA PRIVATE KEY----- -MIIByAIBAAJhAK8ycfDmDpyZs3+LXwRLy4vA1T6yd/3PZNiPwM+uH8Yx3/YpskSW -4sbUIZR/ZXzY1CMfuC5qyR+UDUbBaaK3Bwyjk8E02C4eSpkabJZGB0Yr3CUpG4fw -vgUd7rQ0ueeZlQIBIwJgbh+1VZfr7WftK5lu7MHtqE1S1vPWZQYE3+VUn8yJADyb -Z4fsZaCrzW9lkIqXkE3GIY+ojdhZhkO1gbG0118sIgphwSWKRxK0mvh6ERxKqIt1 -xJEJO74EykXZV4oNJ8sjAjEA3J9r2ZghVhGN6V8DnQrTk24Td0E8hU8AcP0FVP+8 -PQm/g/aXf2QQkQT+omdHVEJrAjEAy0pL0EBH6EVS98evDCBtQw22OZT52qXlAwZ2 -gyTriKFVoqjeEjt3SZKKqXHSApP/AjBLpF99zcJJZRq2abgYlf9lv1chkrWqDHUu -DZttmYJeEfiFBBavVYIF1dOlZT0G8jMCMBc7sOSZodFnAiryP+Qg9otSBjJ3bQML -pSTqy7c3a2AScC/YyOwkDaICHnnD3XyjMwIxALRzl0tQEKMXs6hH8ToUdlLROCrP -EhQ0wahUTCk1gKA4uPD6TMTChavbh4K63OvbKg== +MIIEpQIBAAKCAQEAx4RgE2luCoRNt/u56x+Ixcd8i6vTo2hLOVwC6nivu9/OTZS4 +2twxPwzvZhoZ3ffmwMt/RCk97E1evll0OMg9E7xBMlBDLY1pEhJulN9vgg2kt01v +7zwVDHiUMKvmHXIn9VJZ6V6ww02pg9AFPVWhLx6vtxwJIqap9ExqSjjemqpOL2rg +8aF0rFeU1wVa8T/8ux6TDqVOBlFrGmmnAUyijZ1GkAeFH5b8nWjyVLsHWglCRkzI +24Vq41Xyvo7JbHkj8VOl765GTMgq+M58b7g8YzJNURiHTu7/q2wTCxIKrUjDaiW2 +HlQp49ghYHN33E3JXV9rjiQ6yW/1kZR7pbYQoQIDAQABAoIBAQC8KJ0q8Wzzwh5B +esa1dQHZ8+4DEsL/Amae66VcVwD0X3cCN1W2IZ7X5W0Ij2kBqr8V51RYhcR+S+Ek +BtzSiBUBvbKGrqcMGKaUgomDIMzai99hd0gvCCyZnEW1OQhFkNkaRNXCfqiZJ27M +fqvSUiU2eOwh9fCvmxoA6Of8o3FbzcJ+1GMcobWRllDtLmj6lgVbDzuA+0jC5daB +9Tj1pBzu3wn3ufxiS+gBnJ+7NcXH3E73lqCcPa2ufbZ1haxfiGCnRIhFXuQDgxFX +vKdEfDgtvas6r1ahGbc+b/q8E8fZT7cABuIU4yfOORK+MhpyWbvoyyzuVGKj3PKt +KSPJu5CZAoGBAOkoJfAVyYteqKcmGTanGqQnAY43CaYf6GdSPX/jg+JmKZg0zqMC +jWZUtPb93i+jnOInbrnuHOiHAxI8wmhEPed28H2lC/LU8PzlqFkZXKFZ4vLOhhRB +/HeHCFIDosPFlohWi3b+GAjD7sXgnIuGmnXWe2ea/TS3yersifDEoKKjAoGBANsQ +gJX2cJv1c3jhdgcs8vAt5zIOKcCLTOr/QPmVf/kxjNgndswcKHwsxE/voTO9q+TF +v/6yCSTxAdjuKz1oIYWgi/dZo82bBKWxNRpgrGviU3/zwxiHlyIXUhzQu78q3VS/ +7S1XVbc7qMV++XkYKHPVD+nVG/gGzFxumX7MLXfrAoGBAJit9cn2OnjNj9uFE1W6 +r7N254ndeLAUjPe73xH0RtTm2a4WRopwjW/JYIetTuYbWgyujc+robqTTuuOZjAp +H/CG7o0Ym251CypQqaFO/l2aowclPp/dZhpPjp9GSjuxFBZLtiBB3DNBOwbRQzIK +/vLTdRQvZkgzYkI4i0vjNt3JAoGBANP8HSKBLymMlShlrSx2b8TB9tc2Y2riohVJ +2ttqs0M2kt/dGJWdrgOz4mikL+983Olt/0P9juHDoxEEMK2kpcPEv40lnmBpYU7h +s8yJvnBLvJe2EJYdJ8AipyAhUX1FgpbvfxmASP8eaUxsegeXvBWTGWojAoS6N2o+ +0KSl+l3vAoGAFqm0gO9f/Q1Se60YQd4l2PZeMnJFv0slpgHHUwegmd6wJhOD7zJ1 +CkZcXwiv7Nog7AI9qKJEUXLjoqL+vJskBzSOqU3tcd670YQMi1aXSXJqYE202K7o +EddTrx3TNpr1D5m/f+6mnXWrc8u9y1+GNx9yz889xMjIBTBI9KqaaOs= -----END RSA PRIVATE KEY-----""" -- cgit 1.5.1 From 0a81038ea0e308d2944b7deae43060e4982b0abe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Sep 2018 14:39:59 +0100 Subject: Add in flight real time metrics for Measure blocks --- synapse/metrics/__init__.py | 110 +++++++++++++++++++++++++++++++++++++++++++- synapse/util/metrics.py | 22 +++++++++ 2 files changed, 131 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 550f8443f7..98333b2048 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,8 +18,11 @@ import gc import logging import os import platform +import threading import time +import six + import attr from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import REGISTRY, GaugeMetricFamily @@ -68,7 +71,7 @@ class LaterGauge(object): return if isinstance(calls, dict): - for k, v in calls.items(): + for k, v in six.iteritems(calls): g.add_metric(k, v) else: g.add_metric([], calls) @@ -87,6 +90,111 @@ class LaterGauge(object): all_gauges[self.name] = self +class InFlightGauge(object): + """Tracks number of things (e.g. requests, Measure blocks, etc) in flight + at any given time. + + Each InFlightGauge will create a metric called `_total` that counts + the number of in flight blocks, as well as a metrics for each item in the + given `sub_metrics` as `_` which will get updated by the + callbacks. + + Args: + name (str) + desc (str) + labels (list[str]) + sub_metrics (list[str]): A list of sub metrics that the callbacks + will update. + """ + + # TODO: Expand this to + + def __init__(self, name, desc, labels, sub_metrics): + self.name = name + self.desc = desc + self.labels = labels + self.sub_metrics = sub_metrics + + # Create a class which have the sub_metrics values as attributes, which + # default to 0 on initialization. Used to pass to registered callbacks. + self._metrics_class = attr.make_class( + "_MetricsEntry", + attrs={x: attr.ib(0) for x in sub_metrics}, + slots=True, + ) + + # Counts number of in flight blocks for a given set of label values + self._registrations = {} + + # Protects access to _registrations + self._lock = threading.Lock() + + self._register_with_collector() + + def register(self, key, callback): + """Registers that we've entered a new block with labels `key`. + + `callback` gets called each time the metrics are collected. The same + value must also be given to `unregister`. + + `callback` gets called with an object that has an attribute per + sub_metric, which should be updated with the necessary values. Note that + the metrics object is shared between all callbacks registered with the + same key. + + Note that `callback` may be called on a separate thread. + """ + with self._lock: + self._registrations.setdefault(key, set()).add(callback) + + def unregister(self, key, callback): + """Registers that we've exited a block with labels `key`. + """ + + with self._lock: + self._registrations.setdefault(key, set()).discard(callback) + + def collect(self): + """Called by prometheus client when it reads metrics. + + Note: may be called by a separate thread. + """ + in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels) + + metrics_by_key = {} + + # We copy so that we don't mutate the list while iterating + with self._lock: + keys = list(self._registrations) + + for key in keys: + with self._lock: + callbacks = set(self._registrations[key]) + + in_flight.add_metric(key, len(callbacks)) + + metrics = self._metrics_class() + metrics_by_key[key] = metrics + for callback in callbacks: + callback(metrics) + + yield in_flight + + for name in self.sub_metrics: + gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels) + for key, metrics in six.iteritems(metrics_by_key): + gauge.add_metric(key, getattr(metrics, name)) + yield gauge + + def _register_with_collector(self): + if self.name in all_gauges.keys(): + logger.warning("%s already registered, reregistering" % (self.name,)) + REGISTRY.unregister(all_gauges.pop(self.name)) + + REGISTRY.register(self) + all_gauges[self.name] = self + + # # Detailed CPU metrics # diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 97f1267380..4b4ac5f6c7 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -20,6 +20,7 @@ from prometheus_client import Counter from twisted.internet import defer +from synapse.metrics import InFlightGauge from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) @@ -45,6 +46,13 @@ block_db_txn_duration = Counter( block_db_sched_duration = Counter( "synapse_util_metrics_block_db_sched_duration_seconds", "", ["block_name"]) +# Tracks the number of blocks currently active +in_flight = InFlightGauge( + "synapse_util_metrics_block_in_flight", "", + labels=["block_name"], + sub_metrics=["real_time_max", "real_time_sum"], +) + def measure_func(name): def wrapper(func): @@ -82,10 +90,14 @@ class Measure(object): self.start_usage = self.start_context.get_resource_usage() + in_flight.register((self.name,), self._update_in_flight) + def __exit__(self, exc_type, exc_val, exc_tb): if isinstance(exc_type, Exception) or not self.start_context: return + in_flight.unregister((self.name,), self._update_in_flight) + duration = self.clock.time() - self.start block_counter.labels(self.name).inc() @@ -120,3 +132,13 @@ class Measure(object): if self.created_context: self.start_context.__exit__(exc_type, exc_val, exc_tb) + + def _update_in_flight(self, metrics): + """Gets called when processing in flight metrics + """ + duration = self.clock.time() - self.start + + metrics.real_time_max = max(metrics.real_time_max, duration) + metrics.real_time_sum += duration + + # TODO: Add other in flight metrics. -- cgit 1.5.1 From fcfe7a850dada2e65982f2bc805d8bc409f07512 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Sep 2018 19:23:07 +0100 Subject: Add an awful secondary timeout to fix wedged requests This is an attempt to mitigate #3842 by adding yet-another-timeout --- synapse/http/matrixfederationclient.py | 11 ++++++++ synapse/util/async_helpers.py | 51 ++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) (limited to 'synapse/util') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index da16b5dd8c..13b19f7626 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -42,6 +42,7 @@ from synapse.api.errors import ( ) from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext +from synapse.util.async_helpers import timeout_no_seriously from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure @@ -228,6 +229,16 @@ class MatrixFederationHttpClient(object): ) request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) + # Sometimes the timeout above doesn't work, so lets hack yet + # another layer of timeouts in in the vain hope that at some + # point the world made sense and this really really really + # should work. + request_deferred = timeout_no_seriously( + request_deferred, + timeout=_sec_timeout * 2, + reactor=self.hs.get_reactor(), + ) + with Measure(self.clock, "outbound_request"): response = yield make_deferred_yieldable( request_deferred, diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 9b3f2f4b96..083e4f4128 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -438,3 +438,54 @@ def _cancelled_to_timed_out_error(value, timeout): value.trap(CancelledError) raise DeferredTimeoutError(timeout, "Deferred") return value + + +def timeout_no_seriously(deferred, timeout, reactor): + """The in build twisted deferred addTimeout (and the method above) + completely fail to time things out under some unknown circumstances. + + Lets try a different way of timing things out and maybe that will make + things work?! + + TODO: Kill this with fire. + """ + + new_d = defer.Deferred() + + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + deferred.cancel() + + if not new_d.called: + new_d.errback(DeferredTimeoutError(timeout, "Deferred")) + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + return _cancelled_to_timed_out_error(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + def success_cb(val): + if not new_d.called: + new_d.callback(val) + + def failure_cb(val): + if not new_d.called: + new_d.errback(val) + + deferred.addCallbacks(success_cb, failure_cb) + + return new_d -- cgit 1.5.1 From 24efb2a70dfd8aa100507612836601991e11affc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 15 Sep 2018 11:38:39 +0100 Subject: Fix timeout function Turns out deferred.cancel sometimes throws, so we do that last to ensure that we always do resolve the new deferred. --- synapse/util/async_helpers.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 083e4f4128..40c2946129 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -456,11 +456,12 @@ def timeout_no_seriously(deferred, timeout, reactor): def time_it_out(): timed_out[0] = True - deferred.cancel() if not new_d.called: new_d.errback(DeferredTimeoutError(timeout, "Deferred")) + deferred.cancel() + delayed_call = reactor.callLater(timeout, time_it_out) def convert_cancelled(value): -- cgit 1.5.1 From a334e1cacec5dd9c1a983415906d8ffea4a30134 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Sep 2018 10:39:40 +0100 Subject: Update to use new timeout function everywhere. The existing deferred timeout helper function (and the one into twisted) suffer from a bug when a deferred's canceller throws an exception, #3842. The new helper function doesn't suffer from this problem. --- synapse/http/client.py | 4 +- synapse/http/matrixfederationclient.py | 25 +++++++----- synapse/notifier.py | 13 +++--- synapse/util/async_helpers.py | 73 +++++++++------------------------- 4 files changed, 43 insertions(+), 72 deletions(-) (limited to 'synapse/util') diff --git a/synapse/http/client.py b/synapse/http/client.py index ec339a92ad..3d05f83b8c 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import cancelled_to_request_timed_out_error, redact_uri from synapse.http.endpoint import SpiderEndpoint -from synapse.util.async_helpers import add_timeout_to_deferred +from synapse.util.async_helpers import timeout_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable @@ -99,7 +99,7 @@ class SimpleHttpClient(object): request_deferred = treq.request( method, uri, agent=self.agent, data=data, headers=headers ) - add_timeout_to_deferred( + request_deferred = timeout_deferred( request_deferred, 60, self.hs.get_reactor(), cancelled_to_request_timed_out_error, ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 530c0245a9..14b12cd1c4 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -44,7 +44,7 @@ from synapse.api.errors import ( SynapseError, ) from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util.async_helpers import timeout_no_seriously +from synapse.util.async_helpers import timeout_deferred from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure @@ -145,8 +145,14 @@ def _handle_json_response(reactor, timeout_sec, request, response): """ try: check_content_type_is_json(response.headers) + d = treq.json_content(response) - d.addTimeout(timeout_sec, reactor) + d = timeout_deferred( + d, + timeout=timeout_sec, + reactor=reactor, + ) + body = yield make_deferred_yieldable(d) except Exception as e: logger.warn( @@ -321,15 +327,10 @@ class MatrixFederationHttpClient(object): reactor=self.hs.get_reactor(), unbuffered=True ) - request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) - # Sometimes the timeout above doesn't work, so lets hack yet - # another layer of timeouts in in the vain hope that at some - # point the world made sense and this really really really - # should work. - request_deferred = timeout_no_seriously( + request_deferred = timeout_deferred( request_deferred, - timeout=_sec_timeout * 2, + timeout=_sec_timeout, reactor=self.hs.get_reactor(), ) @@ -388,7 +389,11 @@ class MatrixFederationHttpClient(object): # :'( # Update transactions table? d = treq.content(response) - d.addTimeout(_sec_timeout, self.hs.get_reactor()) + d = timeout_deferred( + d, + timeout=_sec_timeout, + reactor=self.hs.get_reactor(), + ) body = yield make_deferred_yieldable(d) raise HttpResponseException( response.code, response.phrase, body diff --git a/synapse/notifier.py b/synapse/notifier.py index 82f391481c..2d683718fb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -28,7 +28,7 @@ from synapse.types import StreamToken from synapse.util.async_helpers import ( DeferredTimeoutError, ObservableDeferred, - add_timeout_to_deferred, + timeout_deferred, ) from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.logutils import log_function @@ -337,7 +337,7 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) - add_timeout_to_deferred( + listener.deferred = timeout_deferred( listener.deferred, (end_time - now) / 1000., self.hs.get_reactor(), @@ -559,11 +559,12 @@ class Notifier(object): if end_time <= now: break - add_timeout_to_deferred( - listener.deferred.addTimeout, - (end_time - now) / 1000., - self.hs.get_reactor(), + listener.deferred = timeout_deferred( + listener.deferred, + timeout=(end_time - now) / 1000., + reactor=self.hs.get_reactor(), ) + try: with PreserveLoggingContext(): yield listener.deferred diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 40c2946129..2e12bcda98 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -380,23 +380,25 @@ class DeferredTimeoutError(Exception): """ -def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): - """ - Add a timeout to a deferred by scheduling it to be cancelled after - timeout seconds. +def _cancelled_to_timed_out_error(value, timeout): + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise DeferredTimeoutError(timeout, "Deferred") + return value - This is essentially a backport of deferred.addTimeout, which was introduced - in twisted 16.5. - If the deferred gets timed out, it errbacks with a DeferredTimeoutError, - unless a cancelable function was passed to its initialization or unless - a different on_timeout_cancel callable is provided. +def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): + """The in built twisted `Deferred.addTimeout` fails to time out deferreds + that have a canceller that throws exceptions. This method creates a new + deferred that wraps and times out the given deferred, correctly handling + the case where the given deferred's canceller throws. - Args: - deferred (defer.Deferred): deferred to be timed out - timeout (Number): seconds to time out after - reactor (twisted.internet.reactor): the Twisted reactor to use + NOTE: Unlike `Deferred.addTimeout`, this function returns a new deferred + Args: + deferred (Deferred) + timeout (float): Timeout in seconds + reactor (twisted.internet.reactor): The twisted reactor to use on_timeout_cancel (callable): A callable which is called immediately after the deferred times out, and not if this deferred is otherwise cancelled before the timeout. @@ -407,47 +409,9 @@ def add_timeout_to_deferred(deferred, timeout, reactor, on_timeout_cancel=None): The default callable (if none is provided) will translate a CancelledError Failure into a DeferredTimeoutError. - """ - timed_out = [False] - def time_it_out(): - timed_out[0] = True - deferred.cancel() - - delayed_call = reactor.callLater(timeout, time_it_out) - - def convert_cancelled(value): - if timed_out[0]: - to_call = on_timeout_cancel or _cancelled_to_timed_out_error - return to_call(value, timeout) - return value - - deferred.addBoth(convert_cancelled) - - def cancel_timeout(result): - # stop the pending call to cancel the deferred if it's been fired - if delayed_call.active(): - delayed_call.cancel() - return result - - deferred.addBoth(cancel_timeout) - - -def _cancelled_to_timed_out_error(value, timeout): - if isinstance(value, failure.Failure): - value.trap(CancelledError) - raise DeferredTimeoutError(timeout, "Deferred") - return value - - -def timeout_no_seriously(deferred, timeout, reactor): - """The in build twisted deferred addTimeout (and the method above) - completely fail to time things out under some unknown circumstances. - - Lets try a different way of timing things out and maybe that will make - things work?! - - TODO: Kill this with fire. + Returns: + Deferred """ new_d = defer.Deferred() @@ -466,7 +430,8 @@ def timeout_no_seriously(deferred, timeout, reactor): def convert_cancelled(value): if timed_out[0]: - return _cancelled_to_timed_out_error(value, timeout) + to_call = on_timeout_cancel or _cancelled_to_timed_out_error + return to_call(value, timeout) return value deferred.addBoth(convert_cancelled) -- cgit 1.5.1 From 6c48aa0256ce7dfd62e72a77b03f5bb2222473ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Sep 2018 11:04:42 +0100 Subject: Run canceller first to allow it to generate correct error --- synapse/util/async_helpers.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 2e12bcda98..2b2e85ecb7 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -421,11 +421,14 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): def time_it_out(): timed_out[0] = True + try: + deferred.cancel() + except: # noqa: E722, if we throw any exception it'll break time outs + logger.exception("Canceller failed during timeout") + if not new_d.called: new_d.errback(DeferredTimeoutError(timeout, "Deferred")) - deferred.cancel() - delayed_call = reactor.callLater(timeout, time_it_out) def convert_cancelled(value): -- cgit 1.5.1 From 9407bcf37a99ebb72eef73e19632cdcf5964c968 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Sep 2018 11:05:21 +0100 Subject: Replace custom DeferredTimeoutError with defer.TimeoutError --- synapse/notifier.py | 5 ++--- synapse/util/async_helpers.py | 12 +++--------- 2 files changed, 5 insertions(+), 12 deletions(-) (limited to 'synapse/util') diff --git a/synapse/notifier.py b/synapse/notifier.py index 2d683718fb..2e7525b7d2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -26,7 +26,6 @@ from synapse.handlers.presence import format_user_presence_state from synapse.metrics import LaterGauge from synapse.types import StreamToken from synapse.util.async_helpers import ( - DeferredTimeoutError, ObservableDeferred, timeout_deferred, ) @@ -354,7 +353,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except DeferredTimeoutError: + except defer.TimeoutError: break except defer.CancelledError: break @@ -568,7 +567,7 @@ class Notifier(object): try: with PreserveLoggingContext(): yield listener.deferred - except DeferredTimeoutError: + except defer.TimeoutError: break except defer.CancelledError: break diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 2b2e85ecb7..ec7b2c9672 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -374,16 +374,10 @@ class ReadWriteLock(object): defer.returnValue(_ctx_manager()) -class DeferredTimeoutError(Exception): - """ - This error is raised by default when a L{Deferred} times out. - """ - - def _cancelled_to_timed_out_error(value, timeout): if isinstance(value, failure.Failure): value.trap(CancelledError) - raise DeferredTimeoutError(timeout, "Deferred") + raise defer.TimeoutError(timeout, "Deferred") return value @@ -408,7 +402,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): the timeout. The default callable (if none is provided) will translate a - CancelledError Failure into a DeferredTimeoutError. + CancelledError Failure into a defer.TimeoutError. Returns: Deferred @@ -427,7 +421,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): logger.exception("Canceller failed during timeout") if not new_d.called: - new_d.errback(DeferredTimeoutError(timeout, "Deferred")) + new_d.errback(defer.TimeoutError(timeout, "Deferred")) delayed_call = reactor.callLater(timeout, time_it_out) -- cgit 1.5.1 From 642199570c57fd37b856c3808e08fe2923001b55 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 19 Sep 2018 17:28:18 +0100 Subject: Improve the logging when handling a federation transaction (#3904) Let's try to rationalise the logging that happens when we are processing an incoming transaction, to make it easier to figure out what is going wrong when they take ages. In particular: - make everything start with a [room_id event_id] prefix - make sure we log a warning when catching exceptions rather than just turning them into other, more cryptic, exceptions. --- changelog.d/3904.misc | 1 + synapse/handlers/federation.py | 164 +++++++++++++++++++++++++++-------------- synapse/util/retryutils.py | 2 +- 3 files changed, 111 insertions(+), 56 deletions(-) create mode 100644 changelog.d/3904.misc (limited to 'synapse/util') diff --git a/changelog.d/3904.misc b/changelog.d/3904.misc new file mode 100644 index 0000000000..1e3c8e1706 --- /dev/null +++ b/changelog.d/3904.misc @@ -0,0 +1 @@ +Improve the logging when handling a federation transaction \ No newline at end of file diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index f10b46414b..8d6bd7976d 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -69,6 +69,27 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) +def shortstr(iterable, maxitems=5): + """If iterable has maxitems or fewer, return the stringification of a list + containing those items. + + Otherwise, return the stringification of a a list with the first maxitems items, + followed by "...". + + Args: + iterable (Iterable): iterable to truncate + maxitems (int): number of items to return before truncating + + Returns: + unicode + """ + + items = list(itertools.islice(iterable, maxitems + 1)) + if len(items) <= maxitems: + return str(items) + return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]" + + class FederationHandler(BaseHandler): """Handles events that originated from federation. Responsible for: @@ -114,7 +135,6 @@ class FederationHandler(BaseHandler): self._room_pdu_linearizer = Linearizer("fed_room_pdu") @defer.inlineCallbacks - @log_function def on_receive_pdu( self, origin, pdu, get_missing=True, sent_to_us_directly=False, ): @@ -130,9 +150,17 @@ class FederationHandler(BaseHandler): Returns (Deferred): completes with None """ + room_id = pdu.room_id + event_id = pdu.event_id + + logger.info( + "[%s %s] handling received PDU: %s", + room_id, event_id, pdu, + ) + # We reprocess pdus when we have seen them only as outliers existing = yield self.store.get_event( - pdu.event_id, + event_id, allow_none=True, allow_rejected=True, ) @@ -147,7 +175,7 @@ class FederationHandler(BaseHandler): ) ) if already_seen: - logger.debug("Already seen pdu %s", pdu.event_id) + logger.debug("[%s %s]: Already seen pdu", room_id, event_id) return # do some initial sanity-checking of the event. In particular, make @@ -156,6 +184,7 @@ class FederationHandler(BaseHandler): try: self._sanity_check_event(pdu) except SynapseError as err: + logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id) raise FederationError( "ERROR", err.code, @@ -165,10 +194,12 @@ class FederationHandler(BaseHandler): # If we are currently in the process of joining this room, then we # queue up events for later processing. - if pdu.room_id in self.room_queues: - logger.info("Ignoring PDU %s for room %s from %s for now; join " - "in progress", pdu.event_id, pdu.room_id, origin) - self.room_queues[pdu.room_id].append((pdu, origin)) + if room_id in self.room_queues: + logger.info( + "[%s %s] Queuing PDU from %s for now: join in progress", + room_id, event_id, origin, + ) + self.room_queues[room_id].append((pdu, origin)) return # If we're no longer in the room just ditch the event entirely. This @@ -179,7 +210,7 @@ class FederationHandler(BaseHandler): # we should check if we *are* in fact in the room. If we are then we # can magically rejoin the room. is_in_room = yield self.auth.check_host_in_room( - pdu.room_id, + room_id, self.server_name ) if not is_in_room: @@ -188,8 +219,8 @@ class FederationHandler(BaseHandler): ) if was_in_room: logger.info( - "Ignoring PDU %s for room %s from %s as we've left the room!", - pdu.event_id, pdu.room_id, origin, + "[%s %s] Ignoring PDU from %s as we've left the room", + room_id, event_id, origin, ) defer.returnValue(None) @@ -204,8 +235,8 @@ class FederationHandler(BaseHandler): ) logger.debug( - "_handle_new_pdu min_depth for %s: %d", - pdu.room_id, min_depth + "[%s %s] min_depth: %d", + room_id, event_id, min_depth, ) prevs = {e_id for e_id, _ in pdu.prev_events} @@ -218,17 +249,18 @@ class FederationHandler(BaseHandler): # send to the clients. pdu.internal_metadata.outlier = True elif min_depth and pdu.depth > min_depth: - if get_missing and prevs - seen: + missing_prevs = prevs - seen + if get_missing and missing_prevs: # If we're missing stuff, ensure we only fetch stuff one # at a time. logger.info( - "Acquiring lock for room %r to fetch %d missing events: %r...", - pdu.room_id, len(prevs - seen), list(prevs - seen)[:5], + "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s", + room_id, event_id, len(missing_prevs), shortstr(missing_prevs), ) with (yield self._room_pdu_linearizer.queue(pdu.room_id)): logger.info( - "Acquired lock for room %r to fetch %d missing events", - pdu.room_id, len(prevs - seen), + "[%s %s] Acquired room lock to fetch %d missing prev_events", + room_id, event_id, len(missing_prevs), ) yield self._get_missing_events_for_pdu( @@ -241,19 +273,23 @@ class FederationHandler(BaseHandler): if not prevs - seen: logger.info( - "Found all missing prev events for %s", pdu.event_id + "[%s %s] Found all missing prev_events", + room_id, event_id, ) - elif prevs - seen: + elif missing_prevs: logger.info( - "Not fetching %d missing events for room %r,event %s: %r...", - len(prevs - seen), pdu.room_id, pdu.event_id, - list(prevs - seen)[:5], + "[%s %s] Not recursively fetching %d missing prev_events: %s", + room_id, event_id, len(missing_prevs), shortstr(missing_prevs), ) if sent_to_us_directly and prevs - seen: # If they have sent it to us directly, and the server # isn't telling us about the auth events that it's # made a message referencing, we explode + logger.warn( + "[%s %s] Failed to fetch %d prev events: rejecting", + room_id, event_id, len(prevs - seen), + ) raise FederationError( "ERROR", 403, @@ -270,15 +306,19 @@ class FederationHandler(BaseHandler): auth_chains = set() try: # Get the state of the events we know about - ours = yield self.store.get_state_groups(pdu.room_id, list(seen)) + ours = yield self.store.get_state_groups(room_id, list(seen)) state_groups.append(ours) # Ask the remote server for the states we don't # know about for p in prevs - seen: + logger.info( + "[%s %s] Requesting state at missing prev_event %s", + room_id, event_id, p, + ) state, got_auth_chain = ( yield self.federation_client.get_state_for_room( - origin, pdu.room_id, p + origin, room_id, p, ) ) auth_chains.update(got_auth_chain) @@ -291,19 +331,24 @@ class FederationHandler(BaseHandler): ev_ids, get_prev_content=False, check_redacted=False ) - room_version = yield self.store.get_room_version(pdu.room_id) + room_version = yield self.store.get_room_version(room_id) state_map = yield resolve_events_with_factory( - room_version, state_groups, {pdu.event_id: pdu}, fetch + room_version, state_groups, {event_id: pdu}, fetch ) state = (yield self.store.get_events(state_map.values())).values() auth_chain = list(auth_chains) except Exception: + logger.warn( + "[%s %s] Error attempting to resolve state at missing " + "prev_events", + room_id, event_id, exc_info=True, + ) raise FederationError( "ERROR", 403, "We can't get valid state history.", - affected=pdu.event_id, + affected=event_id, ) yield self._process_received_pdu( @@ -322,15 +367,16 @@ class FederationHandler(BaseHandler): prevs (set(str)): List of event ids which we are missing min_depth (int): Minimum depth of events to return. """ - # We recalculate seen, since it may have changed. + + room_id = pdu.room_id + event_id = pdu.event_id + seen = yield self.store.have_seen_events(prevs) if not prevs - seen: return - latest = yield self.store.get_latest_event_ids_in_room( - pdu.room_id - ) + latest = yield self.store.get_latest_event_ids_in_room(room_id) # We add the prev events that we have seen to the latest # list to ensure the remote server doesn't give them to us @@ -338,8 +384,8 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "Missing %d events for room %r pdu %s: %r...", - len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5] + "[%s %s]: Requesting %d prev_events: %s", + room_id, event_id, len(prevs - seen), shortstr(prevs - seen) ) # XXX: we set timeout to 10s to help workaround @@ -392,7 +438,7 @@ class FederationHandler(BaseHandler): missing_events = yield self.federation_client.get_missing_events( origin, - pdu.room_id, + room_id, earliest_events_ids=list(latest), latest_events=[pdu], limit=10, @@ -401,37 +447,46 @@ class FederationHandler(BaseHandler): ) logger.info( - "Got %d events: %r...", - len(missing_events), [e.event_id for e in missing_events[:5]] + "[%s %s]: Got %d prev_events: %s", + room_id, event_id, len(missing_events), shortstr(missing_events), ) # We want to sort these by depth so we process them and # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) - for e in missing_events: - logger.info("Handling found event %s", e.event_id) + for ev in missing_events: + logger.info( + "[%s %s] Handling received prev_event %s", + room_id, event_id, ev.event_id, + ) try: yield self.on_receive_pdu( origin, - e, + ev, get_missing=False ) except FederationError as e: if e.code == 403: - logger.warn("Event %s failed history check.") + logger.warn( + "[%s %s] Received prev_event %s failed history check.", + room_id, event_id, ev.event_id, + ) else: raise - @log_function @defer.inlineCallbacks - def _process_received_pdu(self, origin, pdu, state, auth_chain): + def _process_received_pdu(self, origin, event, state, auth_chain): """ Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. """ - event = pdu + room_id = event.room_id + event_id = event.event_id - logger.debug("Processing event: %s", event) + logger.debug( + "[%s %s] Processing event: %s", + room_id, event_id, event, + ) # FIXME (erikj): Awful hack to make the case where we are not currently # in the room work @@ -440,15 +495,16 @@ class FederationHandler(BaseHandler): # event. if state and auth_chain and not event.internal_metadata.is_outlier(): is_in_room = yield self.auth.check_host_in_room( - event.room_id, + room_id, self.server_name ) else: is_in_room = True + if not is_in_room: logger.info( - "Got event for room we're not in: %r %r", - event.room_id, event.event_id + "[%s %s] Got event for room we're not in", + room_id, event_id, ) try: @@ -460,7 +516,7 @@ class FederationHandler(BaseHandler): "ERROR", e.code, e.msg, - affected=event.event_id, + affected=event_id, ) else: @@ -509,12 +565,12 @@ class FederationHandler(BaseHandler): affected=event.event_id, ) - room = yield self.store.get_room(event.room_id) + room = yield self.store.get_room(room_id) if not room: try: yield self.store.store_room( - room_id=event.room_id, + room_id=room_id, room_creator_user_id="", is_public=False, ) @@ -542,7 +598,7 @@ class FederationHandler(BaseHandler): if newly_joined: user = UserID.from_string(event.state_key) - yield self.user_joined_room(user, event.room_id) + yield self.user_joined_room(user, room_id) @log_function @defer.inlineCallbacks @@ -1459,12 +1515,10 @@ class FederationHandler(BaseHandler): else: defer.returnValue(None) - @log_function def get_min_depth_for_context(self, context): return self.store.get_min_depth(context) @defer.inlineCallbacks - @log_function def _handle_new_event(self, origin, event, state=None, auth_events=None, backfilled=False): context = yield self._prep_event( @@ -1664,8 +1718,8 @@ class FederationHandler(BaseHandler): ) except AuthError as e: logger.warn( - "Rejecting %s because %s", - event.event_id, e.msg + "[%s %s] Rejecting: %s", + event.room_id, event.event_id, e.msg ) context.rejected = RejectedReason.AUTH_ERROR diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 8a3a06fd74..26cce7d197 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -188,7 +188,7 @@ class RetryDestinationLimiter(object): else: self.retry_interval = self.min_retry_interval - logger.debug( + logger.info( "Connection to %s was unsuccessful (%s(%s)); backoff now %i", self.destination, exc_type, exc_val, self.retry_interval ) -- cgit 1.5.1 From 8601c24287c452c0d9a803b130e0f68cf6169f6a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Sep 2018 14:19:46 +0100 Subject: Fix some instances of ExpiringCache not expiring cache items ExpiringCache required that `start()` be called before it would actually start expiring entries. A number of places didn't do that. This PR removes `start` from ExpiringCache, and automatically starts backround reaping process on creation instead. --- synapse/app/appservice.py | 1 - synapse/app/client_reader.py | 1 - synapse/app/event_creator.py | 1 - synapse/app/federation_reader.py | 1 - synapse/app/federation_sender.py | 1 - synapse/app/frontend_proxy.py | 1 - synapse/app/homeserver.py | 1 - synapse/app/media_repository.py | 1 - synapse/app/pusher.py | 1 - synapse/app/synchrotron.py | 1 - synapse/app/user_dir.py | 1 - synapse/federation/federation_client.py | 28 ++++++++++++--------------- synapse/rest/media/v1/preview_url_resource.py | 1 - synapse/state/__init__.py | 9 --------- synapse/util/caches/expiringcache.py | 1 - tests/util/test_expiring_cache.py | 1 - 16 files changed, 12 insertions(+), 39 deletions(-) (limited to 'synapse/util') diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 86b5067400..02039f7e79 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -172,7 +172,6 @@ def start(config_options): def start(): ps.get_datastore().start_profiling() - ps.get_state_handler().start_caching() reactor.callWhenRunning(start) diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index ce2b113dbb..4c73c637bb 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -181,7 +181,6 @@ def start(config_options): ss.start_listening(config.worker_listeners) def start(): - ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index f98e456ea0..bc82197b2a 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -199,7 +199,6 @@ def start(config_options): ss.start_listening(config.worker_listeners) def start(): - ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 60f5973505..18ca71ef99 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -168,7 +168,6 @@ def start(config_options): ss.start_listening(config.worker_listeners) def start(): - ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 60dd09aac3..6501c57792 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -201,7 +201,6 @@ def start(config_options): def start(): ps.get_datastore().start_profiling() - ps.get_state_handler().start_caching() reactor.callWhenRunning(start) _base.start_worker_reactor("synapse-federation-sender", config) diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 8c0b9c67b0..b076fbe522 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -258,7 +258,6 @@ def start(config_options): ss.start_listening(config.worker_listeners) def start(): - ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 3241ded188..8c5d858b0b 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -384,7 +384,6 @@ def setup(config_options): def start(): hs.get_pusherpool().start() - hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() hs.get_datastore().start_doing_background_updates() hs.get_federation_client().start_get_pdu_cache() diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index e3dbb3b4e6..992d182dba 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -168,7 +168,6 @@ def start(config_options): ss.start_listening(config.worker_listeners) def start(): - ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 244c604de9..2ec4c7defb 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -228,7 +228,6 @@ def start(config_options): def start(): ps.get_pusherpool().start() ps.get_datastore().start_profiling() - ps.get_state_handler().start_caching() reactor.callWhenRunning(start) diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 6662340797..df81b7bcbe 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -435,7 +435,6 @@ def start(config_options): def start(): ss.get_datastore().start_profiling() - ss.get_state_handler().start_caching() reactor.callWhenRunning(start) diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 96ffcaf073..b383e79c1c 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -229,7 +229,6 @@ def start(config_options): def start(): ps.get_datastore().start_profiling() - ps.get_state_handler().start_caching() reactor.callWhenRunning(start) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index fe67b2ff42..5a92428f56 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -66,6 +66,14 @@ class FederationClient(FederationBase): self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() + self._get_pdu_cache = ExpiringCache( + cache_name="get_pdu_cache", + clock=self._clock, + max_len=1000, + expiry_ms=120 * 1000, + reset_expiry_on_get=False, + ) + def _clear_tried_cache(self): """Clear pdu_destination_tried cache""" now = self._clock.time_msec() @@ -82,17 +90,6 @@ class FederationClient(FederationBase): if destination_dict: self.pdu_destination_tried[event_id] = destination_dict - def start_get_pdu_cache(self): - self._get_pdu_cache = ExpiringCache( - cache_name="get_pdu_cache", - clock=self._clock, - max_len=1000, - expiry_ms=120 * 1000, - reset_expiry_on_get=False, - ) - - self._get_pdu_cache.start() - @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail=False, ignore_backoff=False): @@ -229,10 +226,9 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. - if self._get_pdu_cache: - ev = self._get_pdu_cache.get(event_id) - if ev: - defer.returnValue(ev) + ev = self._get_pdu_cache.get(event_id) + if ev: + defer.returnValue(ev) pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) @@ -285,7 +281,7 @@ class FederationClient(FederationBase): ) continue - if self._get_pdu_cache is not None and signed_pdu: + if signed_pdu: self._get_pdu_cache[event_id] = signed_pdu defer.returnValue(signed_pdu) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index cad2dec33a..af01040a38 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -79,7 +79,6 @@ class PreviewUrlResource(Resource): # don't spider URLs more often than once an hour expiry_ms=60 * 60 * 1000, ) - self._cache.start() self._cleaner_loop = self.clock.looping_call( self._start_expire_url_cache_data, 10 * 1000, diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index d7ae22a661..b22495c1f9 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -95,10 +95,6 @@ class StateHandler(object): self.hs = hs self._state_resolution_handler = hs.get_state_resolution_handler() - def start_caching(self): - # TODO: remove this shim - self._state_resolution_handler.start_caching() - @defer.inlineCallbacks def get_current_state(self, room_id, event_type=None, state_key="", latest_event_ids=None): @@ -428,9 +424,6 @@ class StateResolutionHandler(object): self._state_cache = None self.resolve_linearizer = Linearizer(name="state_resolve_lock") - def start_caching(self): - logger.debug("start_caching") - self._state_cache = ExpiringCache( cache_name="state_cache", clock=self.clock, @@ -440,8 +433,6 @@ class StateResolutionHandler(object): reset_expiry_on_get=True, ) - self._state_cache.start() - @defer.inlineCallbacks @log_function def resolve_state_groups( diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index ce85b2ae11..921a9c5b29 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -58,7 +58,6 @@ class ExpiringCache(object): self.metrics = register_cache("expiring", cache_name, self) - def start(self): if not self._expiry_ms: # Don't bother starting the loop if things never expire return diff --git a/tests/util/test_expiring_cache.py b/tests/util/test_expiring_cache.py index 5cbada4eda..50bc7702d2 100644 --- a/tests/util/test_expiring_cache.py +++ b/tests/util/test_expiring_cache.py @@ -65,7 +65,6 @@ class ExpiringCacheTestCase(unittest.TestCase): def test_time_eviction(self): clock = MockClock() cache = ExpiringCache("test", clock, expiry_ms=1000) - cache.start() cache["key"] = 1 clock.advance_time(0.5) -- cgit 1.5.1 From 79eded1ae4f0d341918f5c58076f1e60cd400e8a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Sep 2018 14:52:21 +0100 Subject: Make ExpiringCache slightly more performant --- synapse/util/caches/expiringcache.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 921a9c5b29..48ca2d634d 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -16,6 +16,8 @@ import logging from collections import OrderedDict +from six import iteritems + from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import register_cache @@ -127,7 +129,7 @@ class ExpiringCache(object): keys_to_delete = set() - for key, cache_entry in self._cache.items(): + for key, cache_entry in iteritems(self._cache): if now - cache_entry.time > self._expiry_ms: keys_to_delete.add(key) @@ -149,6 +151,8 @@ class ExpiringCache(object): class _CacheEntry(object): + __slots__ = ["time", "value"] + def __init__(self, time, value): self.time = time self.value = value -- cgit 1.5.1 From fdd1a62e8d09ddccbe685fe7d7840990a9c06241 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Sep 2018 14:55:47 +0100 Subject: Add a five minute cache to get_destination_retry_timings Hopefully helps with #3931 --- synapse/storage/transactions.py | 23 ++++++++++++++++++++++- synapse/util/caches/expiringcache.py | 13 +++++++++++++ 2 files changed, 35 insertions(+), 1 deletion(-) (limited to 'synapse/util') diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index baf0379a68..ab54977a75 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util.caches.expiringcache import ExpiringCache from ._base import SQLBaseStore, db_to_json @@ -49,6 +50,8 @@ _UpdateTransactionRow = namedtuple( ) ) +SENTINEL = object() + class TransactionStore(SQLBaseStore): """A collection of queries for handling PDUs. @@ -59,6 +62,12 @@ class TransactionStore(SQLBaseStore): self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000) + self._destination_retry_cache = ExpiringCache( + cache_name="get_destination_retry_timings", + clock=self._clock, + expiry_ms=5 * 60 * 1000, + ) + def get_received_txn_response(self, transaction_id, origin): """For an incoming transaction from a given origin, check if we have already responded to it. If so, return the response code and response @@ -155,6 +164,7 @@ class TransactionStore(SQLBaseStore): """ pass + @defer.inlineCallbacks def get_destination_retry_timings(self, destination): """Gets the current retry timings (if any) for a given destination. @@ -165,10 +175,20 @@ class TransactionStore(SQLBaseStore): None if not retrying Otherwise a dict for the retry scheme """ - return self.runInteraction( + + result = self._destination_retry_cache.get(destination, SENTINEL) + if result is not SENTINEL: + defer.returnValue(result) + + result = yield self.runInteraction( "get_destination_retry_timings", self._get_destination_retry_timings, destination) + # We don't hugely care about race conditions between getting and + # invalidating the cache, since we time out fairly quickly anyway. + self._destination_retry_cache[destination] = result + defer.returnValue(result) + def _get_destination_retry_timings(self, txn, destination): result = self._simple_select_one_txn( txn, @@ -196,6 +216,7 @@ class TransactionStore(SQLBaseStore): retry_interval (int) - how long until next retry in ms """ + self._destination_retry_cache.pop(destination) return self.runInteraction( "set_destination_retry_timings", self._set_destination_retry_timings, diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 48ca2d634d..346669e4ce 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -24,6 +24,9 @@ from synapse.util.caches import register_cache logger = logging.getLogger(__name__) +SENTINEL = object() + + class ExpiringCache(object): def __init__(self, cache_name, clock, max_len=0, expiry_ms=0, reset_expiry_on_get=False, iterable=False): @@ -102,6 +105,16 @@ class ExpiringCache(object): return entry.value + def pop(self, key, default=None): + value = self._cache.pop(key, SENTINEL) + if value is SENTINEL: + return default + + if self.iterable: + self._size_estimate -= len(value.value) + + return value + def __contains__(self, key): return key in self._cache -- cgit 1.5.1 From 19dc676d1aaf66a45c9a1810132952f7ae3ca2aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Sep 2018 16:25:42 +0100 Subject: Fix ExpiringCache.__len__ to be accurate It used to try and produce an estimate, which was sometimes negative. This caused metrics to be sad, so lets always just calculate it from scratch. --- synapse/util/caches/expiringcache.py | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 346669e4ce..8ac56d85ff 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -16,7 +16,7 @@ import logging from collections import OrderedDict -from six import iteritems +from six import iteritems, itervalues from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import register_cache @@ -59,8 +59,6 @@ class ExpiringCache(object): self.iterable = iterable - self._size_estimate = 0 - self.metrics = register_cache("expiring", cache_name, self) if not self._expiry_ms: @@ -79,16 +77,11 @@ class ExpiringCache(object): now = self._clock.time_msec() self._cache[key] = _CacheEntry(now, value) - if self.iterable: - self._size_estimate += len(value) - # Evict if there are now too many items while self._max_len and len(self) > self._max_len: _key, value = self._cache.popitem(last=False) if self.iterable: - removed_len = len(value.value) - self.metrics.inc_evictions(removed_len) - self._size_estimate -= removed_len + self.metrics.inc_evictions(len(value.value)) else: self.metrics.inc_evictions() @@ -111,7 +104,9 @@ class ExpiringCache(object): return default if self.iterable: - self._size_estimate -= len(value.value) + self.metrics.inc_evictions(len(value.value)) + else: + self.metrics.inc_evictions() return value @@ -149,7 +144,9 @@ class ExpiringCache(object): for k in keys_to_delete: value = self._cache.pop(k) if self.iterable: - self._size_estimate -= len(value.value) + self.metrics.inc_evictions(len(value.value)) + else: + self.metrics.inc_evictions() logger.debug( "[%s] _prune_cache before: %d, after len: %d", @@ -158,7 +155,7 @@ class ExpiringCache(object): def __len__(self): if self.iterable: - return self._size_estimate + return sum(len(entry.value) for entry in itervalues(self._cache)) else: return len(self._cache) -- cgit 1.5.1 From 3baf6e1667f0c257f288a28bd542a93c2445fd30 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 21 Sep 2018 16:25:42 +0100 Subject: Fix ExpiringCache.__len__ to be accurate It used to try and produce an estimate, which was sometimes negative. This caused metrics to be sad, so lets always just calculate it from scratch. (This appears to have been a longstanding bug, but one which has been made more of a problem by #3932 and #3933). (This was originally done by Erik as part of #3933. I'm cherry-picking it because really it's a fix in its own right) --- synapse/util/caches/expiringcache.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 921a9c5b29..9af4ec4aa8 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -16,6 +16,8 @@ import logging from collections import OrderedDict +from six import itervalues + from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import register_cache @@ -54,8 +56,6 @@ class ExpiringCache(object): self.iterable = iterable - self._size_estimate = 0 - self.metrics = register_cache("expiring", cache_name, self) if not self._expiry_ms: @@ -74,16 +74,11 @@ class ExpiringCache(object): now = self._clock.time_msec() self._cache[key] = _CacheEntry(now, value) - if self.iterable: - self._size_estimate += len(value) - # Evict if there are now too many items while self._max_len and len(self) > self._max_len: _key, value = self._cache.popitem(last=False) if self.iterable: - removed_len = len(value.value) - self.metrics.inc_evictions(removed_len) - self._size_estimate -= removed_len + self.metrics.inc_evictions(len(value.value)) else: self.metrics.inc_evictions() @@ -134,7 +129,9 @@ class ExpiringCache(object): for k in keys_to_delete: value = self._cache.pop(k) if self.iterable: - self._size_estimate -= len(value.value) + self.metrics.inc_evictions(len(value.value)) + else: + self.metrics.inc_evictions() logger.debug( "[%s] _prune_cache before: %d, after len: %d", @@ -143,7 +140,7 @@ class ExpiringCache(object): def __len__(self): if self.iterable: - return self._size_estimate + return sum(len(entry.value) for entry in itervalues(self._cache)) else: return len(self._cache) -- cgit 1.5.1 From 7ee94fc1baf6c4e2970232383d76277d8378712b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 26 Sep 2018 12:40:20 +0100 Subject: Log which cache is throwing exceptions --- synapse/util/caches/__init__.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index 7b065b195e..f37d5bec08 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import os import six @@ -20,6 +21,8 @@ from six.moves import intern from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily +logger = logging.getLogger(__name__) + CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) @@ -76,16 +79,20 @@ def register_cache(cache_type, cache_name, cache): return [] def collect(self): - if cache_type == "response_cache": - response_cache_size.labels(cache_name).set(len(cache)) - response_cache_hits.labels(cache_name).set(self.hits) - response_cache_evicted.labels(cache_name).set(self.evicted_size) - response_cache_total.labels(cache_name).set(self.hits + self.misses) - else: - cache_size.labels(cache_name).set(len(cache)) - cache_hits.labels(cache_name).set(self.hits) - cache_evicted.labels(cache_name).set(self.evicted_size) - cache_total.labels(cache_name).set(self.hits + self.misses) + try: + if cache_type == "response_cache": + response_cache_size.labels(cache_name).set(len(cache)) + response_cache_hits.labels(cache_name).set(self.hits) + response_cache_evicted.labels(cache_name).set(self.evicted_size) + response_cache_total.labels(cache_name).set(self.hits + self.misses) + else: + cache_size.labels(cache_name).set(len(cache)) + cache_hits.labels(cache_name).set(self.hits) + cache_evicted.labels(cache_name).set(self.evicted_size) + cache_total.labels(cache_name).set(self.hits + self.misses) + except Exception as e: + logger.warn("Error calculating metrics for %s: %s", cache_name, e) + raise yield GaugeMetricFamily("__unused", "") -- cgit 1.5.1 From 4a15a3e4d539dcea9a4a57e7cd800a926f2a17c3 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 27 Sep 2018 11:25:34 +0100 Subject: Include eventid in log lines when processing incoming federation transactions (#3959) when processing incoming transactions, it can be hard to see what's going on, because we process a bunch of stuff in parallel, and because we may end up recursively working our way through a chain of three or four events. This commit creates a way to use logcontexts to add the relevant event ids to the log lines. --- changelog.d/3959.feature | 1 + synapse/federation/federation_server.py | 32 ++++++++-------- synapse/handlers/federation.py | 65 ++++++++++++++++++++------------- synapse/util/logcontext.py | 41 +++++++++++++++++++-- tests/test_federation.py | 28 ++++++++------ tests/util/test_logcontext.py | 5 +++ 6 files changed, 115 insertions(+), 57 deletions(-) create mode 100644 changelog.d/3959.feature (limited to 'synapse/util') diff --git a/changelog.d/3959.feature b/changelog.d/3959.feature new file mode 100644 index 0000000000..b3a4f37a8d --- /dev/null +++ b/changelog.d/3959.feature @@ -0,0 +1 @@ +Include eventid in log lines when processing incoming federation transactions \ No newline at end of file diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9a571e4fc7..819e8f7331 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -46,6 +46,7 @@ from synapse.replication.http.federation import ( from synapse.types import get_domain_from_id from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from synapse.util.logcontext import nested_logging_context from synapse.util.logutils import log_function # when processing incoming transactions, we try to handle multiple rooms in @@ -187,21 +188,22 @@ class FederationServer(FederationBase): for pdu in pdus_by_room[room_id]: event_id = pdu.event_id - try: - yield self._handle_received_pdu( - origin, pdu - ) - pdu_results[event_id] = {} - except FederationError as e: - logger.warn("Error handling PDU %s: %s", event_id, e) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - f = failure.Failure() - pdu_results[event_id] = {"error": str(e)} - logger.error( - "Failed to handle PDU %s: %s", - event_id, f.getTraceback().rstrip(), - ) + with nested_logging_context(event_id): + try: + yield self._handle_received_pdu( + origin, pdu + ) + pdu_results[event_id] = {} + except FederationError as e: + logger.warn("Error handling PDU %s: %s", event_id, e) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + f = failure.Failure() + pdu_results[event_id] = {"error": str(e)} + logger.error( + "Failed to handle PDU %s: %s", + event_id, f.getTraceback().rstrip(), + ) yield concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 2ccdc3bfa7..993546387c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -339,14 +339,18 @@ class FederationHandler(BaseHandler): "[%s %s] Requesting state at missing prev_event %s", room_id, event_id, p, ) - state, got_auth_chain = ( - yield self.federation_client.get_state_for_room( - origin, room_id, p, + + with logcontext.nested_logging_context(p): + state, got_auth_chain = ( + yield self.federation_client.get_state_for_room( + origin, room_id, p, + ) ) - ) - auth_chains.update(got_auth_chain) - state_group = {(x.type, x.state_key): x.event_id for x in state} - state_groups.append(state_group) + auth_chains.update(got_auth_chain) + state_group = { + (x.type, x.state_key): x.event_id for x in state + } + state_groups.append(state_group) # Resolve any conflicting state def fetch(ev_ids): @@ -483,20 +487,21 @@ class FederationHandler(BaseHandler): "[%s %s] Handling received prev_event %s", room_id, event_id, ev.event_id, ) - try: - yield self.on_receive_pdu( - origin, - ev, - sent_to_us_directly=False, - ) - except FederationError as e: - if e.code == 403: - logger.warn( - "[%s %s] Received prev_event %s failed history check.", - room_id, event_id, ev.event_id, + with logcontext.nested_logging_context(ev.event_id): + try: + yield self.on_receive_pdu( + origin, + ev, + sent_to_us_directly=False, ) - else: - raise + except FederationError as e: + if e.code == 403: + logger.warn( + "[%s %s] Received prev_event %s failed history check.", + room_id, event_id, ev.event_id, + ) + else: + raise @defer.inlineCallbacks def _process_received_pdu(self, origin, event, state, auth_chain): @@ -1135,7 +1140,8 @@ class FederationHandler(BaseHandler): try: logger.info("Processing queued PDU %s which was received " "while we were joining %s", p.event_id, p.room_id) - yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) + with logcontext.nested_logging_context(p.event_id): + yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) except Exception as e: logger.warn( "Error handling queued PDU %s from %s: %s", @@ -1581,15 +1587,22 @@ class FederationHandler(BaseHandler): Notifies about the events where appropriate. """ - contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( - [ - logcontext.run_in_background( - self._prep_event, + + @defer.inlineCallbacks + def prep(ev_info): + event = ev_info["event"] + with logcontext.nested_logging_context(suffix=event.event_id): + res = yield self._prep_event( origin, - ev_info["event"], + event, state=ev_info.get("state"), auth_events=ev_info.get("auth_events"), ) + defer.returnValue(res) + + contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults( + [ + logcontext.run_in_background(prep, ev_info) for ev_info in event_infos ], consumeErrors=True, )) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index a0c2d37610..89224b26cc 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -200,7 +200,7 @@ class LoggingContext(object): sentinel = Sentinel() - def __init__(self, name=None, parent_context=None): + def __init__(self, name=None, parent_context=None, request=None): self.previous_context = LoggingContext.current_context() self.name = name @@ -218,6 +218,13 @@ class LoggingContext(object): self.parent_context = parent_context + if self.parent_context is not None: + self.parent_context.copy_to(self) + + if request is not None: + # the request param overrides the request from the parent context + self.request = request + def __str__(self): return "%s@%x" % (self.name, id(self)) @@ -256,9 +263,6 @@ class LoggingContext(object): ) self.alive = True - if self.parent_context is not None: - self.parent_context.copy_to(self) - return self def __exit__(self, type, value, traceback): @@ -439,6 +443,35 @@ class PreserveLoggingContext(object): ) +def nested_logging_context(suffix, parent_context=None): + """Creates a new logging context as a child of another. + + The nested logging context will have a 'request' made up of the parent context's + request, plus the given suffix. + + CPU/db usage stats will be added to the parent context's on exit. + + Normal usage looks like: + + with nested_logging_context(suffix): + # ... do stuff + + Args: + suffix (str): suffix to add to the parent context's 'request'. + parent_context (LoggingContext|None): parent context. Will use the current context + if None. + + Returns: + LoggingContext: new logging context. + """ + if parent_context is None: + parent_context = LoggingContext.current_context() + return LoggingContext( + parent_context=parent_context, + request=parent_context.request + "-" + suffix, + ) + + def preserve_fn(f): """Function decorator which wraps the function with run_in_background""" def g(*args, **kwargs): diff --git a/tests/test_federation.py b/tests/test_federation.py index 2540604fcc..ff55c7a627 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -6,6 +6,7 @@ from twisted.internet.defer import maybeDeferred, succeed from synapse.events import FrozenEvent from synapse.types import Requester, UserID from synapse.util import Clock +from synapse.util.logcontext import LoggingContext from tests import unittest from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver @@ -117,9 +118,10 @@ class MessageAcceptTests(unittest.TestCase): } ) - d = self.handler.on_receive_pdu( - "test.serv", lying_event, sent_to_us_directly=True - ) + with LoggingContext(request="lying_event"): + d = self.handler.on_receive_pdu( + "test.serv", lying_event, sent_to_us_directly=True + ) # Step the reactor, so the database fetches come back self.reactor.advance(1) @@ -209,11 +211,12 @@ class MessageAcceptTests(unittest.TestCase): } ) - d = self.handler.on_receive_pdu( - "test.serv", good_event, sent_to_us_directly=True - ) - self.reactor.advance(1) - self.assertEqual(self.successResultOf(d), None) + with LoggingContext(request="good_event"): + d = self.handler.on_receive_pdu( + "test.serv", good_event, sent_to_us_directly=True + ) + self.reactor.advance(1) + self.assertEqual(self.successResultOf(d), None) bad_event = FrozenEvent( { @@ -230,10 +233,11 @@ class MessageAcceptTests(unittest.TestCase): } ) - d = self.handler.on_receive_pdu( - "test.serv", bad_event, sent_to_us_directly=True - ) - self.reactor.advance(1) + with LoggingContext(request="bad_event"): + d = self.handler.on_receive_pdu( + "test.serv", bad_event, sent_to_us_directly=True + ) + self.reactor.advance(1) extrem = maybeDeferred( self.homeserver.datastore.get_latest_event_ids_in_room, self.room_id diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 4633db77b3..8adaee3c8d 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -159,6 +159,11 @@ class LoggingContextTestCase(unittest.TestCase): self.assertEqual(r, "bum") self._check_test_key("one") + def test_nested_logging_context(self): + with LoggingContext(request="foo"): + nested_context = logcontext.nested_logging_context(suffix="bar") + self.assertEqual(nested_context.request, "foo-bar") + # a function which returns a deferred which has been "called", but # which had a function which returned another incomplete deferred on -- cgit 1.5.1 From 8ea887856c1bcb97c90d88fe20f7f82ed7f48967 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 1 Oct 2018 11:59:52 +0100 Subject: Don't update eviction metrics on explicit removal --- synapse/util/caches/expiringcache.py | 5 ----- 1 file changed, 5 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 8ac56d85ff..f2f55ba6c9 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -103,11 +103,6 @@ class ExpiringCache(object): if value is SENTINEL: return default - if self.iterable: - self.metrics.inc_evictions(len(value.value)) - else: - self.metrics.inc_evictions() - return value def __contains__(self, key): -- cgit 1.5.1 From 4f3e3ac192f2c5ff37198f50af99bc8fbd57beca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 1 Oct 2018 12:25:27 +0100 Subject: Correctly match 'dict.pop' api --- synapse/util/caches/expiringcache.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index f2f55ba6c9..f369780277 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -98,10 +98,18 @@ class ExpiringCache(object): return entry.value - def pop(self, key, default=None): - value = self._cache.pop(key, SENTINEL) + def pop(self, key, default=SENTINEL): + """Removes and returns the value with the given key from the cache. + + If the key isn't in the cache then `default` will be returned if + specified, otherwise `KeyError` will get raised. + + Identical functionality to `dict.pop(..)`. + """ + + value = self._cache.pop(key, default) if value is SENTINEL: - return default + raise KeyError(key) return value -- cgit 1.5.1