Give pushers their own background logcontext
Each pusher has its own loop, which runs for as long as it has work to do. This
should run as a background process with its own logcontext, as other similar
loops elsewhere in the system do, so that CPU usage is consistently attributed
to that loop rather than to whatever request happened to start it.
1 file changed, 24 insertions(+), 24 deletions(-)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d746371420..0c9c0201e8 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,8 +18,7 @@ import logging
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.metrics.background_process_metrics import run_as_background_process
logger = logging.getLogger(__name__)
@@ -80,7 +79,7 @@ class EmailPusher(object):
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
- yield self._process()
+ self._start_processing()
except Exception:
logger.exception("Error starting email pusher")
@@ -92,10 +91,10 @@ class EmailPusher(object):
pass
self.timed_call = None
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
- yield self._process()
+ self._start_processing()
+ return defer.succeed(None)
def on_new_receipts(self, min_stream_id, max_stream_id):
# We could wake up and cancel the timer but there tend to be quite a
@@ -103,32 +102,33 @@ class EmailPusher(object):
# timer fire
return defer.succeed(None)
- @defer.inlineCallbacks
def on_timer(self):
self.timed_call = None
- yield self._process()
+ self._start_processing()
- @defer.inlineCallbacks
- def _process(self):
+ def _start_processing(self):
if self.processing:
return
- with LoggingContext("emailpush._process"):
- with Measure(self.clock, "emailpush._process"):
+ run_as_background_process("emailpush.process", self._process)
+
+ @defer.inlineCallbacks
+ def _process(self):
+ try:
+ self.processing = True
+
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
try:
- self.processing = True
- # if the max ordering changes while we're running _unsafe_process,
- # call it again, and so on until we've caught up.
- while True:
- starting_max_ordering = self.max_stream_ordering
- try:
- yield self._unsafe_process()
- except Exception:
- logger.exception("Exception processing notifs")
- if self.max_stream_ordering == starting_max_ordering:
- break
- finally:
- self.processing = False
+ yield self._unsafe_process()
+ except Exception:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self.processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
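For reference, a self-contained sketch (again plain asyncio with illustrative
names, not the actual Synapse code) of the _process structure the diff introduces:
a processing flag ensures only one loop runs per pusher, and the loop re-runs
_unsafe_process until max_stream_ordering stops moving, so notifications that
arrive mid-run are still picked up. Note one assumption: in the Twisted version
the flag is set inside _process, which starts executing synchronously, whereas an
asyncio task does not run until the event loop next gets control, so here the flag
is set before scheduling.

```python
import asyncio


class Pusher:
    """Illustrative sketch of the guard + catch-up loop; not Synapse code."""

    def __init__(self):
        self.processing = False
        self.max_stream_ordering = 0
        self.processed_up_to = 0
        self.passes = 0

    def on_new_notifications(self, max_stream_ordering: int) -> None:
        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
        if self.processing:
            return
        # Set the flag here because an asyncio task does not start running
        # synchronously (unlike a Twisted inlineCallbacks function).
        self.processing = True
        asyncio.create_task(self._process(), name="emailpush.process")

    async def _process(self):
        try:
            # If the max ordering moves while _unsafe_process is running,
            # go round again until we have caught up.
            while True:
                self.passes += 1
                starting_max_ordering = self.max_stream_ordering
                try:
                    await self._unsafe_process()
                except Exception:
                    pass  # the real code logs the exception and carries on
                if self.max_stream_ordering == starting_max_ordering:
                    break
        finally:
            self.processing = False

    async def _unsafe_process(self):
        await asyncio.sleep(0)  # stand-in for actually sending the emails
        self.processed_up_to = self.max_stream_ordering


async def main():
    pusher = Pusher()
    pusher.on_new_notifications(5)
    await asyncio.sleep(0)           # let the background loop start its first pass
    pusher.on_new_notifications(9)   # arrives mid-run: no second task is started
    await asyncio.sleep(0.05)
    print(pusher.processed_up_to, pusher.passes)  # 9 2: the loop went round again


asyncio.run(main())
```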