summary refs log tree commit diff
path: root/synapse/push/emailpusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push/emailpusher.py')
-rw-r--r--synapse/push/emailpusher.py38
1 files changed, 16 insertions, 22 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index ba4551d619..568c13eaea 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -15,7 +15,6 @@
 
 import logging
 
-from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -132,8 +131,7 @@ class EmailPusher(object):
         self._is_processing = False
         self._start_processing()
 
-    @defer.inlineCallbacks
-    def _process(self):
+    async def _process(self):
         # we should never get here if we are already processing
         assert not self._is_processing
 
@@ -142,7 +140,7 @@ class EmailPusher(object):
 
             if self.throttle_params is None:
                 # this is our first loop: load up the throttle params
-                self.throttle_params = yield self.store.get_throttle_params_by_room(
+                self.throttle_params = await self.store.get_throttle_params_by_room(
                     self.pusher_id
                 )
 
@@ -151,7 +149,7 @@ class EmailPusher(object):
             while True:
                 starting_max_ordering = self.max_stream_ordering
                 try:
-                    yield self._unsafe_process()
+                    await self._unsafe_process()
                 except Exception:
                     logger.exception("Exception processing notifs")
                 if self.max_stream_ordering == starting_max_ordering:
@@ -159,8 +157,7 @@ class EmailPusher(object):
         finally:
             self._is_processing = False
 
-    @defer.inlineCallbacks
-    def _unsafe_process(self):
+    async def _unsafe_process(self):
         """
         Main logic of the push loop without the wrapper function that sets
         up logging, measures and guards against multiple instances of it
@@ -168,12 +165,12 @@ class EmailPusher(object):
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
         fn = self.store.get_unread_push_actions_for_user_in_range_for_email
-        unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
+        unprocessed = await fn(self.user_id, start, self.max_stream_ordering)
 
         soonest_due_at = None
 
         if not unprocessed:
-            yield self.save_last_stream_ordering_and_success(self.max_stream_ordering)
+            await self.save_last_stream_ordering_and_success(self.max_stream_ordering)
             return
 
         for push_action in unprocessed:
@@ -201,15 +198,15 @@ class EmailPusher(object):
                     "throttle_ms": self.get_room_throttle_ms(push_action["room_id"]),
                 }
 
-                yield self.send_notification(unprocessed, reason)
+                await self.send_notification(unprocessed, reason)
 
-                yield self.save_last_stream_ordering_and_success(
+                await self.save_last_stream_ordering_and_success(
                     max(ea["stream_ordering"] for ea in unprocessed)
                 )
 
                 # we update the throttle on all the possible unprocessed push actions
                 for ea in unprocessed:
-                    yield self.sent_notif_update_throttle(ea["room_id"], ea)
+                    await self.sent_notif_update_throttle(ea["room_id"], ea)
                 break
             else:
                 if soonest_due_at is None or should_notify_at < soonest_due_at:
@@ -227,14 +224,13 @@ class EmailPusher(object):
                 self.seconds_until(soonest_due_at), self.on_timer
             )
 
-    @defer.inlineCallbacks
-    def save_last_stream_ordering_and_success(self, last_stream_ordering):
+    async def save_last_stream_ordering_and_success(self, last_stream_ordering):
         if last_stream_ordering is None:
             # This happens if we haven't yet processed anything
             return
 
         self.last_stream_ordering = last_stream_ordering
-        pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success(
+        pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
             self.app_id,
             self.email,
             self.user_id,
@@ -275,13 +271,12 @@ class EmailPusher(object):
         may_send_at = last_sent_ts + throttle_ms
         return may_send_at
 
-    @defer.inlineCallbacks
-    def sent_notif_update_throttle(self, room_id, notified_push_action):
+    async def sent_notif_update_throttle(self, room_id, notified_push_action):
         # We have sent a notification, so update the throttle accordingly.
         # If the event that triggered the notif happened more than
         # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
         # notif, we release the throttle. Otherwise, the throttle is increased.
-        time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
+        time_of_previous_notifs = await self.store.get_time_of_last_push_action_before(
             notified_push_action["stream_ordering"]
         )
 
@@ -310,14 +305,13 @@ class EmailPusher(object):
             "last_sent_ts": self.clock.time_msec(),
             "throttle_ms": new_throttle_ms,
         }
-        yield self.store.set_throttle_params(
+        await self.store.set_throttle_params(
             self.pusher_id, room_id, self.throttle_params[room_id]
         )
 
-    @defer.inlineCallbacks
-    def send_notification(self, push_actions, reason):
+    async def send_notification(self, push_actions, reason):
         logger.info("Sending notif email for user %r", self.user_id)
 
-        yield self.mailer.send_notification_mail(
+        await self.mailer.send_notification_mail(
             self.app_id, self.user_id, self.email, push_actions, reason
         )