summary refs log tree commit diff
path: root/synapse/util/async.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util/async.py')
-rw-r--r--synapse/util/async.py25
1 files changed, 21 insertions, 4 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1c2044e5b4..7bf2d38bb8 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -38,6 +38,9 @@ class ObservableDeferred(object):
     deferred.
 
     If consumeErrors is true errors will be captured from the origin deferred.
+
+    Cancelling or otherwise resolving an observer will not affect the original
+    ObservableDeferred.
     """
 
     __slots__ = ["_deferred", "_observers", "_result"]
@@ -45,10 +48,10 @@ class ObservableDeferred(object):
     def __init__(self, deferred, consumeErrors=False):
         object.__setattr__(self, "_deferred", deferred)
         object.__setattr__(self, "_result", None)
-        object.__setattr__(self, "_observers", [])
+        object.__setattr__(self, "_observers", set())
 
         def callback(r):
-            self._result = (True, r)
+            object.__setattr__(self, "_result", (True, r))
             while self._observers:
                 try:
                     self._observers.pop().callback(r)
@@ -57,7 +60,7 @@ class ObservableDeferred(object):
             return r
 
         def errback(f):
-            self._result = (False, f)
+            object.__setattr__(self, "_result", (False, f))
             while self._observers:
                 try:
                     self._observers.pop().errback(f)
@@ -74,14 +77,28 @@ class ObservableDeferred(object):
     def observe(self):
         if not self._result:
             d = defer.Deferred()
-            self._observers.append(d)
+
+            def remove(r):
+                self._observers.discard(d)
+                return r
+            d.addBoth(remove)
+
+            self._observers.add(d)
             return d
         else:
             success, res = self._result
             return defer.succeed(res) if success else defer.fail(res)
 
+    def observers(self):
+        return self._observers
+
     def __getattr__(self, name):
         return getattr(self._deferred, name)
 
     def __setattr__(self, name, value):
         setattr(self._deferred, name, value)
+
+    def __repr__(self):
+        return "<ObservableDeferred object at %s, result=%r, _deferred=%r>" % (
+            id(self), self._result, self._deferred,
+        )