summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py22
1 files changed, 14 insertions, 8 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1e4f78b993..a1c06e8fca 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,15 +13,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer, reactor
-from twisted.internet.defer import TimeoutError
+from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 
 from synapse.util.logutils import log_function
-from synapse.util.async import ObservableDeferred
+from synapse.util.async import (
+    ObservableDeferred, add_timeout_to_deferred,
+    DeferredTimeoutError,
+)
 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
 from synapse.util.metrics import Measure
 from synapse.types import StreamToken
@@ -332,8 +334,9 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
-                    listener.deferred.addTimeout(
-                        (end_time - now) / 1000., reactor,
+                    add_timeout_to_deferred(
+                        listener.deferred,
+                        (end_time - now) / 1000.,
                     )
                     with PreserveLoggingContext():
                         yield listener.deferred
@@ -347,7 +350,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except TimeoutError:
+                except DeferredTimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -552,11 +555,14 @@ class Notifier(object):
             if end_time <= now:
                 break
 
-            listener.deferred.addTimeout((end_time - now) / 1000., reactor)
+            add_timeout_to_deferred(
+                listener.deferred.addTimeout,
+                (end_time - now) / 1000.,
+            )
             try:
                 with PreserveLoggingContext():
                     yield listener.deferred
-            except TimeoutError:
+            except DeferredTimeoutError:
                 break
             except defer.CancelledError:
                 break