From 476899295f5fd6cff64799bcbc84cd4bf9005e33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 16:32:18 +0100 Subject: Change the way we do logging contexts so that they survive divergences --- synapse/http/client.py | 6 +++++- synapse/http/matrixfederationclient.py | 32 ++++++++++++++++---------------- 2 files changed, 21 insertions(+), 17 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/client.py b/synapse/http/client.py index e8a5dedab4..5b3cefb2dc 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -14,6 +14,7 @@ # limitations under the License. from synapse.api.errors import CodeMessageException +from synapse.util.logcontext import preserve_context_over_fn from syutil.jsonutil import encode_canonical_json import synapse.metrics @@ -61,7 +62,10 @@ class SimpleHttpClient(object): # A small wrapper around self.agent.request() so we can easily attach # counters to it outgoing_requests_counter.inc(method) - d = self.agent.request(method, *args, **kwargs) + d = preserve_context_over_fn( + self.agent.request, + method, *args, **kwargs + ) def _cb(response): incoming_responses_counter.inc(method, response.code) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7fa295cad5..c99d237c73 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http.endpoint import matrix_federation_endpoint from synapse.util.async import sleep -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import preserve_context_over_fn import synapse.metrics from syutil.jsonutil import encode_canonical_json @@ -144,22 +144,22 @@ class MatrixFederationHttpClient(object): producer = body_callback(method, url_bytes, headers_dict) try: - with PreserveLoggingContext(): - request_deferred = self.agent.request( - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + request_deferred = preserve_context_over_fn( + self.agent.request, + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) - response = yield self.clock.time_bound_deferred( - request_deferred, - time_out=60, - ) + response = yield self.clock.time_bound_deferred( + request_deferred, + time_out=60, + ) logger.debug("Got response to %s", method) break -- cgit 1.5.1 From 2236ef6c92b7964665f5c43b941754d70aa506d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 19:53:34 +0100 Subject: Fix up leak. Add warnings. --- synapse/handlers/_base.py | 11 +++++++---- synapse/handlers/federation.py | 29 ++++++++++++++++----------- synapse/handlers/presence.py | 10 ++++++---- synapse/handlers/typing.py | 4 +++- synapse/http/server.py | 6 ++++-- synapse/util/__init__.py | 3 ++- synapse/util/distributor.py | 45 ++++++++++++++++++++---------------------- synapse/util/logcontext.py | 11 ++++++++++- 8 files changed, 70 insertions(+), 49 deletions(-) (limited to 'synapse/http') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 4b3f4eadab..ddc5c21e7d 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes from synapse.types import UserID +from synapse.util.logcontext import PreserveLoggingContext + import logging @@ -137,10 +139,11 @@ class BaseHandler(object): "Failed to get destination from event %s", s.event_id ) - # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + # Don't block waiting on waking up all the listeners. + notify_d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85e2757227..77c315c47c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,6 +21,7 @@ from synapse.api.errors import ( AuthError, FederationError, StoreError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -197,9 +198,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -431,9 +433,10 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + new_event, extra_users=[joinee] + ) def log_failure(f): logger.warn( @@ -512,9 +515,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -594,9 +598,10 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - d = self.notifier.on_new_room_event( - event, extra_users=[target_user], - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=[target_user], + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6ae39a1d37..1edab05492 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.types import UserID import synapse.metrics @@ -808,10 +809,11 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, observed_user, users_to_push=[], room_ids=[], statuscache=None): - self.notifier.on_new_user_event( - users_to_push, - room_ids, - ) + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + users_to_push, + room_ids, + ) class PresenceEventSource(object): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c0b2bd7db0..64fe51aa3e 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -18,6 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.errors import SynapseError, AuthError +from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID import logging @@ -216,7 +217,8 @@ class TypingNotificationHandler(BaseHandler): self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial - self.notifier.on_new_user_event(rooms=[room_id]) + with PreserveLoggingContext(): + self.notifier.on_new_user_event(rooms=[room_id]) class TypingNotificationEventSource(object): diff --git a/synapse/http/server.py b/synapse/http/server.py index 93ecbd7589..73efbff4f2 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -17,7 +17,7 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError ) -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext import synapse.metrics from syutil.jsonutil import ( @@ -85,7 +85,9 @@ def request_handler(request_handler): "Received request: %s %s", request.method, request.path ) - yield request_handler(self, request) + d = request_handler(self, request) + with PreserveLoggingContext(): + yield d code = request.code except CodeMessageException as e: code = e.code diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 364b927851..fd3eb1f574 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -54,7 +54,8 @@ class Clock(object): LoggingContext.thread_local.current_context = current_context callback() - return reactor.callLater(delay, wrapped_callback) + with PreserveLoggingContext(): + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): timer.cancel() diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 9d9c350397..5b150cb0e5 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import PreserveLoggingContext - from twisted.internet import defer import logging @@ -93,7 +91,6 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) - @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -101,24 +98,24 @@ class Signal(object): Returns a Deferred that will complete when all the observers have completed.""" - with PreserveLoggingContext(): - deferreds = [] - for observer in self.observers: - d = defer.maybeDeferred(observer, *args, **kwargs) - - def eb(failure): - logger.warning( - "%s signal observer %s failed: %r", - self.name, observer, failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject())) - if not self.suppress_failures: - failure.raiseException() - deferreds.append(d.addErrback(eb)) - results = [] - for deferred in deferreds: - result = yield deferred - results.append(result) - defer.returnValue(results) + + def eb(failure): + logger.warning( + "%s signal observer %s failed: %r", + self.name, observer, failure, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject())) + if not self.suppress_failures: + failure.raiseException() + + deferreds = [ + defer.maybeDeferred(observer, *args, **kwargs) + for observer in self.observers + ] + + d = defer.gatherResults(deferreds, consumeErrors=True) + d.addErrback(eb) + + return d diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 3dce8d2bf3..a92d518b43 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -132,6 +132,13 @@ class PreserveLoggingContext(object): """Restores the current logging context""" LoggingContext.thread_local.current_context = self.current_context + if self.current_context is not LoggingContext.sentinel: + if self.current_context.parent_context is None: + logger.warn( + "Restoring dead context: %s", + self.current_context, + ) + def preserve_context_over_fn(fn, *args, **kwargs): """Takes a function and invokes it with the given arguments, but removes @@ -169,6 +176,8 @@ def preserve_context_over_deferred(deferred): res = d.errback(failure) return res - deferred.addCallbacks(cb, eb) + if deferred.called: + return deferred + deferred.addCallbacks(cb, eb) return d -- cgit 1.5.1 From 5b1631a4a9ad4c1ed0adaff3ffc8238014359e95 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 19 May 2015 14:53:32 +0100 Subject: Add a timeout param to get_event --- synapse/federation/federation_base.py | 1 + synapse/federation/federation_client.py | 23 ++++++++++++++--------- synapse/federation/transport/client.py | 4 ++-- synapse/http/matrixfederationclient.py | 13 ++++++++----- 4 files changed, 25 insertions(+), 16 deletions(-) (limited to 'synapse/http') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 5217d91aab..f0430b2cb1 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -80,6 +80,7 @@ class FederationBase(object): destinations=[pdu.origin], event_id=pdu.event_id, outlier=outlier, + timeout=10000, ) if new_pdu: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 904c7c0945..a163b2674d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -22,6 +22,7 @@ from .units import Edu from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) +from synapse.util import unwrapFirstError from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -173,7 +174,7 @@ class FederationClient(FederationBase): @defer.inlineCallbacks @log_function - def get_pdu(self, destinations, event_id, outlier=False): + def get_pdu(self, destinations, event_id, outlier=False, timeout=None): """Requests the PDU with given origin and ID from the remote home servers. @@ -212,7 +213,7 @@ class FederationClient(FederationBase): with limiter: transaction_data = yield self.transport_layer.get_event( - destination, event_id + destination, event_id, timeout=timeout, ) logger.debug("transaction_data %r", transaction_data) @@ -370,13 +371,17 @@ class FederationClient(FederationBase): for p in content.get("auth_chain", []) ] - signed_state = yield self._check_sigs_and_hash_and_fetch( - destination, state, outlier=True - ) - - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True - ) + signed_state, signed_auth = yield defer.gatherResults( + [ + self._check_sigs_and_hash_and_fetch( + destination, state, outlier=True + ), + self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) + ], + consumeErrors=True + ).addErrback(unwrapFirstError) auth_chain.sort(key=lambda e: e.depth) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 80d03012b7..c2b53b78b2 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -50,7 +50,7 @@ class TransportLayerClient(object): ) @log_function - def get_event(self, destination, event_id): + def get_event(self, destination, event_id, timeout=None): """ Requests the pdu with give id and origin from the given server. Args: @@ -65,7 +65,7 @@ class TransportLayerClient(object): destination, event_id) path = PREFIX + "/event/%s/" % (event_id, ) - return self.client.get_json(destination, path=path) + return self.client.get_json(destination, path=path, timeout=timeout) @log_function def backfill(self, destination, room_id, event_tuples, limit): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c99d237c73..312bbcc6b8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -110,7 +110,8 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", - query_bytes=b"", retry_on_dns_fail=True): + query_bytes=b"", retry_on_dns_fail=True, + timeout=None): """ Creates and sends a request to the given url """ headers_dict[b"User-Agent"] = [self.version_string] @@ -158,7 +159,7 @@ class MatrixFederationHttpClient(object): response = yield self.clock.time_bound_deferred( request_deferred, - time_out=60, + time_out=timeout/1000. if timeout else 60, ) logger.debug("Got response to %s", method) @@ -181,7 +182,7 @@ class MatrixFederationHttpClient(object): _flatten_response_never_received(e), ) - if retries_left: + if retries_left and not timeout: yield sleep(2 ** (5 - retries_left)) retries_left -= 1 else: @@ -334,7 +335,8 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def get_json(self, destination, path, args={}, retry_on_dns_fail=True): + def get_json(self, destination, path, args={}, retry_on_dns_fail=True, + timeout=None): """ GETs some json from the given host homeserver and path Args: @@ -370,7 +372,8 @@ class MatrixFederationHttpClient(object): path.encode("ascii"), query_bytes=query_bytes, body_callback=body_callback, - retry_on_dns_fail=retry_on_dns_fail + retry_on_dns_fail=retry_on_dns_fail, + timeout=timeout, ) if 200 <= response.code < 300: -- cgit 1.5.1 From 284f55a7fbf597e32508301fe9571cd1b8523625 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 May 2015 15:18:04 +0100 Subject: Add doc strings --- synapse/federation/federation_client.py | 2 ++ synapse/federation/transport/client.py | 2 ++ synapse/http/matrixfederationclient.py | 3 +++ 3 files changed, 7 insertions(+) (limited to 'synapse/http') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ecb6dbd770..3249060bcf 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -190,6 +190,8 @@ class FederationClient(FederationBase): outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` + timeout (int): How long to try (in ms) each destination for before + moving to the next destination. None indicates no timeout. Returns: Deferred: Results in the requested PDU. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index c2b53b78b2..610a4c3163 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -57,6 +57,8 @@ class TransportLayerClient(object): destination (str): The host name of the remote home server we want to get the state from. event_id (str): The id of the event being requested. + timeout (int): How long to try (in ms) the destination for before + giving up. None indicates no timeout. Returns: Deferred: Results in a dict received from the remote homeserver. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 312bbcc6b8..6f976d5ce8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -345,6 +345,9 @@ class MatrixFederationHttpClient(object): path (str): The HTTP path. args (dict): A dictionary used to create query strings, defaults to None. + timeout (int): How long to try (in ms) the destination for before + giving up. None indicates no timeout and that the request will + be retried. Returns: Deferred: Succeeds when we get *any* HTTP response. -- cgit 1.5.1