summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-10-22 16:12:11 +0100
committerRichard van der Hoff <richard@matrix.org>2018-10-22 16:12:11 +0100
commitc7273c11bc73630e3f2ab1a82d20a40d8b17f9a3 (patch)
tree659a03763a1c6492399f2f34d625d482961c0126 /synapse/push
parentmove get_all_pushers call down (diff)
downloadsynapse-c7273c11bc73630e3f2ab1a82d20a40d8b17f9a3.tar.xz
Give pushers their own background logcontext
Each pusher has its own loop which runs for as long as it has work to do. This
should run in its own background thread with its own logcontext, as other
similar loops elsewhere in the system do - which means that CPU usage is
consistently attributed to that loop, rather than to whatever request happened
to start the loop.
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/emailpusher.py48
-rw-r--r--synapse/push/httppusher.py64
2 files changed, 54 insertions, 58 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d746371420..0c9c0201e8 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,8 +18,7 @@ import logging
 from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.metrics.background_process_metrics import run_as_background_process
 
 logger = logging.getLogger(__name__)
 
@@ -80,7 +79,7 @@ class EmailPusher(object):
                 self.throttle_params = yield self.store.get_throttle_params_by_room(
                     self.pusher_id
                 )
-                yield self._process()
+                self._start_processing()
             except Exception:
                 logger.exception("Error starting email pusher")
 
@@ -92,10 +91,10 @@ class EmailPusher(object):
                 pass
             self.timed_call = None
 
-    @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
-        yield self._process()
+        self._start_processing()
+        return defer.succeed(None)
 
     def on_new_receipts(self, min_stream_id, max_stream_id):
         # We could wake up and cancel the timer but there tend to be quite a
@@ -103,32 +102,33 @@ class EmailPusher(object):
         # timer fire
         return defer.succeed(None)
 
-    @defer.inlineCallbacks
     def on_timer(self):
         self.timed_call = None
-        yield self._process()
+        self._start_processing()
 
-    @defer.inlineCallbacks
-    def _process(self):
+    def _start_processing(self):
         if self.processing:
             return
 
-        with LoggingContext("emailpush._process"):
-            with Measure(self.clock, "emailpush._process"):
+        run_as_background_process("emailpush.process", self._process)
+
+    @defer.inlineCallbacks
+    def _process(self):
+        try:
+            self.processing = True
+
+            # if the max ordering changes while we're running _unsafe_process,
+            # call it again, and so on until we've caught up.
+            while True:
+                starting_max_ordering = self.max_stream_ordering
                 try:
-                    self.processing = True
-                    # if the max ordering changes while we're running _unsafe_process,
-                    # call it again, and so on until we've caught up.
-                    while True:
-                        starting_max_ordering = self.max_stream_ordering
-                        try:
-                            yield self._unsafe_process()
-                        except Exception:
-                            logger.exception("Exception processing notifs")
-                        if self.max_stream_ordering == starting_max_ordering:
-                            break
-                finally:
-                    self.processing = False
+                    yield self._unsafe_process()
+                except Exception:
+                    logger.exception("Exception processing notifs")
+                if self.max_stream_ordering == starting_max_ordering:
+                    break
+        finally:
+            self.processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 48abd5e4d6..5f6b21bc67 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,9 +22,8 @@ from prometheus_client import Counter
 from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
+from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
 
 from . import push_rule_evaluator, push_tools
 
@@ -92,34 +91,30 @@ class HttpPusher(object):
         self.data_minus_url.update(self.data)
         del self.data_minus_url['url']
 
-    @defer.inlineCallbacks
     def on_started(self):
-        try:
-            yield self._process()
-        except Exception:
-            logger.exception("Error starting http pusher")
+        self._start_processing()
+        return defer.succeed(None)
 
-    @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
-        yield self._process()
+        self._start_processing()
+        return defer.suceed(None)
 
-    @defer.inlineCallbacks
     def on_new_receipts(self, min_stream_id, max_stream_id):
         # Note that the min here shouldn't be relied upon to be accurate.
 
         # We could check the receipts are actually m.read receipts here,
         # but currently that's the only type of receipt anyway...
-        with LoggingContext("push.on_new_receipts"):
-            with Measure(self.clock, "push.on_new_receipts"):
-                badge = yield push_tools.get_badge_count(
-                    self.hs.get_datastore(), self.user_id
-                )
-            yield self._send_badge(badge)
+        run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
+        return defer.succeed(None)
 
     @defer.inlineCallbacks
+    def _update_badge(self):
+        badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+        yield self._send_badge(badge)
+
     def on_timer(self):
-        yield self._process()
+        self._start_processing()
 
     def on_stop(self):
         if self.timed_call:
@@ -129,27 +124,28 @@ class HttpPusher(object):
                 pass
             self.timed_call = None
 
-    @defer.inlineCallbacks
-    def _process(self):
+    def _start_processing(self):
         if self.processing:
             return
 
-        with LoggingContext("push._process"):
-            with Measure(self.clock, "push._process"):
+        run_as_background_process("httppush.process", self._process)
+
+    @defer.inlineCallbacks
+    def _process(self):
+        try:
+            self.processing = True
+            # if the max ordering changes while we're running _unsafe_process,
+            # call it again, and so on until we've caught up.
+            while True:
+                starting_max_ordering = self.max_stream_ordering
                 try:
-                    self.processing = True
-                    # if the max ordering changes while we're running _unsafe_process,
-                    # call it again, and so on until we've caught up.
-                    while True:
-                        starting_max_ordering = self.max_stream_ordering
-                        try:
-                            yield self._unsafe_process()
-                        except Exception:
-                            logger.exception("Exception processing notifs")
-                        if self.max_stream_ordering == starting_max_ordering:
-                            break
-                finally:
-                    self.processing = False
+                    yield self._unsafe_process()
+                except Exception:
+                    logger.exception("Exception processing notifs")
+                if self.max_stream_ordering == starting_max_ordering:
+                    break
+        finally:
+            self.processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):