diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 4b3f4eadab..ddc5c21e7d 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -20,6 +20,8 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.api.constants import Membership, EventTypes
from synapse.types import UserID
+from synapse.util.logcontext import PreserveLoggingContext
+
import logging
@@ -137,10 +139,11 @@ class BaseHandler(object):
"Failed to get destination from event %s", s.event_id
)
- # Don't block waiting on waking up all the listeners.
- notify_d = self.notifier.on_new_room_event(
- event, extra_users=extra_users
- )
+ with PreserveLoggingContext():
+ # Don't block waiting on waking up all the listeners.
+ notify_d = self.notifier.on_new_room_event(
+ event, extra_users=extra_users
+ )
def log_failure(f):
logger.warn(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 85e2757227..77c315c47c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,6 +21,7 @@ from synapse.api.errors import (
AuthError, FederationError, StoreError,
)
from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
@@ -197,9 +198,10 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- d = self.notifier.on_new_room_event(
- event, extra_users=extra_users
- )
+ with PreserveLoggingContext():
+ d = self.notifier.on_new_room_event(
+ event, extra_users=extra_users
+ )
def log_failure(f):
logger.warn(
@@ -431,9 +433,10 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- d = self.notifier.on_new_room_event(
- new_event, extra_users=[joinee]
- )
+ with PreserveLoggingContext():
+ d = self.notifier.on_new_room_event(
+ new_event, extra_users=[joinee]
+ )
def log_failure(f):
logger.warn(
@@ -512,9 +515,10 @@ class FederationHandler(BaseHandler):
target_user = UserID.from_string(target_user_id)
extra_users.append(target_user)
- d = self.notifier.on_new_room_event(
- event, extra_users=extra_users
- )
+ with PreserveLoggingContext():
+ d = self.notifier.on_new_room_event(
+ event, extra_users=extra_users
+ )
def log_failure(f):
logger.warn(
@@ -594,9 +598,10 @@ class FederationHandler(BaseHandler):
)
target_user = UserID.from_string(event.state_key)
- d = self.notifier.on_new_room_event(
- event, extra_users=[target_user],
- )
+ with PreserveLoggingContext():
+ d = self.notifier.on_new_room_event(
+ event, extra_users=[target_user],
+ )
def log_failure(f):
logger.warn(
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 6ae39a1d37..1edab05492 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.types import UserID
import synapse.metrics
@@ -808,10 +809,11 @@ class PresenceHandler(BaseHandler):
def push_update_to_clients(self, observed_user, users_to_push=[],
room_ids=[], statuscache=None):
- self.notifier.on_new_user_event(
- users_to_push,
- room_ids,
- )
+ with PreserveLoggingContext():
+ self.notifier.on_new_user_event(
+ users_to_push,
+ room_ids,
+ )
class PresenceEventSource(object):
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c0b2bd7db0..64fe51aa3e 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.errors import SynapseError, AuthError
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import UserID
import logging
@@ -216,7 +217,8 @@ class TypingNotificationHandler(BaseHandler):
self._latest_room_serial += 1
self._room_serials[room_id] = self._latest_room_serial
- self.notifier.on_new_user_event(rooms=[room_id])
+ with PreserveLoggingContext():
+ self.notifier.on_new_user_event(rooms=[room_id])
class TypingNotificationEventSource(object):
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 93ecbd7589..73efbff4f2 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -17,7 +17,7 @@
from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError
)
-from synapse.util.logcontext import LoggingContext
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
import synapse.metrics
from syutil.jsonutil import (
@@ -85,7 +85,9 @@ def request_handler(request_handler):
"Received request: %s %s",
request.method, request.path
)
- yield request_handler(self, request)
+ d = request_handler(self, request)
+ with PreserveLoggingContext():
+ yield d
code = request.code
except CodeMessageException as e:
code = e.code
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 364b927851..fd3eb1f574 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -54,7 +54,8 @@ class Clock(object):
LoggingContext.thread_local.current_context = current_context
callback()
- return reactor.callLater(delay, wrapped_callback)
+ with PreserveLoggingContext():
+ return reactor.callLater(delay, wrapped_callback)
def cancel_call_later(self, timer):
timer.cancel()
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 9d9c350397..5b150cb0e5 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.logcontext import PreserveLoggingContext
-
from twisted.internet import defer
import logging
@@ -93,7 +91,6 @@ class Signal(object):
Each observer callable may return a Deferred."""
self.observers.append(observer)
- @defer.inlineCallbacks
def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -101,24 +98,24 @@ class Signal(object):
Returns a Deferred that will complete when all the observers have
completed."""
- with PreserveLoggingContext():
- deferreds = []
- for observer in self.observers:
- d = defer.maybeDeferred(observer, *args, **kwargs)
-
- def eb(failure):
- logger.warning(
- "%s signal observer %s failed: %r",
- self.name, observer, failure,
- exc_info=(
- failure.type,
- failure.value,
- failure.getTracebackObject()))
- if not self.suppress_failures:
- failure.raiseException()
- deferreds.append(d.addErrback(eb))
- results = []
- for deferred in deferreds:
- result = yield deferred
- results.append(result)
- defer.returnValue(results)
+
+ def eb(failure):
+ logger.warning(
+ "%s signal observer %s failed: %r",
+ self.name, observer, failure,
+ exc_info=(
+ failure.type,
+ failure.value,
+ failure.getTracebackObject()))
+ if not self.suppress_failures:
+ failure.raiseException()
+
+ deferreds = [
+ defer.maybeDeferred(observer, *args, **kwargs)
+ for observer in self.observers
+ ]
+
+ d = defer.gatherResults(deferreds, consumeErrors=True)
+ d.addErrback(eb)
+
+ return d
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 3dce8d2bf3..a92d518b43 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -132,6 +132,13 @@ class PreserveLoggingContext(object):
"""Restores the current logging context"""
LoggingContext.thread_local.current_context = self.current_context
+ if self.current_context is not LoggingContext.sentinel:
+ if self.current_context.parent_context is None:
+ logger.warn(
+ "Restoring dead context: %s",
+ self.current_context,
+ )
+
def preserve_context_over_fn(fn, *args, **kwargs):
"""Takes a function and invokes it with the given arguments, but removes
@@ -169,6 +176,8 @@ def preserve_context_over_deferred(deferred):
res = d.errback(failure)
return res
- deferred.addCallbacks(cb, eb)
+ if deferred.called:
+ return deferred
+ deferred.addCallbacks(cb, eb)
return d
|