summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py29
1 files changed, 15 insertions, 14 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 82f391481c..340b16ce25 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -24,13 +24,10 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError
 from synapse.handlers.presence import format_user_presence_state
 from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import StreamToken
-from synapse.util.async_helpers import (
-    DeferredTimeoutError,
-    ObservableDeferred,
-    add_timeout_to_deferred,
-)
-from synapse.util.logcontext import PreserveLoggingContext, run_in_background
+from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.logutils import log_function
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
@@ -252,7 +249,10 @@ class Notifier(object):
     def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
-        run_in_background(self._notify_app_services, room_stream_id)
+        run_as_background_process(
+            "notify_app_services",
+            self._notify_app_services, room_stream_id,
+        )
 
         if self.federation_sender:
             self.federation_sender.notify_new_events(room_stream_id)
@@ -337,7 +337,7 @@ class Notifier(object):
                     # Now we wait for the _NotifierUserStream to be told there
                     # is a new token.
                     listener = user_stream.new_listener(prev_token)
-                    add_timeout_to_deferred(
+                    listener.deferred = timeout_deferred(
                         listener.deferred,
                         (end_time - now) / 1000.,
                         self.hs.get_reactor(),
@@ -354,7 +354,7 @@ class Notifier(object):
                     # Update the prev_token to the current_token since nothing
                     # has happened between the old prev_token and the current_token
                     prev_token = current_token
-                except DeferredTimeoutError:
+                except defer.TimeoutError:
                     break
                 except defer.CancelledError:
                     break
@@ -559,15 +559,16 @@ class Notifier(object):
             if end_time <= now:
                 break
 
-            add_timeout_to_deferred(
-                listener.deferred.addTimeout,
-                (end_time - now) / 1000.,
-                self.hs.get_reactor(),
+            listener.deferred = timeout_deferred(
+                listener.deferred,
+                timeout=(end_time - now) / 1000.,
+                reactor=self.hs.get_reactor(),
             )
+
             try:
                 with PreserveLoggingContext():
                     yield listener.deferred
-            except DeferredTimeoutError:
+            except defer.TimeoutError:
                 break
             except defer.CancelledError:
                 break