diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1c2044e5b4..7bf2d38bb8 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -38,6 +38,9 @@ class ObservableDeferred(object):
deferred.
If consumeErrors is true errors will be captured from the origin deferred.
+
+ Cancelling or otherwise resolving an observer will not affect the original
+ ObservableDeferred.
"""
__slots__ = ["_deferred", "_observers", "_result"]
@@ -45,10 +48,10 @@ class ObservableDeferred(object):
def __init__(self, deferred, consumeErrors=False):
object.__setattr__(self, "_deferred", deferred)
object.__setattr__(self, "_result", None)
- object.__setattr__(self, "_observers", [])
+ object.__setattr__(self, "_observers", set())
def callback(r):
- self._result = (True, r)
+ object.__setattr__(self, "_result", (True, r))
while self._observers:
try:
self._observers.pop().callback(r)
@@ -57,7 +60,7 @@ class ObservableDeferred(object):
return r
def errback(f):
- self._result = (False, f)
+ object.__setattr__(self, "_result", (False, f))
while self._observers:
try:
self._observers.pop().errback(f)
@@ -74,14 +77,28 @@ class ObservableDeferred(object):
def observe(self):
if not self._result:
d = defer.Deferred()
- self._observers.append(d)
+
+ def remove(r):
+ self._observers.discard(d)
+ return r
+ d.addBoth(remove)
+
+ self._observers.add(d)
return d
else:
success, res = self._result
return defer.succeed(res) if success else defer.fail(res)
+ def observers(self):
+ return self._observers
+
def __getattr__(self, name):
return getattr(self._deferred, name)
def __setattr__(self, name, value):
setattr(self._deferred, name, value)
+
+ def __repr__(self):
+ return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
+ id(self), self._result, self._deferred,
+ )
|