diff --git a/synapse/notifier.py b/synapse/notifier.py
index 82f391481c..f1d92c1395 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -25,11 +25,7 @@ from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.types import StreamToken
-from synapse.util.async_helpers import (
- DeferredTimeoutError,
- ObservableDeferred,
- add_timeout_to_deferred,
-)
+from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@@ -337,7 +333,7 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
- add_timeout_to_deferred(
+ listener.deferred = timeout_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
@@ -354,7 +350,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
- except DeferredTimeoutError:
+ except defer.TimeoutError:
break
except defer.CancelledError:
break
@@ -559,15 +555,16 @@ class Notifier(object):
if end_time <= now:
break
- add_timeout_to_deferred(
- listener.deferred.addTimeout,
- (end_time - now) / 1000.,
- self.hs.get_reactor(),
+ listener.deferred = timeout_deferred(
+ listener.deferred,
+ timeout=(end_time - now) / 1000.,
+ reactor=self.hs.get_reactor(),
)
+
try:
with PreserveLoggingContext():
yield listener.deferred
- except DeferredTimeoutError:
+ except defer.TimeoutError:
break
except defer.CancelledError:
break
|