summary refs log tree commit diff
path: root/synapse/push/emailpusher.py
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-10-22 16:12:11 +0100
committerRichard van der Hoff <richard@matrix.org>2018-10-22 16:12:11 +0100
commitc7273c11bc73630e3f2ab1a82d20a40d8b17f9a3 (patch)
tree659a03763a1c6492399f2f34d625d482961c0126 /synapse/push/emailpusher.py
parentmove get_all_pushers call down (diff)
downloadsynapse-c7273c11bc73630e3f2ab1a82d20a40d8b17f9a3.tar.xz
Give pushers their own background logcontext
Each pusher has its own loop which runs for as long as it has work to do. This
should run in its own background thread with its own logcontext, as other
similar loops elsewhere in the system do - which means that CPU usage is
consistently attributed to that loop, rather than to whatever request happened
to start the loop.
Diffstat (limited to 'synapse/push/emailpusher.py')
-rw-r--r--synapse/push/emailpusher.py48
1 files changed, 24 insertions, 24 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d746371420..0c9c0201e8 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,8 +18,7 @@ import logging
 from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.metrics.background_process_metrics import run_as_background_process
 
 logger = logging.getLogger(__name__)
 
@@ -80,7 +79,7 @@ class EmailPusher(object):
                 self.throttle_params = yield self.store.get_throttle_params_by_room(
                     self.pusher_id
                 )
-                yield self._process()
+                self._start_processing()
             except Exception:
                 logger.exception("Error starting email pusher")
 
@@ -92,10 +91,10 @@ class EmailPusher(object):
                 pass
             self.timed_call = None
 
-    @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
-        yield self._process()
+        self._start_processing()
+        return defer.succeed(None)
 
     def on_new_receipts(self, min_stream_id, max_stream_id):
         # We could wake up and cancel the timer but there tend to be quite a
@@ -103,32 +102,33 @@ class EmailPusher(object):
         # timer fire
         return defer.succeed(None)
 
-    @defer.inlineCallbacks
     def on_timer(self):
         self.timed_call = None
-        yield self._process()
+        self._start_processing()
 
-    @defer.inlineCallbacks
-    def _process(self):
+    def _start_processing(self):
         if self.processing:
             return
 
-        with LoggingContext("emailpush._process"):
-            with Measure(self.clock, "emailpush._process"):
+        run_as_background_process("emailpush.process", self._process)
+
+    @defer.inlineCallbacks
+    def _process(self):
+        try:
+            self.processing = True
+
+            # if the max ordering changes while we're running _unsafe_process,
+            # call it again, and so on until we've caught up.
+            while True:
+                starting_max_ordering = self.max_stream_ordering
                 try:
-                    self.processing = True
-                    # if the max ordering changes while we're running _unsafe_process,
-                    # call it again, and so on until we've caught up.
-                    while True:
-                        starting_max_ordering = self.max_stream_ordering
-                        try:
-                            yield self._unsafe_process()
-                        except Exception:
-                            logger.exception("Exception processing notifs")
-                        if self.max_stream_ordering == starting_max_ordering:
-                            break
-                finally:
-                    self.processing = False
+                    yield self._unsafe_process()
+                except Exception:
+                    logger.exception("Exception processing notifs")
+                if self.max_stream_ordering == starting_max_ordering:
+                    break
+        finally:
+            self.processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):