diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a86996689c..b62773dcbe 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -510,6 +510,7 @@ class SyncHandler(object):
Returns:
Deferred(SyncResult)
"""
+ logger.info("Calculating sync response for %r", sync_config.user)
# NB: The now_token gets changed by some of the generate_sync_* methods,
# this is due to some of the underlying streams not supporting the ability
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 054ca59ad2..acbd4bb5ae 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
+from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
@@ -294,14 +295,7 @@ class Notifier(object):
result = None
if timeout:
- # Will be set to a _NotificationListener that we'll be waiting on.
- # Allows us to cancel it.
- listener = None
-
- def timed_out():
- if listener:
- listener.deferred.cancel()
- timer = self.clock.call_later(timeout / 1000., timed_out)
+ end_time = self.clock.time_msec() + timeout
prev_token = from_token
while not result:
@@ -312,6 +306,10 @@ class Notifier(object):
if result:
break
+ now = self.clock.time_msec()
+ if end_time <= now:
+ break
+
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
# We need to supply the token we supplied to callback so
@@ -319,11 +317,14 @@ class Notifier(object):
prev_token = current_token
listener = user_stream.new_listener(prev_token)
with PreserveLoggingContext():
- yield listener.deferred
+ yield self.clock.time_bound_deferred(
+ listener.deferred,
+ time_out=(end_time - now) / 1000.
+ )
+ except DeferredTimedOutError:
+ break
except defer.CancelledError:
break
-
- self.clock.cancel_call_later(timer, ignore_errs=True)
else:
current_token = user_stream.current_token
result = yield callback(from_token, current_token)
@@ -492,22 +493,27 @@ class Notifier(object):
"""
listener = _NotificationListener(None)
- def timed_out():
- listener.deferred.cancel()
+ end_time = self.clock.time_msec() + timeout
- timer = self.clock.call_later(timeout / 1000., timed_out)
while True:
listener.deferred = self.replication_deferred.observe()
result = yield callback()
if result:
break
+ now = self.clock.time_msec()
+ if end_time <= now:
+ break
+
try:
with PreserveLoggingContext():
- yield listener.deferred
+ yield self.clock.time_bound_deferred(
+ listener.deferred,
+ time_out=(end_time - now) / 1000.
+ )
+ except DeferredTimedOutError:
+ break
except defer.CancelledError:
break
- self.clock.cancel_call_later(timer, ignore_errs=True)
-
defer.returnValue(result)
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index c05b9450be..30fc480108 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -24,6 +24,11 @@ import logging
logger = logging.getLogger(__name__)
+class DeferredTimedOutError(SynapseError):
+ def __init__(self):
+ super(SynapseError).__init__(504, "Timed out")
+
+
def unwrapFirstError(failure):
# defer.gatherResults and DeferredLists wrap failures.
failure.trap(defer.FirstError)
@@ -89,7 +94,7 @@ class Clock(object):
def timed_out_fn():
try:
- ret_deferred.errback(SynapseError(504, "Timed out"))
+ ret_deferred.errback(DeferredTimedOutError())
except:
pass
diff --git a/tests/utils.py b/tests/utils.py
index 2d0bd205fd..d3d6c8021d 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -294,6 +294,10 @@ class MockClock(object):
def advance_time_msec(self, ms):
self.advance_time(ms / 1000.)
+ def time_bound_deferred(self, d, *args, **kwargs):
+ # We don't bother timing things out for now.
+ return d
+
class SQLiteMemoryDbPool(ConnectionPool, object):
def __init__(self):
|