summary refs log tree commit diff
path: root/synapse/util/async.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-06-18 15:49:05 +0100
committerErik Johnston <erik@matrix.org>2015-06-18 15:49:24 +0100
commit22049ea700173017cf2f8e88fb8848e06b82f9b3 (patch)
tree5a6375689c4009777690da704bc2baec86e5a014 /synapse/util/async.py
parentFix notifier leak (diff)
downloadsynapse-22049ea700173017cf2f8e88fb8848e06b82f9b3.tar.xz
Refactor the notifier.wait_for_events code to be clearer. Add _NotifierUserStream.new_listener that accpets a token to avoid races.
Diffstat (limited to '')
-rw-r--r--synapse/util/async.py13
1 files changed, 11 insertions, 2 deletions
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 1c2044e5b4..6f567bcaa6 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -45,7 +45,7 @@ class ObservableDeferred(object):
     def __init__(self, deferred, consumeErrors=False):
         object.__setattr__(self, "_deferred", deferred)
         object.__setattr__(self, "_result", None)
-        object.__setattr__(self, "_observers", [])
+        object.__setattr__(self, "_observers", set())
 
         def callback(r):
             self._result = (True, r)
@@ -74,12 +74,21 @@ class ObservableDeferred(object):
     def observe(self):
         if not self._result:
             d = defer.Deferred()
-            self._observers.append(d)
+
+            def remove(r):
+                self._observers.discard(d)
+                return r
+            d.addBoth(remove)
+
+            self._observers.add(d)
             return d
         else:
             success, res = self._result
             return defer.succeed(res) if success else defer.fail(res)
 
+    def observers(self):
+        return self._observers
+
     def __getattr__(self, name):
         return getattr(self._deferred, name)