diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 5b150cb0e5..064c4a7a1e 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -15,6 +15,12 @@
from twisted.internet import defer
+from synapse.util.logcontext import (
+ PreserveLoggingContext, preserve_context_over_deferred,
+)
+
+from synapse.util import unwrapFirstError
+
import logging
@@ -99,23 +105,27 @@ class Signal(object):
Returns a Deferred that will complete when all the observers have
completed."""
- def eb(failure):
- logger.warning(
- "%s signal observer %s failed: %r",
- self.name, observer, failure,
- exc_info=(
- failure.type,
- failure.value,
- failure.getTracebackObject()))
- if not self.suppress_failures:
- failure.raiseException()
-
- deferreds = [
- defer.maybeDeferred(observer, *args, **kwargs)
- for observer in self.observers
- ]
-
- d = defer.gatherResults(deferreds, consumeErrors=True)
- d.addErrback(eb)
-
- return d
+ def do(observer):
+ def eb(failure):
+ logger.warning(
+ "%s signal observer %s failed: %r",
+ self.name, observer, failure,
+ exc_info=(
+ failure.type,
+ failure.value,
+ failure.getTracebackObject()))
+ if not self.suppress_failures:
+ return failure
+ return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
+
+ with PreserveLoggingContext():
+ deferreds = [
+ do(observer)
+ for observer in self.observers
+ ]
+
+ d = defer.gatherResults(deferreds, consumeErrors=True)
+
+ d.addErrback(unwrapFirstError)
+
+ return preserve_context_over_deferred(d)
|