From a334e1cacec5dd9c1a983415906d8ffea4a30134 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Sep 2018 10:39:40 +0100 Subject: Update to use new timeout function everywhere. The existing deferred timeout helper function (and the one into twisted) suffer from a bug when a deferred's canceller throws an exception, #3842. The new helper function doesn't suffer from this problem. --- synapse/notifier.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 82f391481c..2d683718fb 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -28,7 +28,7 @@ from synapse.types import StreamToken from synapse.util.async_helpers import ( DeferredTimeoutError, ObservableDeferred, - add_timeout_to_deferred, + timeout_deferred, ) from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.logutils import log_function @@ -337,7 +337,7 @@ class Notifier(object): # Now we wait for the _NotifierUserStream to be told there # is a new token. listener = user_stream.new_listener(prev_token) - add_timeout_to_deferred( + listener.deferred = timeout_deferred( listener.deferred, (end_time - now) / 1000., self.hs.get_reactor(), @@ -559,11 +559,12 @@ class Notifier(object): if end_time <= now: break - add_timeout_to_deferred( - listener.deferred.addTimeout, - (end_time - now) / 1000., - self.hs.get_reactor(), + listener.deferred = timeout_deferred( + listener.deferred, + timeout=(end_time - now) / 1000., + reactor=self.hs.get_reactor(), ) + try: with PreserveLoggingContext(): yield listener.deferred -- cgit 1.4.1 From 9407bcf37a99ebb72eef73e19632cdcf5964c968 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Sep 2018 11:05:21 +0100 Subject: Replace custom DeferredTimeoutError with defer.TimeoutError --- synapse/notifier.py | 5 ++--- synapse/util/async_helpers.py | 12 +++--------- 2 files changed, 5 insertions(+), 12 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index 2d683718fb..2e7525b7d2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -26,7 +26,6 @@ from synapse.handlers.presence import format_user_presence_state from synapse.metrics import LaterGauge from synapse.types import StreamToken from synapse.util.async_helpers import ( - DeferredTimeoutError, ObservableDeferred, timeout_deferred, ) @@ -354,7 +353,7 @@ class Notifier(object): # Update the prev_token to the current_token since nothing # has happened between the old prev_token and the current_token prev_token = current_token - except DeferredTimeoutError: + except defer.TimeoutError: break except defer.CancelledError: break @@ -568,7 +567,7 @@ class Notifier(object): try: with PreserveLoggingContext(): yield listener.deferred - except DeferredTimeoutError: + except defer.TimeoutError: break except defer.CancelledError: break diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 2b2e85ecb7..ec7b2c9672 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -374,16 +374,10 @@ class ReadWriteLock(object): defer.returnValue(_ctx_manager()) -class DeferredTimeoutError(Exception): - """ - This error is raised by default when a L{Deferred} times out. - """ - - def _cancelled_to_timed_out_error(value, timeout): if isinstance(value, failure.Failure): value.trap(CancelledError) - raise DeferredTimeoutError(timeout, "Deferred") + raise defer.TimeoutError(timeout, "Deferred") return value @@ -408,7 +402,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): the timeout. The default callable (if none is provided) will translate a - CancelledError Failure into a DeferredTimeoutError. + CancelledError Failure into a defer.TimeoutError. Returns: Deferred @@ -427,7 +421,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): logger.exception("Canceller failed during timeout") if not new_d.called: - new_d.errback(DeferredTimeoutError(timeout, "Deferred")) + new_d.errback(defer.TimeoutError(timeout, "Deferred")) delayed_call = reactor.callLater(timeout, time_it_out) -- cgit 1.4.1 From 80d2d50f4764d6662ceef86f834f845b539263e8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Sep 2018 11:19:47 +0100 Subject: Fixup --- changelog.d/3910 | 1 - changelog.d/3910.bugfix | 1 + synapse/notifier.py | 5 +---- 3 files changed, 2 insertions(+), 5 deletions(-) delete mode 100644 changelog.d/3910 create mode 100644 changelog.d/3910.bugfix (limited to 'synapse/notifier.py') diff --git a/changelog.d/3910 b/changelog.d/3910 deleted file mode 100644 index 22ec2adc33..0000000000 --- a/changelog.d/3910 +++ /dev/null @@ -1 +0,0 @@ -Fix bug where things occaisonally were not being timed out correctly. diff --git a/changelog.d/3910.bugfix b/changelog.d/3910.bugfix new file mode 100644 index 0000000000..22ec2adc33 --- /dev/null +++ b/changelog.d/3910.bugfix @@ -0,0 +1 @@ +Fix bug where things occaisonally were not being timed out correctly. diff --git a/synapse/notifier.py b/synapse/notifier.py index 2e7525b7d2..f1d92c1395 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -25,10 +25,7 @@ from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state from synapse.metrics import LaterGauge from synapse.types import StreamToken -from synapse.util.async_helpers import ( - ObservableDeferred, - timeout_deferred, -) +from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.logcontext import PreserveLoggingContext, run_in_background from synapse.util.logutils import log_function from synapse.util.metrics import Measure -- cgit 1.4.1 From a87d419a85e1c966da5bb4fc08f3309eb78ecb7a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 26 Sep 2018 07:48:35 +0100 Subject: Run notify_app_services as a bg process This ensures that its resource usage metrics get recorded somewhere rather than getting lost. (It also fixes an error when called from a nested logging context which completes before the bg process) --- synapse/notifier.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'synapse/notifier.py') diff --git a/synapse/notifier.py b/synapse/notifier.py index f1d92c1395..340b16ce25 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -24,9 +24,10 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state from synapse.metrics import LaterGauge +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import StreamToken from synapse.util.async_helpers import ObservableDeferred, timeout_deferred -from synapse.util.logcontext import PreserveLoggingContext, run_in_background +from synapse.util.logcontext import PreserveLoggingContext from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client @@ -248,7 +249,10 @@ class Notifier(object): def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - run_in_background(self._notify_app_services, room_stream_id) + run_as_background_process( + "notify_app_services", + self._notify_app_services, room_stream_id, + ) if self.federation_sender: self.federation_sender.notify_new_events(room_stream_id) -- cgit 1.4.1