summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-05-22 13:41:11 +0100
committerGitHub <noreply@github.com>2020-05-22 13:41:11 +0100
commit06a02bc1ce9ef23a6dff28dbfd30f910ae330b1d (patch)
treef814615713b930736b936395a1ad391ff6efb82b /synapse/push
parentUse a non-empty RelayState for user interactive auth with SAML. (#7552) (diff)
downloadsynapse-06a02bc1ce9ef23a6dff28dbfd30f910ae330b1d.tar.xz
Convert sending mail to async/await. (#7557)
Mainly because sometimes the email push code raises exceptions where the
stack traces have gotten lost, which is hopefully fixed by this.
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/emailpusher.py38
-rw-r--r--synapse/push/mailer.py84
2 files changed, 53 insertions, 69 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index ba4551d619..568c13eaea 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -15,7 +15,6 @@
 
 import logging
 
-from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -132,8 +131,7 @@ class EmailPusher(object):
         self._is_processing = False
         self._start_processing()
 
-    @defer.inlineCallbacks
-    def _process(self):
+    async def _process(self):
         # we should never get here if we are already processing
         assert not self._is_processing
 
@@ -142,7 +140,7 @@ class EmailPusher(object):
 
             if self.throttle_params is None:
                 # this is our first loop: load up the throttle params
-                self.throttle_params = yield self.store.get_throttle_params_by_room(
+                self.throttle_params = await self.store.get_throttle_params_by_room(
                     self.pusher_id
                 )
 
@@ -151,7 +149,7 @@ class EmailPusher(object):
             while True:
                 starting_max_ordering = self.max_stream_ordering
                 try:
-                    yield self._unsafe_process()
+                    await self._unsafe_process()
                 except Exception:
                     logger.exception("Exception processing notifs")
                 if self.max_stream_ordering == starting_max_ordering:
@@ -159,8 +157,7 @@ class EmailPusher(object):
         finally:
             self._is_processing = False
 
-    @defer.inlineCallbacks
-    def _unsafe_process(self):
+    async def _unsafe_process(self):
         """
         Main logic of the push loop without the wrapper function that sets
         up logging, measures and guards against multiple instances of it
@@ -168,12 +165,12 @@ class EmailPusher(object):
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
         fn = self.store.get_unread_push_actions_for_user_in_range_for_email
-        unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
+        unprocessed = await fn(self.user_id, start, self.max_stream_ordering)
 
         soonest_due_at = None
 
         if not unprocessed:
-            yield self.save_last_stream_ordering_and_success(self.max_stream_ordering)
+            await self.save_last_stream_ordering_and_success(self.max_stream_ordering)
             return
 
         for push_action in unprocessed:
@@ -201,15 +198,15 @@ class EmailPusher(object):
                     "throttle_ms": self.get_room_throttle_ms(push_action["room_id"]),
                 }
 
-                yield self.send_notification(unprocessed, reason)
+                await self.send_notification(unprocessed, reason)
 
-                yield self.save_last_stream_ordering_and_success(
+                await self.save_last_stream_ordering_and_success(
                     max(ea["stream_ordering"] for ea in unprocessed)
                 )
 
                 # we update the throttle on all the possible unprocessed push actions
                 for ea in unprocessed:
-                    yield self.sent_notif_update_throttle(ea["room_id"], ea)
+                    await self.sent_notif_update_throttle(ea["room_id"], ea)
                 break
             else:
                 if soonest_due_at is None or should_notify_at < soonest_due_at:
@@ -227,14 +224,13 @@ class EmailPusher(object):
                 self.seconds_until(soonest_due_at), self.on_timer
             )
 
-    @defer.inlineCallbacks
-    def save_last_stream_ordering_and_success(self, last_stream_ordering):
+    async def save_last_stream_ordering_and_success(self, last_stream_ordering):
         if last_stream_ordering is None:
             # This happens if we haven't yet processed anything
             return
 
         self.last_stream_ordering = last_stream_ordering
-        pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success(
+        pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
             self.app_id,
             self.email,
             self.user_id,
@@ -275,13 +271,12 @@ class EmailPusher(object):
         may_send_at = last_sent_ts + throttle_ms
         return may_send_at
 
-    @defer.inlineCallbacks
-    def sent_notif_update_throttle(self, room_id, notified_push_action):
+    async def sent_notif_update_throttle(self, room_id, notified_push_action):
         # We have sent a notification, so update the throttle accordingly.
         # If the event that triggered the notif happened more than
         # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
         # notif, we release the throttle. Otherwise, the throttle is increased.
-        time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
+        time_of_previous_notifs = await self.store.get_time_of_last_push_action_before(
             notified_push_action["stream_ordering"]
         )
 
@@ -310,14 +305,13 @@ class EmailPusher(object):
             "last_sent_ts": self.clock.time_msec(),
             "throttle_ms": new_throttle_ms,
         }
-        yield self.store.set_throttle_params(
+        await self.store.set_throttle_params(
             self.pusher_id, room_id, self.throttle_params[room_id]
         )
 
-    @defer.inlineCallbacks
-    def send_notification(self, push_actions, reason):
+    async def send_notification(self, push_actions, reason):
         logger.info("Sending notif email for user %r", self.user_id)
 
-        yield self.mailer.send_notification_mail(
+        await self.mailer.send_notification_mail(
             self.app_id, self.user_id, self.email, push_actions, reason
         )
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index ab33abbeed..d57a66a697 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -26,8 +26,6 @@ from six.moves import urllib
 import bleach
 import jinja2
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
 from synapse.logging.context import make_deferred_yieldable
@@ -127,8 +125,7 @@ class Mailer(object):
 
         logger.info("Created Mailer for app_name %s" % app_name)
 
-    @defer.inlineCallbacks
-    def send_password_reset_mail(self, email_address, token, client_secret, sid):
+    async def send_password_reset_mail(self, email_address, token, client_secret, sid):
         """Send an email with a password reset link to a user
 
         Args:
@@ -149,14 +146,13 @@ class Mailer(object):
 
         template_vars = {"link": link}
 
-        yield self.send_email(
+        await self.send_email(
             email_address,
             "[%s] Password Reset" % self.hs.config.server_name,
             template_vars,
         )
 
-    @defer.inlineCallbacks
-    def send_registration_mail(self, email_address, token, client_secret, sid):
+    async def send_registration_mail(self, email_address, token, client_secret, sid):
         """Send an email with a registration confirmation link to a user
 
         Args:
@@ -177,14 +173,13 @@ class Mailer(object):
 
         template_vars = {"link": link}
 
-        yield self.send_email(
+        await self.send_email(
             email_address,
             "[%s] Register your Email Address" % self.hs.config.server_name,
             template_vars,
         )
 
-    @defer.inlineCallbacks
-    def send_add_threepid_mail(self, email_address, token, client_secret, sid):
+    async def send_add_threepid_mail(self, email_address, token, client_secret, sid):
         """Send an email with a validation link to a user for adding a 3pid to their account
 
         Args:
@@ -206,20 +201,19 @@ class Mailer(object):
 
         template_vars = {"link": link}
 
-        yield self.send_email(
+        await self.send_email(
             email_address,
             "[%s] Validate Your Email" % self.hs.config.server_name,
             template_vars,
         )
 
-    @defer.inlineCallbacks
-    def send_notification_mail(
+    async def send_notification_mail(
         self, app_id, user_id, email_address, push_actions, reason
     ):
         """Send email regarding a user's room notifications"""
         rooms_in_order = deduped_ordered_list([pa["room_id"] for pa in push_actions])
 
-        notif_events = yield self.store.get_events(
+        notif_events = await self.store.get_events(
             [pa["event_id"] for pa in push_actions]
         )
 
@@ -232,7 +226,7 @@ class Mailer(object):
         state_by_room = {}
 
         try:
-            user_display_name = yield self.store.get_profile_displayname(
+            user_display_name = await self.store.get_profile_displayname(
                 UserID.from_string(user_id).localpart
             )
             if user_display_name is None:
@@ -240,14 +234,13 @@ class Mailer(object):
         except StoreError:
             user_display_name = user_id
 
-        @defer.inlineCallbacks
-        def _fetch_room_state(room_id):
-            room_state = yield self.store.get_current_state_ids(room_id)
+        async def _fetch_room_state(room_id):
+            room_state = await self.store.get_current_state_ids(room_id)
             state_by_room[room_id] = room_state
 
         # Run at most 3 of these at once: sync does 10 at a time but email
         # notifs are much less realtime than sync so we can afford to wait a bit.
-        yield concurrently_execute(_fetch_room_state, rooms_in_order, 3)
+        await concurrently_execute(_fetch_room_state, rooms_in_order, 3)
 
         # actually sort our so-called rooms_in_order list, most recent room first
         rooms_in_order.sort(key=lambda r: -(notifs_by_room[r][-1]["received_ts"] or 0))
@@ -255,19 +248,19 @@ class Mailer(object):
         rooms = []
 
         for r in rooms_in_order:
-            roomvars = yield self.get_room_vars(
+            roomvars = await self.get_room_vars(
                 r, user_id, notifs_by_room[r], notif_events, state_by_room[r]
             )
             rooms.append(roomvars)
 
-        reason["room_name"] = yield calculate_room_name(
+        reason["room_name"] = await calculate_room_name(
             self.store,
             state_by_room[reason["room_id"]],
             user_id,
             fallback_to_members=True,
         )
 
-        summary_text = yield self.make_summary_text(
+        summary_text = await self.make_summary_text(
             notifs_by_room, state_by_room, notif_events, user_id, reason
         )
 
@@ -282,12 +275,11 @@ class Mailer(object):
             "reason": reason,
         }
 
-        yield self.send_email(
+        await self.send_email(
             email_address, "[%s] %s" % (self.app_name, summary_text), template_vars
         )
 
-    @defer.inlineCallbacks
-    def send_email(self, email_address, subject, template_vars):
+    async def send_email(self, email_address, subject, template_vars):
         """Send an email with the given information and template text"""
         try:
             from_string = self.hs.config.email_notif_from % {"app": self.app_name}
@@ -317,7 +309,7 @@ class Mailer(object):
 
         logger.info("Sending email to %s" % email_address)
 
-        yield make_deferred_yieldable(
+        await make_deferred_yieldable(
             self.sendmail(
                 self.hs.config.email_smtp_host,
                 raw_from,
@@ -332,13 +324,14 @@ class Mailer(object):
             )
         )
 
-    @defer.inlineCallbacks
-    def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids):
+    async def get_room_vars(
+        self, room_id, user_id, notifs, notif_events, room_state_ids
+    ):
         my_member_event_id = room_state_ids[("m.room.member", user_id)]
-        my_member_event = yield self.store.get_event(my_member_event_id)
+        my_member_event = await self.store.get_event(my_member_event_id)
         is_invite = my_member_event.content["membership"] == "invite"
 
-        room_name = yield calculate_room_name(self.store, room_state_ids, user_id)
+        room_name = await calculate_room_name(self.store, room_state_ids, user_id)
 
         room_vars = {
             "title": room_name,
@@ -350,7 +343,7 @@ class Mailer(object):
 
         if not is_invite:
             for n in notifs:
-                notifvars = yield self.get_notif_vars(
+                notifvars = await self.get_notif_vars(
                     n, user_id, notif_events[n["event_id"]], room_state_ids
                 )
 
@@ -377,9 +370,8 @@ class Mailer(object):
 
         return room_vars
 
-    @defer.inlineCallbacks
-    def get_notif_vars(self, notif, user_id, notif_event, room_state_ids):
-        results = yield self.store.get_events_around(
+    async def get_notif_vars(self, notif, user_id, notif_event, room_state_ids):
+        results = await self.store.get_events_around(
             notif["room_id"],
             notif["event_id"],
             before_limit=CONTEXT_BEFORE,
@@ -392,25 +384,24 @@ class Mailer(object):
             "messages": [],
         }
 
-        the_events = yield filter_events_for_client(
+        the_events = await filter_events_for_client(
             self.storage, user_id, results["events_before"]
         )
         the_events.append(notif_event)
 
         for event in the_events:
-            messagevars = yield self.get_message_vars(notif, event, room_state_ids)
+            messagevars = await self.get_message_vars(notif, event, room_state_ids)
             if messagevars is not None:
                 ret["messages"].append(messagevars)
 
         return ret
 
-    @defer.inlineCallbacks
-    def get_message_vars(self, notif, event, room_state_ids):
+    async def get_message_vars(self, notif, event, room_state_ids):
         if event.type != EventTypes.Message:
             return
 
         sender_state_event_id = room_state_ids[("m.room.member", event.sender)]
-        sender_state_event = yield self.store.get_event(sender_state_event_id)
+        sender_state_event = await self.store.get_event(sender_state_event_id)
         sender_name = name_from_member_event(sender_state_event)
         sender_avatar_url = sender_state_event.content.get("avatar_url")
 
@@ -460,8 +451,7 @@ class Mailer(object):
 
         return messagevars
 
-    @defer.inlineCallbacks
-    def make_summary_text(
+    async def make_summary_text(
         self, notifs_by_room, room_state_ids, notif_events, user_id, reason
     ):
         if len(notifs_by_room) == 1:
@@ -471,17 +461,17 @@ class Mailer(object):
             # If the room has some kind of name, use it, but we don't
             # want the generated-from-names one here otherwise we'll
             # end up with, "new message from Bob in the Bob room"
-            room_name = yield calculate_room_name(
+            room_name = await calculate_room_name(
                 self.store, room_state_ids[room_id], user_id, fallback_to_members=False
             )
 
             my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)]
-            my_member_event = yield self.store.get_event(my_member_event_id)
+            my_member_event = await self.store.get_event(my_member_event_id)
             if my_member_event.content["membership"] == "invite":
                 inviter_member_event_id = room_state_ids[room_id][
                     ("m.room.member", my_member_event.sender)
                 ]
-                inviter_member_event = yield self.store.get_event(
+                inviter_member_event = await self.store.get_event(
                     inviter_member_event_id
                 )
                 inviter_name = name_from_member_event(inviter_member_event)
@@ -506,7 +496,7 @@ class Mailer(object):
                     state_event_id = room_state_ids[room_id][
                         ("m.room.member", event.sender)
                     ]
-                    state_event = yield self.store.get_event(state_event_id)
+                    state_event = await self.store.get_event(state_event_id)
                     sender_name = name_from_member_event(state_event)
 
                 if sender_name is not None and room_name is not None:
@@ -535,7 +525,7 @@ class Mailer(object):
                         }
                     )
 
-                    member_events = yield self.store.get_events(
+                    member_events = await self.store.get_events(
                         [
                             room_state_ids[room_id][("m.room.member", s)]
                             for s in sender_ids
@@ -567,7 +557,7 @@ class Mailer(object):
                     }
                 )
 
-                member_events = yield self.store.get_events(
+                member_events = await self.store.get_events(
                     [room_state_ids[room_id][("m.room.member", s)] for s in sender_ids]
                 )