summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/push/emailpusher.py48
-rw-r--r--synapse/push/httppusher.py64
2 files changed, 54 insertions, 58 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d746371420..0c9c0201e8 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,8 +18,7 @@ import logging
 from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.metrics.background_process_metrics import run_as_background_process
 
 logger = logging.getLogger(__name__)
 
@@ -80,7 +79,7 @@ class EmailPusher(object):
                 self.throttle_params = yield self.store.get_throttle_params_by_room(
                     self.pusher_id
                 )
-                yield self._process()
+                self._start_processing()
             except Exception:
                 logger.exception("Error starting email pusher")
 
@@ -92,10 +91,10 @@ class EmailPusher(object):
                 pass
             self.timed_call = None
 
-    @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
-        yield self._process()
+        self._start_processing()
+        return defer.succeed(None)
 
     def on_new_receipts(self, min_stream_id, max_stream_id):
         # We could wake up and cancel the timer but there tend to be quite a
@@ -103,32 +102,33 @@ class EmailPusher(object):
         # timer fire
         return defer.succeed(None)
 
-    @defer.inlineCallbacks
     def on_timer(self):
         self.timed_call = None
-        yield self._process()
+        self._start_processing()
 
-    @defer.inlineCallbacks
-    def _process(self):
+    def _start_processing(self):
         if self.processing:
             return
 
-        with LoggingContext("emailpush._process"):
-            with Measure(self.clock, "emailpush._process"):
+        run_as_background_process("emailpush.process", self._process)
+
+    @defer.inlineCallbacks
+    def _process(self):
+        try:
+            self.processing = True
+
+            # if the max ordering changes while we're running _unsafe_process,
+            # call it again, and so on until we've caught up.
+            while True:
+                starting_max_ordering = self.max_stream_ordering
                 try:
-                    self.processing = True
-                    # if the max ordering changes while we're running _unsafe_process,
-                    # call it again, and so on until we've caught up.
-                    while True:
-                        starting_max_ordering = self.max_stream_ordering
-                        try:
-                            yield self._unsafe_process()
-                        except Exception:
-                            logger.exception("Exception processing notifs")
-                        if self.max_stream_ordering == starting_max_ordering:
-                            break
-                finally:
-                    self.processing = False
+                    yield self._unsafe_process()
+                except Exception:
+                    logger.exception("Exception processing notifs")
+                if self.max_stream_ordering == starting_max_ordering:
+                    break
+        finally:
+            self.processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 48abd5e4d6..5f6b21bc67 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,9 +22,8 @@ from prometheus_client import Counter
 from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
 
 from . import push_rule_evaluator, push_tools
 
@@ -92,34 +91,30 @@ class HttpPusher(object):
         self.data_minus_url.update(self.data)
         del self.data_minus_url['url']
 
-    @defer.inlineCallbacks
     def on_started(self):
-        try:
-            yield self._process()
-        except Exception:
-            logger.exception("Error starting http pusher")
+        self._start_processing()
+        return defer.succeed(None)
 
-    @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
-        yield self._process()
+        self._start_processing()
+        return defer.suceed(None)
 
-    @defer.inlineCallbacks
     def on_new_receipts(self, min_stream_id, max_stream_id):
         # Note that the min here shouldn't be relied upon to be accurate.
 
         # We could check the receipts are actually m.read receipts here,
         # but currently that's the only type of receipt anyway...
-        with LoggingContext("push.on_new_receipts"):
-            with Measure(self.clock, "push.on_new_receipts"):
-                badge = yield push_tools.get_badge_count(
-                    self.hs.get_datastore(), self.user_id
-                )
-            yield self._send_badge(badge)
+        run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
+        return defer.succeed(None)
 
     @defer.inlineCallbacks
+    def _update_badge(self):
+        badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+        yield self._send_badge(badge)
+
     def on_timer(self):
-        yield self._process()
+        self._start_processing()
 
     def on_stop(self):
         if self.timed_call:
@@ -129,27 +124,28 @@ class HttpPusher(object):
                 pass
             self.timed_call = None
 
-    @defer.inlineCallbacks
-    def _process(self):
+    def _start_processing(self):
         if self.processing:
             return
 
-        with LoggingContext("push._process"):
-            with Measure(self.clock, "push._process"):
+        run_as_background_process("httppush.process", self._process)
+
+    @defer.inlineCallbacks
+    def _process(self):
+        try:
+            self.processing = True
+            # if the max ordering changes while we're running _unsafe_process,
+            # call it again, and so on until we've caught up.
+            while True:
+                starting_max_ordering = self.max_stream_ordering
                 try:
-                    self.processing = True
-                    # if the max ordering changes while we're running _unsafe_process,
-                    # call it again, and so on until we've caught up.
-                    while True:
-                        starting_max_ordering = self.max_stream_ordering
-                        try:
-                            yield self._unsafe_process()
-                        except Exception:
-                            logger.exception("Exception processing notifs")
-                        if self.max_stream_ordering == starting_max_ordering:
-                            break
-                finally:
-                    self.processing = False
+                    yield self._unsafe_process()
+                except Exception:
+                    logger.exception("Exception processing notifs")
+                if self.max_stream_ordering == starting_max_ordering:
+                    break
+        finally:
+            self.processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):