summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/notifier.py23
1 files changed, 11 insertions, 12 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 0e40a4aad6..1e4f78b993 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -13,12 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.internet.defer import TimeoutError
+
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 
-from synapse.util import DeferredTimedOutError
 from synapse.util.logutils import log_function
 from synapse.util.async import ObservableDeferred
 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
@@ -331,11 +332,11 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
+                    listener.deferred.addTimeout(
+                        (end_time - now) / 1000., reactor,
+                    )
                     with PreserveLoggingContext():
-                        yield self.clock.time_bound_deferred(
-                            listener.deferred,
-                            time_out=(end_time - now) / 1000.
-                        )
+                        yield listener.deferred
 
                     current_token = user_stream.current_token
 
@@ -346,7 +347,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except DeferredTimedOutError:
+                except TimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -551,13 +552,11 @@ class Notifier(object):
             if end_time <= now:
                 break
 
+            listener.deferred.addTimeout((end_time - now) / 1000., reactor)
             try:
                 with PreserveLoggingContext():
-                    yield self.clock.time_bound_deferred(
-                        listener.deferred,
-                        time_out=(end_time - now) / 1000.
-                    )
-            except DeferredTimedOutError:
+                    yield listener.deferred
+            except TimeoutError:
                 break
             except defer.CancelledError:
                 break