diff options
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r-- | synapse/notifier.py | 29 |
1 files changed, 17 insertions, 12 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index 939723a404..8355c7d621 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -14,13 +14,16 @@ # limitations under the License. from twisted.internet import defer + from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state -from synapse.util import DeferredTimedOutError from synapse.util.logutils import log_function -from synapse.util.async import ObservableDeferred +from synapse.util.async import ( + ObservableDeferred, add_timeout_to_deferred, + DeferredTimeoutError, +) from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.metrics import Measure from synapse.types import StreamToken @@ -336,11 +339,12 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) + add_timeout_to_deferred( + listener.deferred, + (end_time - now) / 1000., + ) with PreserveLoggingContext(): - yield self.clock.time_bound_deferred( - listener.deferred, - time_out=(end_time - now) / 1000. - ) + yield listener.deferred current_token = user_stream.current_token @@ -351,7 +355,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except DeferredTimedOutError: + except DeferredTimeoutError: break except defer.CancelledError: break @@ -556,13 +560,14 @@ class Notifier(object): if end_time <= now: break + add_timeout_to_deferred( + listener.deferred.addTimeout, + (end_time - now) / 1000., + ) try: with PreserveLoggingContext(): - yield self.clock.time_bound_deferred( - listener.deferred, - time_out=(end_time - now) / 1000. - ) - except DeferredTimedOutError: + yield listener.deferred + except DeferredTimeoutError: break except defer.CancelledError: break |