Add single instance & logging stuff
Copy the stuff over from http pusher that prevents multiple instances of process running at once and sets up logging and measure blocks.
1 files changed, 39 insertions, 8 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index f9954df392..74e3a70562 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -19,6 +19,7 @@ import logging
from synapse.util.metrics import Measure
from synapse.util.async import run_on_reactor
+from synapse.util.logcontext import LoggingContext
logger = logging.getLogger(__name__)
@@ -56,6 +57,8 @@ class EmailPusher(object):
# See httppusher
self.max_stream_ordering = None
+ self.processing = False
+
@defer.inlineCallbacks
def on_started(self):
self.throttle_params = yield self.store.get_throttle_params_by_room(
@@ -63,20 +66,48 @@ class EmailPusher(object):
)
yield self._process()
+ def on_stop(self):
+ if self.timed_call:
+ self.timed_call.cancel()
+
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
- with Measure(self.clock, "push.on_new_notifications"):
- self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
- yield self._process()
+ self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+ yield self._process()
@defer.inlineCallbacks
def on_timer(self):
self.timed_call = None
- with Measure(self.clock, "push.on_timer"):
- yield self._process()
+ yield self._process()
@defer.inlineCallbacks
def _process(self):
+ if self.processing:
+ return
+
+ with LoggingContext("emailpush._process"):
+ with Measure(self.clock, "emailpush._process"):
+ try:
+ self.processing = True
+ # if the max ordering changes while we're running _unsafe_process,
+ # call it again, and so on until we've caught up.
+ while True:
+ starting_max_ordering = self.max_stream_ordering
+ try:
+ yield self._unsafe_process()
+ except:
+ logger.exception("Exception processing notifs")
+ if self.max_stream_ordering == starting_max_ordering:
+ break
+ finally:
+ self.processing = False
+
+ def _unsafe_process(self):
+ """
+ Main logic of the push loop without the wrapper function that sets
+ up logging, measures and guards against multiple instances of it
+ being run.
+ """
last_notifs = yield self.store.get_time_of_latest_push_action_by_room_for_user(
self.user_id
)
@@ -118,9 +149,9 @@ class EmailPusher(object):
if soonest_due_at is None or should_notify_at < soonest_due_at:
soonest_due_at = should_notify_at
- if self.timed_call is not None:
- self.timed_call.cancel()
- self.timed_call = None
+ if self.timed_call is not None:
+ self.timed_call.cancel()
+ self.timed_call = None
if soonest_due_at is not None:
self.timed_call = reactor.callLater(
|