diff options
Diffstat (limited to 'synapse/push/emailpusher.py')
-rw-r--r-- | synapse/push/emailpusher.py | 114 |
1 files changed, 52 insertions, 62 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index c6763971ee..4ac1b31748 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -14,11 +14,17 @@ # limitations under the License. import logging +from typing import TYPE_CHECKING, Dict, List, Optional +from twisted.internet.base import DelayedCall from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import RoomStreamToken +from synapse.push import Pusher, PusherConfig, ThrottleParams +from synapse.push.mailer import Mailer + +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer logger = logging.getLogger(__name__) @@ -46,7 +52,7 @@ THROTTLE_RESET_AFTER_MS = 12 * 60 * 60 * 1000 INCLUDE_ALL_UNREAD_NOTIFS = False -class EmailPusher: +class EmailPusher(Pusher): """ A pusher that sends email notifications about events (approximately) when they happen. @@ -54,37 +60,30 @@ class EmailPusher: factor out the common parts """ - def __init__(self, hs, pusherdict, mailer): - self.hs = hs + def __init__(self, hs: "HomeServer", pusher_config: PusherConfig, mailer: Mailer): + super().__init__(hs, pusher_config) self.mailer = mailer self.store = self.hs.get_datastore() - self.clock = self.hs.get_clock() - self.pusher_id = pusherdict["id"] - self.user_id = pusherdict["user_name"] - self.app_id = pusherdict["app_id"] - self.email = pusherdict["pushkey"] - self.last_stream_ordering = pusherdict["last_stream_ordering"] - self.timed_call = None - self.throttle_params = None - - # See httppusher - self.max_stream_ordering = None + self.email = pusher_config.pushkey + self.timed_call = None # type: Optional[DelayedCall] + self.throttle_params = {} # type: Dict[str, ThrottleParams] + self._inited = False self._is_processing = False - def on_started(self, should_check_for_notifs): + def on_started(self, should_check_for_notifs: bool) -> None: """Called when this pusher has been started. Args: - should_check_for_notifs (bool): Whether we should immediately + should_check_for_notifs: Whether we should immediately check for push to send. Set to False only if it's known there is nothing to send """ if should_check_for_notifs and self.mailer is not None: self._start_processing() - def on_stop(self): + def on_stop(self) -> None: if self.timed_call: try: self.timed_call.cancel() @@ -92,37 +91,23 @@ class EmailPusher: pass self.timed_call = None - def on_new_notifications(self, max_token: RoomStreamToken): - # We just use the minimum stream ordering and ignore the vector clock - # component. This is safe to do as long as we *always* ignore the vector - # clock components. - max_stream_ordering = max_token.stream - - if self.max_stream_ordering: - self.max_stream_ordering = max( - max_stream_ordering, self.max_stream_ordering - ) - else: - self.max_stream_ordering = max_stream_ordering - self._start_processing() - - def on_new_receipts(self, min_stream_id, max_stream_id): + def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None: # We could wake up and cancel the timer but there tend to be quite a # lot of read receipts so it's probably less work to just let the # timer fire pass - def on_timer(self): + def on_timer(self) -> None: self.timed_call = None self._start_processing() - def _start_processing(self): + def _start_processing(self) -> None: if self._is_processing: return run_as_background_process("emailpush.process", self._process) - def _pause_processing(self): + def _pause_processing(self) -> None: """Used by tests to temporarily pause processing of events. Asserts that its not currently processing. @@ -130,25 +115,27 @@ class EmailPusher: assert not self._is_processing self._is_processing = True - def _resume_processing(self): + def _resume_processing(self) -> None: """Used by tests to resume processing of events after pausing. """ assert self._is_processing self._is_processing = False self._start_processing() - async def _process(self): + async def _process(self) -> None: # we should never get here if we are already processing assert not self._is_processing try: self._is_processing = True - if self.throttle_params is None: + if not self._inited: # this is our first loop: load up the throttle params + assert self.pusher_id is not None self.throttle_params = await self.store.get_throttle_params_by_room( self.pusher_id ) + self._inited = True # if the max ordering changes while we're running _unsafe_process, # call it again, and so on until we've caught up. @@ -163,17 +150,18 @@ class EmailPusher: finally: self._is_processing = False - async def _unsafe_process(self): + async def _unsafe_process(self) -> None: """ Main logic of the push loop without the wrapper function that sets up logging, measures and guards against multiple instances of it being run. """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - fn = self.store.get_unread_push_actions_for_user_in_range_for_email - unprocessed = await fn(self.user_id, start, self.max_stream_ordering) + unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email( + self.user_id, start, self.max_stream_ordering + ) - soonest_due_at = None + soonest_due_at = None # type: Optional[int] if not unprocessed: await self.save_last_stream_ordering_and_success(self.max_stream_ordering) @@ -230,11 +218,9 @@ class EmailPusher: self.seconds_until(soonest_due_at), self.on_timer ) - async def save_last_stream_ordering_and_success(self, last_stream_ordering): - if last_stream_ordering is None: - # This happens if we haven't yet processed anything - return - + async def save_last_stream_ordering_and_success( + self, last_stream_ordering: int + ) -> None: self.last_stream_ordering = last_stream_ordering pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( self.app_id, @@ -248,28 +234,30 @@ class EmailPusher: # lets just stop and return. self.on_stop() - def seconds_until(self, ts_msec): + def seconds_until(self, ts_msec: int) -> float: secs = (ts_msec - self.clock.time_msec()) / 1000 return max(secs, 0) - def get_room_throttle_ms(self, room_id): + def get_room_throttle_ms(self, room_id: str) -> int: if room_id in self.throttle_params: - return self.throttle_params[room_id]["throttle_ms"] + return self.throttle_params[room_id].throttle_ms else: return 0 - def get_room_last_sent_ts(self, room_id): + def get_room_last_sent_ts(self, room_id: str) -> int: if room_id in self.throttle_params: - return self.throttle_params[room_id]["last_sent_ts"] + return self.throttle_params[room_id].last_sent_ts else: return 0 - def room_ready_to_notify_at(self, room_id): + def room_ready_to_notify_at(self, room_id: str) -> int: """ Determines whether throttling should prevent us from sending an email for the given room - Returns: The timestamp when we are next allowed to send an email notif - for this room + + Returns: + The timestamp when we are next allowed to send an email notif + for this room """ last_sent_ts = self.get_room_last_sent_ts(room_id) throttle_ms = self.get_room_throttle_ms(room_id) @@ -277,7 +265,9 @@ class EmailPusher: may_send_at = last_sent_ts + throttle_ms return may_send_at - async def sent_notif_update_throttle(self, room_id, notified_push_action): + async def sent_notif_update_throttle( + self, room_id: str, notified_push_action: dict + ) -> None: # We have sent a notification, so update the throttle accordingly. # If the event that triggered the notif happened more than # THROTTLE_RESET_AFTER_MS after the previous one that triggered a @@ -307,15 +297,15 @@ class EmailPusher: new_throttle_ms = min( current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS ) - self.throttle_params[room_id] = { - "last_sent_ts": self.clock.time_msec(), - "throttle_ms": new_throttle_ms, - } + self.throttle_params[room_id] = ThrottleParams( + self.clock.time_msec(), new_throttle_ms, + ) + assert self.pusher_id is not None await self.store.set_throttle_params( self.pusher_id, room_id, self.throttle_params[room_id] ) - async def send_notification(self, push_actions, reason): + async def send_notification(self, push_actions: List[dict], reason: dict) -> None: logger.info("Sending notif email for user %r", self.user_id) await self.mailer.send_notification_mail( |