summary refs log tree commit diff
path: root/synapse/push/emailpusher.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/push/emailpusher.py82
1 files changed, 46 insertions, 36 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index c6763971ee..64a35c1994 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -14,12 +14,19 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
 
+from twisted.internet.base import DelayedCall
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.push import Pusher
+from synapse.push.mailer import Mailer
 from synapse.types import RoomStreamToken
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 # The amount of time we always wait before ever emailing about a notification
@@ -46,7 +53,7 @@ THROTTLE_RESET_AFTER_MS = 12 * 60 * 60 * 1000
 INCLUDE_ALL_UNREAD_NOTIFS = False
 
 
-class EmailPusher:
+class EmailPusher(Pusher):
     """
     A pusher that sends email notifications about events (approximately)
     when they happen.
@@ -54,37 +61,31 @@ class EmailPusher:
     factor out the common parts
     """
 
-    def __init__(self, hs, pusherdict, mailer):
-        self.hs = hs
+    def __init__(self, hs: "HomeServer", pusherdict: Dict[str, Any], mailer: Mailer):
+        super().__init__(hs, pusherdict)
         self.mailer = mailer
 
         self.store = self.hs.get_datastore()
-        self.clock = self.hs.get_clock()
-        self.pusher_id = pusherdict["id"]
-        self.user_id = pusherdict["user_name"]
-        self.app_id = pusherdict["app_id"]
         self.email = pusherdict["pushkey"]
         self.last_stream_ordering = pusherdict["last_stream_ordering"]
-        self.timed_call = None
-        self.throttle_params = None
-
-        # See httppusher
-        self.max_stream_ordering = None
+        self.timed_call = None  # type: Optional[DelayedCall]
+        self.throttle_params = {}  # type: Dict[str, Dict[str, int]]
+        self._inited = False
 
         self._is_processing = False
 
-    def on_started(self, should_check_for_notifs):
+    def on_started(self, should_check_for_notifs: bool) -> None:
         """Called when this pusher has been started.
 
         Args:
-            should_check_for_notifs (bool): Whether we should immediately
+            should_check_for_notifs: Whether we should immediately
                 check for push to send. Set to False only if it's known there
                 is nothing to send
         """
         if should_check_for_notifs and self.mailer is not None:
             self._start_processing()
 
-    def on_stop(self):
+    def on_stop(self) -> None:
         if self.timed_call:
             try:
                 self.timed_call.cancel()
@@ -92,7 +93,7 @@ class EmailPusher:
                 pass
             self.timed_call = None
 
-    def on_new_notifications(self, max_token: RoomStreamToken):
+    def on_new_notifications(self, max_token: RoomStreamToken) -> None:
         # We just use the minimum stream ordering and ignore the vector clock
         # component. This is safe to do as long as we *always* ignore the vector
         # clock components.
@@ -106,23 +107,23 @@ class EmailPusher:
             self.max_stream_ordering = max_stream_ordering
         self._start_processing()
 
-    def on_new_receipts(self, min_stream_id, max_stream_id):
+    def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
         # We could wake up and cancel the timer but there tend to be quite a
         # lot of read receipts so it's probably less work to just let the
         # timer fire
         pass
 
-    def on_timer(self):
+    def on_timer(self) -> None:
         self.timed_call = None
         self._start_processing()
 
-    def _start_processing(self):
+    def _start_processing(self) -> None:
         if self._is_processing:
             return
 
         run_as_background_process("emailpush.process", self._process)
 
