diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1e4f78b993..a1c06e8fca 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,15 +13,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer, reactor
-from twisted.internet.defer import TimeoutError
+from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
+from synapse.util.async import (
+ ObservableDeferred, add_timeout_to_deferred,
+ DeferredTimeoutError,
+)
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import StreamToken
@@ -332,8 +334,9 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
- listener.deferred.addTimeout(
- (end_time - now) / 1000., reactor,
+ add_timeout_to_deferred(
+ listener.deferred,
+ (end_time - now) / 1000.,
)
with PreserveLoggingContext():
yield listener.deferred
@@ -347,7 +350,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
- except TimeoutError:
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
@@ -552,11 +555,14 @@ class Notifier(object):
if end_time <= now:
break
- listener.deferred.addTimeout((end_time - now) / 1000., reactor)
+ add_timeout_to_deferred(
+ listener.deferred.addTimeout,
+ (end_time - now) / 1000.,
+ )
try:
with PreserveLoggingContext():
yield listener.deferred
- except TimeoutError:
+ except DeferredTimeoutError:
break
except defer.CancelledError:
break
|