1 files changed, 20 insertions, 21 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 078abfc56d..27c034ed51 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -308,49 +308,48 @@ class Notifier(object):
else:
current_token = user_stream.current_token
- listener = [_NotificationListener(deferred)]
-
- if timeout and not current_token.is_after(from_token):
- user_stream.listeners.add(listener[0])
-
+ result = None
if current_token.is_after(from_token):
result = yield callback(from_token, current_token)
- else:
- result = None
-
- timer = [None]
if result:
- user_stream.listeners.discard(listener[0])
defer.returnValue(result)
- return
if timeout:
+ timer = [None]
+ listeners = []
timed_out = [False]
+ def notify_listeners():
+ user_stream.listeners.difference_update(listeners)
+ for listener in listeners:
+ listener.notify(current_token)
+ del listeners[:]
+
def _timeout_listener():
timed_out[0] = True
timer[0] = None
- user_stream.listeners.discard(listener[0])
- listener[0].notify(current_token)
+ notify_listeners()
# We create multiple notification listeners so we have to manage
# canceling the timeout ourselves.
timer[0] = self.clock.call_later(timeout/1000., _timeout_listener)
while not result and not timed_out[0]:
- new_token = yield deferred
deferred = defer.Deferred()
- listener[0] = _NotificationListener(deferred)
- user_stream.listeners.add(listener[0])
+ notify_listeners()
+ listeners.append(_NotificationListener(deferred))
+ user_stream.listeners.update(listeners)
+ new_token = yield deferred
+
result = yield callback(current_token, new_token)
current_token = new_token
- if timer[0] is not None:
- try:
- self.clock.cancel_call_later(timer[0])
- except:
- logger.exception("Failed to cancel notifer timer")
+ if timer[0] is not None:
+ try:
+ self.clock.cancel_call_later(timer[0])
+ except:
+ logger.exception("Failed to cancel notifer timer")
defer.returnValue(result)
|