-    def _pause_processing(self):
+    def _pause_processing(self) -> None:
         """Used by tests to temporarily pause processing of events.
 
         Asserts that its not currently processing.
@@ -130,25 +131,26 @@ class EmailPusher:
         assert not self._is_processing
         self._is_processing = True
 
-    def _resume_processing(self):
+    def _resume_processing(self) -> None:
         """Used by tests to resume processing of events after pausing.
         """
         assert self._is_processing
         self._is_processing = False
         self._start_processing()
 
-    async def _process(self):
+    async def _process(self) -> None:
         # we should never get here if we are already processing
         assert not self._is_processing
 
         try:
             self._is_processing = True
 
-            if self.throttle_params is None:
+            if not self._inited:
                 # this is our first loop: load up the throttle params
                 self.throttle_params = await self.store.get_throttle_params_by_room(
                     self.pusher_id
                 )
+                self._inited = True
 
             # if the max ordering changes while we're running _unsafe_process,
             # call it again, and so on until we've caught up.
@@ -163,17 +165,19 @@ class EmailPusher:
         finally:
             self._is_processing = False
 
-    async def _unsafe_process(self):
+    async def _unsafe_process(self) -> None:
         """
         Main logic of the push loop without the wrapper function that sets
         up logging, measures and guards against multiple instances of it
         being run.
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
-        fn = self.store.get_unread_push_actions_for_user_in_range_for_email
-        unprocessed = await fn(self.user_id, start, self.max_stream_ordering)
+        assert self.max_stream_ordering is not None
+        unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
+            self.user_id, start, self.max_stream_ordering
+        )
 
-        soonest_due_at = None
+        soonest_due_at = None  # type: Optional[int]
 
         if not unprocessed:
             await self.save_last_stream_ordering_and_success(self.max_stream_ordering)
@@ -230,7 +234,9 @@ class EmailPusher:
                 self.seconds_until(soonest_due_at), self.on_timer
             )
 
-    async def save_last_stream_ordering_and_success(self, last_stream_ordering):
+    async def save_last_stream_ordering_and_success(
+        self, last_stream_ordering: Optional[int]
+    ) -> None:
         if last_stream_ordering is None:
             # This happens if we haven't yet processed anything
             return
@@ -248,28 +254,30 @@ class EmailPusher:
             # lets just stop and return.
             self.on_stop()
 
-    def seconds_until(self, ts_msec):
+    def seconds_until(self, ts_msec: int) -> float:
         secs = (ts_msec - self.clock.time_msec()) / 1000
         return max(secs, 0)
 
-    def get_room_throttle_ms(self, room_id):
+    def get_room_throttle_ms(self, room_id: str) -> int:
         if room_id in self.throttle_params:
             return self.throttle_params[room_id]["throttle_ms"]
         else:
             return 0
 
-    def get_room_last_sent_ts(self, room_id):
+    def get_room_last_sent_ts(self, room_id: str) -> int:
         if room_id in self.throttle_params:
             return self.throttle_params[room_id]["last_sent_ts"]
         else:
             return 0
 
-    def room_ready_to_notify_at(self, room_id):
+    def room_ready_to_notify_at(self, room_id: str) -> int:
         """
         Determines whether throttling should prevent us from sending an email
         for the given room
-        Returns: The timestamp when we are next allowed to send an email notif
-        for this room
+
+        Returns:
+            The timestamp when we are next allowed to send an email notif
+            for this room
         """
         last_sent_ts = self.get_room_last_sent_ts(room_id)
         throttle_ms = self.get_room_throttle_ms(room_id)
@@ -277,7 +285,9 @@ class EmailPusher:
         may_send_at = last_sent_ts + throttle_ms
         return may_send_at
 
-    async def sent_notif_update_throttle(self, room_id, notified_push_action):
+    async def sent_notif_update_throttle(
+        self, room_id: str, notified_push_action: dict
+    ) -> None:
         # We have sent a notification, so update the throttle accordingly.
         # If the event that triggered the notif happened more than
         # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
@@ -315,7 +325,7 @@ class EmailPusher:
             self.pusher_id, room_id, self.throttle_params[room_id]
         )
 
-    async def send_notification(self, push_actions, reason):
+    async def send_notification(self, push_actions: List[dict], reason: dict) -> None:
         logger.info("Sending notif email for user %r", self.user_id)
 
         await self.mailer.send_notification_mail(