diff options
Diffstat (limited to 'synapse/push/emailpusher.py')
-rw-r--r-- | synapse/push/emailpusher.py | 72 |
1 files changed, 38 insertions, 34 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index d746371420..50e1007d84 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -18,8 +18,7 @@ import logging from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled -from synapse.util.logcontext import LoggingContext -from synapse.util.metrics import Measure +from synapse.metrics.background_process_metrics import run_as_background_process logger = logging.getLogger(__name__) @@ -71,18 +70,11 @@ class EmailPusher(object): # See httppusher self.max_stream_ordering = None - self.processing = False + self._is_processing = False - @defer.inlineCallbacks def on_started(self): if self.mailer is not None: - try: - self.throttle_params = yield self.store.get_throttle_params_by_room( - self.pusher_id - ) - yield self._process() - except Exception: - logger.exception("Error starting email pusher") + self._start_processing() def on_stop(self): if self.timed_call: @@ -92,43 +84,55 @@ class EmailPusher(object): pass self.timed_call = None - @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): - self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) - yield self._process() + if self.max_stream_ordering: + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) + else: + self.max_stream_ordering = max_stream_ordering + self._start_processing() def on_new_receipts(self, min_stream_id, max_stream_id): # We could wake up and cancel the timer but there tend to be quite a # lot of read receipts so it's probably less work to just let the # timer fire - return defer.succeed(None) + pass - @defer.inlineCallbacks def on_timer(self): self.timed_call = None - yield self._process() + self._start_processing() + + def _start_processing(self): + if self._is_processing: + return + + run_as_background_process("emailpush.process", self._process) @defer.inlineCallbacks def _process(self): - if self.processing: - return + # we should never get here if we are already processing + assert not self._is_processing + + try: + self._is_processing = True + + if self.throttle_params is None: + # this is our first loop: load up the throttle params + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) - with LoggingContext("emailpush._process"): - with Measure(self.clock, "emailpush._process"): + # if the max ordering changes while we're running _unsafe_process, + # call it again, and so on until we've caught up. + while True: + starting_max_ordering = self.max_stream_ordering try: - self.processing = True - # if the max ordering changes while we're running _unsafe_process, - # call it again, and so on until we've caught up. - while True: - starting_max_ordering = self.max_stream_ordering - try: - yield self._unsafe_process() - except Exception: - logger.exception("Exception processing notifs") - if self.max_stream_ordering == starting_max_ordering: - break - finally: - self.processing = False + yield self._unsafe_process() + except Exception: + logger.exception("Exception processing notifs") + if self.max_stream_ordering == starting_max_ordering: + break + finally: + self._is_processing = False @defer.inlineCallbacks def _unsafe_process(self): |