summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py40
1 files changed, 23 insertions, 17 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 054ca59ad2..acbd4bb5ae 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 
+from synapse.util import DeferredTimedOutError
 from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
@@ -294,14 +295,7 @@ class Notifier(object):
 
         result = None
         if timeout:
-            # Will be set to a _NotificationListener that we'll be waiting on.
-            # Allows us to cancel it.
-            listener = None
-
-            def timed_out():
-                if listener:
-                    listener.deferred.cancel()
-            timer = self.clock.call_later(timeout / 1000., timed_out)
+            end_time = self.clock.time_msec() + timeout
 
             prev_token = from_token
             while not result:
@@ -312,6 +306,10 @@ class Notifier(object):
                     if result:
                         break
 
+                    now = self.clock.time_msec()
+                    if end_time <= now:
+                        break
+
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     # We need to supply the token we supplied to callback so
@@ -319,11 +317,14 @@ class Notifier(object):
                     prev_token = current_token
                     listener = user_stream.new_listener(prev_token)
                     with PreserveLoggingContext():
-                        yield listener.deferred
+                        yield self.clock.time_bound_deferred(
+                            listener.deferred,
+                            time_out=(end_time - now) / 1000.
+                        )
+                except DeferredTimedOutError:
+                    break
                 except defer.CancelledError:
                     break
-
-            self.clock.cancel_call_later(timer, ignore_errs=True)
         else:
             current_token = user_stream.current_token
             result = yield callback(from_token, current_token)
@@ -492,22 +493,27 @@ class Notifier(object):
         """
         listener = _NotificationListener(None)
 
-        def timed_out():
-            listener.deferred.cancel()
+        end_time = self.clock.time_msec() + timeout
 
-        timer = self.clock.call_later(timeout / 1000., timed_out)
         while True:
             listener.deferred = self.replication_deferred.observe()
             result = yield callback()
             if result:
                 break
 
+            now = self.clock.time_msec()
+            if end_time <= now:
+                break
+
             try:
                 with PreserveLoggingContext():
-                    yield listener.deferred
+                    yield self.clock.time_bound_deferred(
+                        listener.deferred,
+                        time_out=(end_time - now) / 1000.
+                    )
+            except DeferredTimedOutError:
+                break
             except defer.CancelledError:
                 break
 
-        self.clock.cancel_call_later(timer, ignore_errs=True)
-
         defer.returnValue(result)