From 476899295f5fd6cff64799bcbc84cd4bf9005e33 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 16:32:18 +0100 Subject: Change the way we do logging contexts so that they survive divergences --- synapse/util/__init__.py | 8 +++++--- synapse/util/async.py | 6 ++---- synapse/util/logcontext.py | 31 +++++++++++++++++++++++++++++++ 3 files changed, 38 insertions(+), 7 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 79109d0b19..364b927851 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from twisted.internet import defer, reactor, task @@ -50,8 +50,10 @@ class Clock(object): current_context = LoggingContext.current_context() def wrapped_callback(): - LoggingContext.thread_local.current_context = current_context - callback() + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = current_context + callback() + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): diff --git a/synapse/util/async.py b/synapse/util/async.py index d8febdb90c..f78395a431 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,15 +16,13 @@ from twisted.internet import defer, reactor -from .logcontext import PreserveLoggingContext +from .logcontext import preserve_context_over_deferred -@defer.inlineCallbacks def sleep(seconds): d = defer.Deferred() reactor.callLater(seconds, d.callback, seconds) - with PreserveLoggingContext(): - yield d + return preserve_context_over_deferred(d) def run_on_reactor(): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index da7872e95d..192e3f49f0 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer + import threading import logging @@ -129,3 +131,32 @@ class PreserveLoggingContext(object): def __exit__(self, type, value, traceback): """Restores the current logging context""" LoggingContext.thread_local.current_context = self.current_context + + +def preserve_context_over_fn(fn, *args, **kwargs): + with PreserveLoggingContext(): + deferred = fn(*args, **kwargs) + + return preserve_context_over_deferred(deferred) + + +def preserve_context_over_deferred(deferred): + d = defer.Deferred() + + current_context = LoggingContext.current_context() + + def cb(res): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = current_context + res = d.callback(res) + return res + + def eb(failure): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = current_context + res = d.errback(failure) + return res + + deferred.addCallbacks(cb, eb) + + return d -- cgit 1.4.1 From da1aa07db5f29436aeaae8d0dcbfe9872b4a7233 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 16:52:49 +0100 Subject: Add some docs --- synapse/util/logcontext.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 192e3f49f0..3dce8d2bf3 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -134,13 +134,25 @@ class PreserveLoggingContext(object): def preserve_context_over_fn(fn, *args, **kwargs): + """Takes a function and invokes it with the given arguments, but removes + and restores the current logging context while doing so. + + If the result is a deferred, call preserve_context_over_deferred before + returning it. + """ with PreserveLoggingContext(): - deferred = fn(*args, **kwargs) + res = fn(*args, **kwargs) - return preserve_context_over_deferred(deferred) + if isinstance(res, defer.Deferred): + return preserve_context_over_deferred(res) + else: + return res def preserve_context_over_deferred(deferred): + """Given a deferred wrap it such that any callbacks added later to it will + be invoked with the current context. + """ d = defer.Deferred() current_context = LoggingContext.current_context() -- cgit 1.4.1 From 2236ef6c92b7964665f5c43b941754d70aa506d8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 May 2015 19:53:34 +0100 Subject: Fix up leak. Add warnings. --- synapse/handlers/_base.py | 11 +++++++---- synapse/handlers/federation.py | 29 ++++++++++++++++----------- synapse/handlers/presence.py | 10 ++++++---- synapse/handlers/typing.py | 4 +++- synapse/http/server.py | 6 ++++-- synapse/util/__init__.py | 3 ++- synapse/util/distributor.py | 45 ++++++++++++++++++++---------------------- synapse/util/logcontext.py | 11 ++++++++++- 8 files changed, 70 insertions(+), 49 deletions(-) (limited to 'synapse/util') diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 4b3f4eadab..ddc5c21e7d 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.api.constants import Membership, EventTypes from synapse.types import UserID +from synapse.util.logcontext import PreserveLoggingContext + import logging @@ -137,10 +139,11 @@ class BaseHandler(object): "Failed to get destination from event %s", s.event_id ) - # Don't block waiting on waking up all the listeners. - notify_d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + # Don't block waiting on waking up all the listeners. + notify_d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 85e2757227..77c315c47c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,6 +21,7 @@ from synapse.api.errors import ( AuthError, FederationError, StoreError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.util.frozenutils import unfreeze @@ -197,9 +198,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -431,9 +433,10 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) - d = self.notifier.on_new_room_event( - new_event, extra_users=[joinee] - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + new_event, extra_users=[joinee] + ) def log_failure(f): logger.warn( @@ -512,9 +515,10 @@ class FederationHandler(BaseHandler): target_user = UserID.from_string(target_user_id) extra_users.append(target_user) - d = self.notifier.on_new_room_event( - event, extra_users=extra_users - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=extra_users + ) def log_failure(f): logger.warn( @@ -594,9 +598,10 @@ class FederationHandler(BaseHandler): ) target_user = UserID.from_string(event.state_key) - d = self.notifier.on_new_room_event( - event, extra_users=[target_user], - ) + with PreserveLoggingContext(): + d = self.notifier.on_new_room_event( + event, extra_users=[target_user], + ) def log_failure(f): logger.warn( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6ae39a1d37..1edab05492 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError from synapse.api.constants import PresenceState +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.types import UserID import synapse.metrics @@ -808,10 +809,11 @@ class PresenceHandler(BaseHandler): def push_update_to_clients(self, observed_user, users_to_push=[], room_ids=[], statuscache=None): - self.notifier.on_new_user_event( - users_to_push, - room_ids, - ) + with PreserveLoggingContext(): + self.notifier.on_new_user_event( + users_to_push, + room_ids, + ) class PresenceEventSource(object): diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index c0b2bd7db0..64fe51aa3e 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -18,6 +18,7 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.errors import SynapseError, AuthError +from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID import logging @@ -216,7 +217,8 @@ class TypingNotificationHandler(BaseHandler): self._latest_room_serial += 1 self._room_serials[room_id] = self._latest_room_serial - self.notifier.on_new_user_event(rooms=[room_id]) + with PreserveLoggingContext(): + self.notifier.on_new_user_event(rooms=[room_id]) class TypingNotificationEventSource(object): diff --git a/synapse/http/server.py b/synapse/http/server.py index 93ecbd7589..73efbff4f2 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -17,7 +17,7 @@ from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError ) -from synapse.util.logcontext import LoggingContext +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext import synapse.metrics from syutil.jsonutil import ( @@ -85,7 +85,9 @@ def request_handler(request_handler): "Received request: %s %s", request.method, request.path ) - yield request_handler(self, request) + d = request_handler(self, request) + with PreserveLoggingContext(): + yield d code = request.code except CodeMessageException as e: code = e.code diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 364b927851..fd3eb1f574 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -54,7 +54,8 @@ class Clock(object): LoggingContext.thread_local.current_context = current_context callback() - return reactor.callLater(delay, wrapped_callback) + with PreserveLoggingContext(): + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): timer.cancel() diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 9d9c350397..5b150cb0e5 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import PreserveLoggingContext - from twisted.internet import defer import logging @@ -93,7 +91,6 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) - @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -101,24 +98,24 @@ class Signal(object): Returns a Deferred that will complete when all the observers have completed.""" - with PreserveLoggingContext(): - deferreds = [] - for observer in self.observers: - d = defer.maybeDeferred(observer, *args, **kwargs) - - def eb(failure): - logger.warning( - "%s signal observer %s failed: %r", - self.name, observer, failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject())) - if not self.suppress_failures: - failure.raiseException() - deferreds.append(d.addErrback(eb)) - results = [] - for deferred in deferreds: - result = yield deferred - results.append(result) - defer.returnValue(results) + + def eb(failure): + logger.warning( + "%s signal observer %s failed: %r", + self.name, observer, failure, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject())) + if not self.suppress_failures: + failure.raiseException() + + deferreds = [ + defer.maybeDeferred(observer, *args, **kwargs) + for observer in self.observers + ] + + d = defer.gatherResults(deferreds, consumeErrors=True) + d.addErrback(eb) + + return d diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 3dce8d2bf3..a92d518b43 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -132,6 +132,13 @@ class PreserveLoggingContext(object): """Restores the current logging context""" LoggingContext.thread_local.current_context = self.current_context + if self.current_context is not LoggingContext.sentinel: + if self.current_context.parent_context is None: + logger.warn( + "Restoring dead context: %s", + self.current_context, + ) + def preserve_context_over_fn(fn, *args, **kwargs): """Takes a function and invokes it with the given arguments, but removes @@ -169,6 +176,8 @@ def preserve_context_over_deferred(deferred): res = d.errback(failure) return res - deferred.addCallbacks(cb, eb) + if deferred.called: + return deferred + deferred.addCallbacks(cb, eb) return d -- cgit 1.4.1 From 95dedb866f04ee4ae034c35130f2a8dc86243fbb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 13:14:29 +0100 Subject: Unwrap defer.gatherResults failures --- synapse/federation/federation_base.py | 4 +++- synapse/handlers/federation.py | 3 ++- synapse/handlers/message.py | 5 +++-- synapse/handlers/profile.py | 3 ++- synapse/handlers/room.py | 4 ++-- synapse/util/__init__.py | 6 ++++++ 6 files changed, 18 insertions(+), 7 deletions(-) (limited to 'synapse/util') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 21a763214b..5217d91aab 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -24,6 +24,8 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import SynapseError +from synapse.util import unwrapFirstError + import logging @@ -94,7 +96,7 @@ class FederationBase(object): yield defer.gatherResults( [do(pdu) for pdu in pdus], consumeErrors=True - ) + ).addErrback(unwrapFirstError) defer.returnValue(signed_pdus) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 77c315c47c..cd85001578 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -21,6 +21,7 @@ from synapse.api.errors import ( AuthError, FederationError, StoreError, ) from synapse.api.constants import EventTypes, Membership, RejectedReason +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor @@ -926,7 +927,7 @@ class FederationHandler(BaseHandler): if d in have_events and not have_events[d] ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) if different_events: local_view = dict(auth_events) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 22e19af17f..b7d52647d7 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -20,6 +20,7 @@ from synapse.api.errors import RoomError, SynapseError from synapse.streams.config import PaginationConfig from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator +from synapse.util import unwrapFirstError from synapse.util.logcontext import PreserveLoggingContext from synapse.types import UserID @@ -303,7 +304,7 @@ class MessageHandler(BaseHandler): event.room_id ), ] - ) + ).addErrback(unwrapFirstError) start_token = now_token.copy_and_replace("room_key", token[0]) end_token = now_token.copy_and_replace("room_key", token[1]) @@ -328,7 +329,7 @@ class MessageHandler(BaseHandler): yield defer.gatherResults( [handle_room(e) for e in room_list], consumeErrors=True - ) + ).addErrback(unwrapFirstError) ret = { "rooms": rooms_ret, diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index ffb449d457..71ff78ab23 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -18,6 +18,7 @@ from twisted.internet import defer from synapse.api.errors import SynapseError, AuthError, CodeMessageException from synapse.api.constants import EventTypes, Membership from synapse.types import UserID +from synapse.util import unwrapFirstError from ._base import BaseHandler @@ -159,7 +160,7 @@ class ProfileHandler(BaseHandler): self.store.get_profile_avatar_url(user.localpart), ], consumeErrors=True - ) + ).addErrback(unwrapFirstError) state["displayname"] = displayname state["avatar_url"] = avatar_url diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index cfa2e38ed2..ea5abba6ab 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -21,7 +21,7 @@ from ._base import BaseHandler from synapse.types import UserID, RoomAlias, RoomID from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.api.errors import StoreError, SynapseError -from synapse.util import stringutils +from synapse.util import stringutils, unwrapFirstError from synapse.util.async import run_on_reactor from synapse.events.utils import serialize_event @@ -537,7 +537,7 @@ class RoomListHandler(BaseHandler): for room in chunk ], consumeErrors=True, - ) + ).addErrback(unwrapFirstError) for i, room in enumerate(chunk): room["num_joined_members"] = len(results[i]) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index fd3eb1f574..c1a16b639a 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -23,6 +23,12 @@ import logging logger = logging.getLogger(__name__) +def unwrapFirstError(failure): + # defer.gatherResults and DeferredLists wrap failures. + failure.trap(defer.FirstError) + return failure.value.subFailure + + class Clock(object): """A small utility that obtains current time-of-day so that time may be mocked during unit-tests. -- cgit 1.4.1 From 8022b27fc26bd2127019f5179c8956ea475dd284 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 May 2015 13:14:48 +0100 Subject: Make distributer.fire work as it did --- synapse/util/distributor.py | 50 +++++++++++++++++++++++++++------------------ 1 file changed, 30 insertions(+), 20 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 5b150cb0e5..064c4a7a1e 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -15,6 +15,12 @@ from twisted.internet import defer +from synapse.util.logcontext import ( + PreserveLoggingContext, preserve_context_over_deferred, +) + +from synapse.util import unwrapFirstError + import logging @@ -99,23 +105,27 @@ class Signal(object): Returns a Deferred that will complete when all the observers have completed.""" - def eb(failure): - logger.warning( - "%s signal observer %s failed: %r", - self.name, observer, failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject())) - if not self.suppress_failures: - failure.raiseException() - - deferreds = [ - defer.maybeDeferred(observer, *args, **kwargs) - for observer in self.observers - ] - - d = defer.gatherResults(deferreds, consumeErrors=True) - d.addErrback(eb) - - return d + def do(observer): + def eb(failure): + logger.warning( + "%s signal observer %s failed: %r", + self.name, observer, failure, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject())) + if not self.suppress_failures: + return failure + return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb) + + with PreserveLoggingContext(): + deferreds = [ + do(observer) + for observer in self.observers + ] + + d = defer.gatherResults(deferreds, consumeErrors=True) + + d.addErrback(unwrapFirstError) + + return preserve_context_over_deferred(d) -- cgit 1.4.1