diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 42e5b0c0a5..568c13eaea 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -15,7 +15,6 @@
import logging
-from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -132,8 +131,7 @@ class EmailPusher(object):
self._is_processing = False
self._start_processing()
- @defer.inlineCallbacks
- def _process(self):
+ async def _process(self):
# we should never get here if we are already processing
assert not self._is_processing
@@ -142,7 +140,7 @@ class EmailPusher(object):
if self.throttle_params is None:
# this is our first loop: load up the throttle params
- self.throttle_params = yield self.store.get_throttle_params_by_room(
+ self.throttle_params = await self.store.get_throttle_params_by_room(
self.pusher_id
)
@@ -151,7 +149,7 @@ class EmailPusher(object):
while True:
starting_max_ordering = self.max_stream_ordering
try:
- yield self._unsafe_process()
+ await self._unsafe_process()
except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
@@ -159,8 +157,7 @@ class EmailPusher(object):
finally:
self._is_processing = False
- @defer.inlineCallbacks
- def _unsafe_process(self):
+ async def _unsafe_process(self):
"""
Main logic of the push loop without the wrapper function that sets
up logging, measures and guards against multiple instances of it
@@ -168,12 +165,12 @@ class EmailPusher(object):
"""
start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
fn = self.store.get_unread_push_actions_for_user_in_range_for_email
- unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
+ unprocessed = await fn(self.user_id, start, self.max_stream_ordering)
soonest_due_at = None
if not unprocessed:
- yield self.save_last_stream_ordering_and_success(self.max_stream_ordering)
+ await self.save_last_stream_ordering_and_success(self.max_stream_ordering)
return
for push_action in unprocessed:
@@ -201,15 +198,15 @@ class EmailPusher(object):
"throttle_ms": self.get_room_throttle_ms(push_action["room_id"]),
}
- yield self.send_notification(unprocessed, reason)
+ await self.send_notification(unprocessed, reason)
- yield self.save_last_stream_ordering_and_success(
- max([ea["stream_ordering"] for ea in unprocessed])
+ await self.save_last_stream_ordering_and_success(
+ max(ea["stream_ordering"] for ea in unprocessed)
)
# we update the throttle on all the possible unprocessed push actions
for ea in unprocessed:
- yield self.sent_notif_update_throttle(ea["room_id"], ea)
+ await self.sent_notif_update_throttle(ea["room_id"], ea)
break
else:
if soonest_due_at is None or should_notify_at < soonest_due_at:
@@ -227,21 +224,18 @@ class EmailPusher(object):
self.seconds_until(soonest_due_at), self.on_timer
)
- @defer.inlineCallbacks
- def save_last_stream_ordering_and_success(self, last_stream_ordering):
+ async def save_last_stream_ordering_and_success(self, last_stream_ordering):
if last_stream_ordering is None:
# This happens if we haven't yet processed anything
return
self.last_stream_ordering = last_stream_ordering
- pusher_still_exists = (
- yield self.store.update_pusher_last_stream_ordering_and_success(
- self.app_id,
- self.email,
- self.user_id,
- last_stream_ordering,
- self.clock.time_msec(),
- )
+ pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id,
+ self.email,
+ self.user_id,
+ last_stream_ordering,
+ self.clock.time_msec(),
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
@@ -277,13 +271,12 @@ class EmailPusher(object):
may_send_at = last_sent_ts + throttle_ms
return may_send_at
- @defer.inlineCallbacks
- def sent_notif_update_throttle(self, room_id, notified_push_action):
+ async def sent_notif_update_throttle(self, room_id, notified_push_action):
# We have sent a notification, so update the throttle accordingly.
# If the event that triggered the notif happened more than
# THROTTLE_RESET_AFTER_MS after the previous one that triggered a
# notif, we release the throttle. Otherwise, the throttle is increased.
- time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
+ time_of_previous_notifs = await self.store.get_time_of_last_push_action_before(
notified_push_action["stream_ordering"]
)
@@ -312,14 +305,13 @@ class EmailPusher(object):
"last_sent_ts": self.clock.time_msec(),
"throttle_ms": new_throttle_ms,
}
- yield self.store.set_throttle_params(
+ await self.store.set_throttle_params(
self.pusher_id, room_id, self.throttle_params[room_id]
)
- @defer.inlineCallbacks
- def send_notification(self, push_actions, reason):
+ async def send_notification(self, push_actions, reason):
logger.info("Sending notif email for user %r", self.user_id)
- yield self.mailer.send_notification_mail(
+ await self.mailer.send_notification_mail(
self.app_id, self.user_id, self.email, push_actions, reason
)
|