summary refs log tree commit diff
path: root/synapse/push/emailpusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push/emailpusher.py')
-rw-r--r--synapse/push/emailpusher.py47
1 files changed, 39 insertions, 8 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index f9954df392..74e3a70562 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -19,6 +19,7 @@ import logging
 
 from synapse.util.metrics import Measure
 from synapse.util.async import run_on_reactor
+from synapse.util.logcontext import LoggingContext
 
 logger = logging.getLogger(__name__)
 
@@ -56,6 +57,8 @@ class EmailPusher(object):
         # See httppusher
         self.max_stream_ordering = None
 
+        self.processing = False
+
     @defer.inlineCallbacks
     def on_started(self):
         self.throttle_params = yield self.store.get_throttle_params_by_room(
@@ -63,20 +66,48 @@ class EmailPusher(object):
         )
         yield self._process()
 
+    def on_stop(self):
+        if self.timed_call:
+            self.timed_call.cancel()
+
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
-        with Measure(self.clock, "push.on_new_notifications"):
-            self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
-            yield self._process()
+        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+        yield self._process()
 
     @defer.inlineCallbacks
     def on_timer(self):
         self.timed_call = None
-        with Measure(self.clock, "push.on_timer"):
-            yield self._process()
+        yield self._process()
 
     @defer.inlineCallbacks
     def _process(self):
+        if self.processing:
+            return
+
+        with LoggingContext("emailpush._process"):
+            with Measure(self.clock, "emailpush._process"):
+                try:
+                    self.processing = True
+                    # if the max ordering changes while we're running _unsafe_process,
+                    # call it again, and so on until we've caught up.
+                    while True:
+                        starting_max_ordering = self.max_stream_ordering
+                        try:
+                            yield self._unsafe_process()
+                        except:
+                            logger.exception("Exception processing notifs")
+                        if self.max_stream_ordering == starting_max_ordering:
+                            break
+                finally:
+                    self.processing = False
+
+    def _unsafe_process(self):
+        """
+        Main logic of the push loop without the wrapper function that sets
+        up logging, measures and guards against multiple instances of it
+        being run.
+        """
         last_notifs = yield self.store.get_time_of_latest_push_action_by_room_for_user(
             self.user_id
         )
@@ -118,9 +149,9 @@ class EmailPusher(object):
                 if soonest_due_at is None or should_notify_at < soonest_due_at:
                     soonest_due_at = should_notify_at
 
-        if self.timed_call is not None:
-            self.timed_call.cancel()
-            self.timed_call = None
+                if self.timed_call is not None:
+                    self.timed_call.cancel()
+                    self.timed_call = None
 
         if soonest_due_at is not None:
             self.timed_call = reactor.callLater(