diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d746371420..0c9c0201e8 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,8 +18,7 @@ import logging
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
+from synapse.metrics.background_process_metrics import run_as_background_process
logger = logging.getLogger(__name__)
@@ -80,7 +79,7 @@ class EmailPusher(object):
self.throttle_params = yield self.store.get_throttle_params_by_room(
self.pusher_id
)
- yield self._process()
+ self._start_processing()
except Exception:
logger.exception("Error starting email pusher")
@@ -92,10 +91,10 @@ class EmailPusher(object):
pass
self.timed_call = None
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
- yield self._process()
+ self._start_processing()
+ return defer.succeed(None)
def on_new_receipts(self, min_stream_id, max_stream_id):
# We could wake up and cancel the timer but there tend to be quite a
@@ -103,32 +102,33 @@ class EmailPusher(object):
# timer fire
return defer.succeed(None)
- @defer.inlineCallbacks
def on_timer(self):
self.timed_call = None
- yield self._process()
+ self._start_processing()
- @defer.inlineCallbacks
- def _process(self):
+ def _start_processing(self):
if self.processing:
return
- with LoggingContext("emailpush._process"):
- with Measure(self.clock, "emailpush._process"):
+ run_as_background_process("emailpush.process", self._process)
+
+ @defer.inlineCallbacks
+ def _process(self):
+ try:
+ self.processing = True
+
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
try:
- self.processing = True
- # if the max ordering changes while we're running _unsafe_process,
- # call it again, and so on until we've caught up.
- while True:
- starting_max_ordering = self.max_stream_ordering
- try:
- yield self._unsafe_process()
- except Exception:
- logger.exception("Exception processing notifs")
- if self.max_stream_ordering == starting_max_ordering:
- break
- finally:
- self.processing = False
+ yield self._unsafe_process()
+ except Exception:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self.processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 48abd5e4d6..5f6b21bc67 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -22,9 +22,8 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
-from synapse.util.logcontext import LoggingContext
-from synapse.util.metrics import Measure
from . import push_rule_evaluator, push_tools
@@ -92,34 +91,30 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
- @defer.inlineCallbacks
def on_started(self):
- try:
- yield self._process()
- except Exception:
- logger.exception("Error starting http pusher")
+ self._start_processing()
+ return defer.succeed(None)
- @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
- yield self._process()
+ self._start_processing()
+ return defer.suceed(None)
- @defer.inlineCallbacks
def on_new_receipts(self, min_stream_id, max_stream_id):
# Note that the min here shouldn't be relied upon to be accurate.
# We could check the receipts are actually m.read receipts here,
# but currently that's the only type of receipt anyway...
- with LoggingContext("push.on_new_receipts"):
- with Measure(self.clock, "push.on_new_receipts"):
- badge = yield push_tools.get_badge_count(
- self.hs.get_datastore(), self.user_id
- )
- yield self._send_badge(badge)
+ run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
+ return defer.succeed(None)
@defer.inlineCallbacks
+ def _update_badge(self):
+ badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+ yield self._send_badge(badge)
+
def on_timer(self):
- yield self._process()
+ self._start_processing()
def on_stop(self):
if self.timed_call:
@@ -129,27 +124,28 @@ class HttpPusher(object):
pass
self.timed_call = None
- @defer.inlineCallbacks
- def _process(self):
+ def _start_processing(self):
if self.processing:
return
- with LoggingContext("push._process"):
- with Measure(self.clock, "push._process"):
+ run_as_background_process("httppush.process", self._process)
+
+ @defer.inlineCallbacks
+ def _process(self):
+ try:
+ self.processing = True
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
try:
- self.processing = True
- # if the max ordering changes while we're running _unsafe_process,
- # call it again, and so on until we've caught up.
- while True:
- starting_max_ordering = self.max_stream_ordering
- try:
- yield self._unsafe_process()
- except Exception:
- logger.exception("Exception processing notifs")
- if self.max_stream_ordering == starting_max_ordering:
- break
- finally:
- self.processing = False
+ yield self._unsafe_process()
+ except Exception:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self.processing = False
@defer.inlineCallbacks
def _unsafe_process(self):
|