summary refs log tree commit diff
path: root/synapse/push/emailpusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push/emailpusher.py')
-rw-r--r--synapse/push/emailpusher.py82
1 files changed, 51 insertions, 31 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index e8ee67401f..424ffa8b68 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -32,13 +32,13 @@ DELAY_BEFORE_MAIL_MS = 10 * 60 * 1000
 THROTTLE_START_MS = 10 * 60 * 1000
 THROTTLE_MAX_MS = 24 * 60 * 60 * 1000  # 24h
 # THROTTLE_MULTIPLIER = 6              # 10 mins, 1 hour, 6 hours, 24 hours
-THROTTLE_MULTIPLIER = 144              # 10 mins, 24 hours - i.e. jump straight to 1 day
+THROTTLE_MULTIPLIER = 144  # 10 mins, 24 hours - i.e. jump straight to 1 day
 
 # If no event triggers a notification for this long after the previous,
 # the throttle is released.
 # 12 hours - a gap of 12 hours in conversation is surely enough to merit a new
 # notification when things get going again...
-THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000)
+THROTTLE_RESET_AFTER_MS = 12 * 60 * 60 * 1000
 
 # does each email include all unread notifs, or just the ones which have happened
 # since the last mail?
@@ -53,17 +53,18 @@ class EmailPusher(object):
     This shares quite a bit of code with httpusher: it would be good to
     factor out the common parts
     """
+
     def __init__(self, hs, pusherdict, mailer):
         self.hs = hs
         self.mailer = mailer
 
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
-        self.pusher_id = pusherdict['id']
-        self.user_id = pusherdict['user_name']
-        self.app_id = pusherdict['app_id']
-        self.email = pusherdict['pushkey']
-        self.last_stream_ordering = pusherdict['last_stream_ordering']
+        self.pusher_id = pusherdict["id"]
+        self.user_id = pusherdict["user_name"]
+        self.app_id = pusherdict["app_id"]
+        self.email = pusherdict["pushkey"]
+        self.last_stream_ordering = pusherdict["last_stream_ordering"]
         self.timed_call = None
         self.throttle_params = None
 
@@ -93,7 +94,9 @@ class EmailPusher(object):
 
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         if self.max_stream_ordering:
-            self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+            self.max_stream_ordering = max(
+                max_stream_ordering, self.max_stream_ordering
+            )
         else:
             self.max_stream_ordering = max_stream_ordering
         self._start_processing()
@@ -114,6 +117,21 @@ class EmailPusher(object):
 
         run_as_background_process("emailpush.process", self._process)
 
+    def _pause_processing(self):
+        """Used by tests to temporarily pause processing of events.
+
+        Asserts that its not currently processing.
+        """
+        assert not self._is_processing
+        self._is_processing = True
+
+    def _resume_processing(self):
+        """Used by tests to resume processing of events after pausing.
+        """
+        assert self._is_processing
+        self._is_processing = False
+        self._start_processing()
+
     @defer.inlineCallbacks
     def _process(self):
         # we should never get here if we are already processing
@@ -159,14 +177,12 @@ class EmailPusher(object):
             return
 
         for push_action in unprocessed:
-            received_at = push_action['received_ts']
+            received_at = push_action["received_ts"]
             if received_at is None:
                 received_at = 0
             notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS
 
-            room_ready_at = self.room_ready_to_notify_at(
-                push_action['room_id']
-            )
+            room_ready_at = self.room_ready_to_notify_at(push_action["room_id"])
 
             should_notify_at = max(notif_ready_at, room_ready_at)
 
@@ -177,25 +193,23 @@ class EmailPusher(object):
                 # to be delivered.
 
                 reason = {
-                    'room_id': push_action['room_id'],
-                    'now': self.clock.time_msec(),
-                    'received_at': received_at,
-                    'delay_before_mail_ms': DELAY_BEFORE_MAIL_MS,
-                    'last_sent_ts': self.get_room_last_sent_ts(push_action['room_id']),
-                    'throttle_ms': self.get_room_throttle_ms(push_action['room_id']),
+                    "room_id": push_action["room_id"],
+                    "now": self.clock.time_msec(),
+                    "received_at": received_at,
+                    "delay_before_mail_ms": DELAY_BEFORE_MAIL_MS,
+                    "last_sent_ts": self.get_room_last_sent_ts(push_action["room_id"]),
+                    "throttle_ms": self.get_room_throttle_ms(push_action["room_id"]),
                 }
 
                 yield self.send_notification(unprocessed, reason)
 
-                yield self.save_last_stream_ordering_and_success(max([
-                    ea['stream_ordering'] for ea in unprocessed
-                ]))
+                yield self.save_last_stream_ordering_and_success(
+                    max([ea["stream_ordering"] for ea in unprocessed])
+                )
 
                 # we update the throttle on all the possible unprocessed push actions
                 for ea in unprocessed:
-                    yield self.sent_notif_update_throttle(
-                        ea['room_id'], ea
-                    )
+                    yield self.sent_notif_update_throttle(ea["room_id"], ea)
                 break
             else:
                 if soonest_due_at is None or should_notify_at < soonest_due_at:
@@ -215,10 +229,17 @@ class EmailPusher(object):
 
     @defer.inlineCallbacks
     def save_last_stream_ordering_and_success(self, last_stream_ordering):
+        if last_stream_ordering is None:
+            # This happens if we haven't yet processed anything
+            return
+
         self.last_stream_ordering = last_stream_ordering
         yield self.store.update_pusher_last_stream_ordering_and_success(
-            self.app_id, self.email, self.user_id,
-            last_stream_ordering, self.clock.time_msec()
+            self.app_id,
+            self.email,
+            self.user_id,
+            last_stream_ordering,
+            self.clock.time_msec(),
         )
 
     def seconds_until(self, ts_msec):
@@ -257,10 +278,10 @@ class EmailPusher(object):
         # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
         # notif, we release the throttle. Otherwise, the throttle is increased.
         time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
-            notified_push_action['stream_ordering']
+            notified_push_action["stream_ordering"]
         )
 
-        time_of_this_notifs = notified_push_action['received_ts']
+        time_of_this_notifs = notified_push_action["received_ts"]
 
         if time_of_previous_notifs is not None and time_of_this_notifs is not None:
             gap = time_of_this_notifs - time_of_previous_notifs
@@ -279,12 +300,11 @@ class EmailPusher(object):
                 new_throttle_ms = THROTTLE_START_MS
             else:
                 new_throttle_ms = min(
-                    current_throttle_ms * THROTTLE_MULTIPLIER,
-                    THROTTLE_MAX_MS
+                    current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS
                 )
         self.throttle_params[room_id] = {
             "last_sent_ts": self.clock.time_msec(),
-            "throttle_ms": new_throttle_ms
+            "throttle_ms": new_throttle_ms,
         }
         yield self.store.set_throttle_params(
             self.pusher_id, room_id, self.throttle_params[room_id]