diff options
author | Erik Johnston <erik@matrix.org> | 2015-05-08 19:53:34 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-05-08 19:53:34 +0100 |
commit | 2236ef6c92b7964665f5c43b941754d70aa506d8 (patch) | |
tree | 5befd0effe712d4c4af6264346769f57c9486060 /synapse/util | |
parent | Add some docs (diff) | |
download | synapse-2236ef6c92b7964665f5c43b941754d70aa506d8.tar.xz |
Fix up leak. Add warnings.
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/__init__.py | 3 | ||||
-rw-r--r-- | synapse/util/distributor.py | 45 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 11 |
3 files changed, 33 insertions, 26 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 364b927851..fd3eb1f574 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -54,7 +54,8 @@ class Clock(object): LoggingContext.thread_local.current_context = current_context callback() - return reactor.callLater(delay, wrapped_callback) + with PreserveLoggingContext(): + return reactor.callLater(delay, wrapped_callback) def cancel_call_later(self, timer): timer.cancel() diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 9d9c350397..5b150cb0e5 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import PreserveLoggingContext - from twisted.internet import defer import logging @@ -93,7 +91,6 @@ class Signal(object): Each observer callable may return a Deferred.""" self.observers.append(observer) - @defer.inlineCallbacks def fire(self, *args, **kwargs): """Invokes every callable in the observer list, passing in the args and kwargs. Exceptions thrown by observers are logged but ignored. It is @@ -101,24 +98,24 @@ class Signal(object): Returns a Deferred that will complete when all the observers have completed.""" - with PreserveLoggingContext(): - deferreds = [] - for observer in self.observers: - d = defer.maybeDeferred(observer, *args, **kwargs) - - def eb(failure): - logger.warning( - "%s signal observer %s failed: %r", - self.name, observer, failure, - exc_info=( - failure.type, - failure.value, - failure.getTracebackObject())) - if not self.suppress_failures: - failure.raiseException() - deferreds.append(d.addErrback(eb)) - results = [] - for deferred in deferreds: - result = yield deferred - results.append(result) - defer.returnValue(results) + + def eb(failure): + logger.warning( + "%s signal observer %s failed: %r", + self.name, observer, failure, + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject())) + if not self.suppress_failures: + failure.raiseException() + + deferreds = [ + defer.maybeDeferred(observer, *args, **kwargs) + for observer in self.observers + ] + + d = defer.gatherResults(deferreds, consumeErrors=True) + d.addErrback(eb) + + return d diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 3dce8d2bf3..a92d518b43 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -132,6 +132,13 @@ class PreserveLoggingContext(object): """Restores the current logging context""" LoggingContext.thread_local.current_context = self.current_context + if self.current_context is not LoggingContext.sentinel: + if self.current_context.parent_context is None: + logger.warn( + "Restoring dead context: %s", + self.current_context, + ) + def preserve_context_over_fn(fn, *args, **kwargs): """Takes a function and invokes it with the given arguments, but removes @@ -169,6 +176,8 @@ def preserve_context_over_deferred(deferred): res = d.errback(failure) return res - deferred.addCallbacks(cb, eb) + if deferred.called: + return deferred + deferred.addCallbacks(cb, eb) return d |