summary refs log tree commit diff
path: root/synapse/util/distributor.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-05-12 13:14:48 +0100
committerErik Johnston <erik@matrix.org>2015-05-12 13:14:48 +0100
commit8022b27fc26bd2127019f5179c8956ea475dd284 (patch)
tree335d771c52d2aacf189c1784d0ecec5d1c82188e /synapse/util/distributor.py
parentUnwrap defer.gatherResults failures (diff)
downloadsynapse-8022b27fc26bd2127019f5179c8956ea475dd284.tar.xz
Make distributer.fire work as it did
Diffstat (limited to 'synapse/util/distributor.py')
-rw-r--r--synapse/util/distributor.py50
1 files changed, 30 insertions, 20 deletions
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 5b150cb0e5..064c4a7a1e 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -15,6 +15,12 @@
 
 from twisted.internet import defer
 
+from synapse.util.logcontext import (
+    PreserveLoggingContext, preserve_context_over_deferred,
+)
+
+from synapse.util import unwrapFirstError
+
 import logging
 
 
@@ -99,23 +105,27 @@ class Signal(object):
         Returns a Deferred that will complete when all the observers have
         completed."""
 
-        def eb(failure):
-            logger.warning(
-                "%s signal observer %s failed: %r",
-                self.name, observer, failure,
-                exc_info=(
-                    failure.type,
-                    failure.value,
-                    failure.getTracebackObject()))
-            if not self.suppress_failures:
-                failure.raiseException()
-
-        deferreds = [
-            defer.maybeDeferred(observer, *args, **kwargs)
-            for observer in self.observers
-        ]
-
-        d = defer.gatherResults(deferreds, consumeErrors=True)
-        d.addErrback(eb)
-
-        return d
+        def do(observer):
+            def eb(failure):
+                logger.warning(
+                    "%s signal observer %s failed: %r",
+                    self.name, observer, failure,
+                    exc_info=(
+                        failure.type,
+                        failure.value,
+                        failure.getTracebackObject()))
+                if not self.suppress_failures:
+                    return failure
+            return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
+
+        with PreserveLoggingContext():
+            deferreds = [
+                do(observer)
+                for observer in self.observers
+            ]
+
+            d = defer.gatherResults(deferreds, consumeErrors=True)
+
+        d.addErrback(unwrapFirstError)
+
+        return preserve_context_over_deferred(d)