diff options
author | Mark Haines <mjark@negativecurvature.net> | 2015-05-12 15:25:54 +0100 |
---|---|---|
committer | Mark Haines <mjark@negativecurvature.net> | 2015-05-12 15:25:54 +0100 |
commit | ec07dba29e43d00e3630e22112e332555dc69b8c (patch) | |
tree | f7d8b11b02542260131e9fb3102f1b2a1022baa3 /synapse/util/async.py | |
parent | Merge pull request #147 from matrix-org/presence-performance (diff) | |
parent | Change the way we create observers to deferreds so that we don't get spammed ... (diff) | |
download | synapse-ec07dba29e43d00e3630e22112e332555dc69b8c.tar.xz |
Merge pull request #143 from matrix-org/erikj/SYN-375
SYN-375 - Lots of unhandled deferred exceptions.
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r-- | synapse/util/async.py | 60 |
1 files changed, 48 insertions, 12 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py index f78395a431..1c2044e5b4 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -32,20 +32,56 @@ def run_on_reactor(): return sleep(0) -def create_observer(deferred): - """Creates a deferred that observes the result or failure of the given - deferred *without* affecting the given deferred. +class ObservableDeferred(object): + """Wraps a deferred object so that we can add observer deferreds. These + observer deferreds do not affect the callback chain of the original + deferred. + + If consumeErrors is true errors will be captured from the origin deferred. """ - d = defer.Deferred() - def callback(r): - d.callback(r) - return r + __slots__ = ["_deferred", "_observers", "_result"] + + def __init__(self, deferred, consumeErrors=False): + object.__setattr__(self, "_deferred", deferred) + object.__setattr__(self, "_result", None) + object.__setattr__(self, "_observers", []) + + def callback(r): + self._result = (True, r) + while self._observers: + try: + self._observers.pop().callback(r) + except: + pass + return r + + def errback(f): + self._result = (False, f) + while self._observers: + try: + self._observers.pop().errback(f) + except: + pass + + if consumeErrors: + return None + else: + return f + + deferred.addCallbacks(callback, errback) - def errback(f): - d.errback(f) - return f + def observe(self): + if not self._result: + d = defer.Deferred() + self._observers.append(d) + return d + else: + success, res = self._result + return defer.succeed(res) if success else defer.fail(res) - deferred.addCallbacks(callback, errback) + def __getattr__(self, name): + return getattr(self._deferred, name) - return d + def __setattr__(self, name, value): + setattr(self._deferred, name, value) |