summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/notifier.py20
-rw-r--r--synapse/util/__init__.py7
2 files changed, 20 insertions, 7 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 40baa6969a..acbd4bb5ae 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 
+from synapse.util import DeferredTimedOutError
 from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
@@ -320,6 +321,8 @@ class Notifier(object):
                             listener.deferred,
                             time_out=(end_time - now) / 1000.
                         )
+                except DeferredTimedOutError:
+                    break
                 except defer.CancelledError:
                     break
         else:
@@ -490,22 +493,27 @@ class Notifier(object):
         """
         listener = _NotificationListener(None)
 
-        def timed_out():
-            listener.deferred.cancel()
+        end_time = self.clock.time_msec() + timeout
 
-        timer = self.clock.call_later(timeout / 1000., timed_out)
         while True:
             listener.deferred = self.replication_deferred.observe()
             result = yield callback()
             if result:
                 break
 
+            now = self.clock.time_msec()
+            if end_time <= now:
+                break
+
             try:
                 with PreserveLoggingContext():
-                    yield listener.deferred
+                    yield self.clock.time_bound_deferred(
+                        listener.deferred,
+                        time_out=(end_time - now) / 1000.
+                    )
+            except DeferredTimedOutError:
+                break
             except defer.CancelledError:
                 break
 
-        self.clock.cancel_call_later(timer, ignore_errs=True)
-
         defer.returnValue(result)
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index c05b9450be..30fc480108 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -24,6 +24,11 @@ import logging
 logger = logging.getLogger(__name__)
 
 
+class DeferredTimedOutError(SynapseError):
+    def __init__(self):
+        super(SynapseError).__init__(504, "Timed out")
+
+
 def unwrapFirstError(failure):
     # defer.gatherResults and DeferredLists wrap failures.
     failure.trap(defer.FirstError)
@@ -89,7 +94,7 @@ class Clock(object):
 
         def timed_out_fn():
             try:
-                ret_deferred.errback(SynapseError(504, "Timed out"))
+                ret_deferred.errback(DeferredTimedOutError())
             except:
                 pass