diff --git a/synapse/notifier.py b/synapse/notifier.py
index af161a81d7..5f5f765bea 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -304,8 +304,7 @@ class Notifier(object):
without waking up any of the normal user event streams"""
self.notify_replication()
- @defer.inlineCallbacks
- def wait_for_events(
+ async def wait_for_events(
self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START
):
"""Wait until the callback returns a non empty response or the
@@ -313,9 +312,9 @@ class Notifier(object):
"""
user_stream = self.user_to_user_stream.get(user_id)
if user_stream is None:
- current_token = yield self.event_sources.get_current_token()
+ current_token = await self.event_sources.get_current_token()
if room_ids is None:
- room_ids = yield self.store.get_rooms_for_user(user_id)
+ room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
user_id=user_id,
rooms=room_ids,
@@ -344,11 +343,11 @@ class Notifier(object):
self.hs.get_reactor(),
)
with PreserveLoggingContext():
- yield listener.deferred
+ await listener.deferred
current_token = user_stream.current_token
- result = yield callback(prev_token, current_token)
+ result = await callback(prev_token, current_token)
if result:
break
@@ -364,12 +363,11 @@ class Notifier(object):
# This happened if there was no timeout or if the timeout had
# already expired.
current_token = user_stream.current_token
- result = yield callback(prev_token, current_token)
+ result = await callback(prev_token, current_token)
return result
- @defer.inlineCallbacks
- def get_events_for(
+ async def get_events_for(
self,
user,
pagination_config,
@@ -391,15 +389,14 @@ class Notifier(object):
"""
from_token = pagination_config.from_token
if not from_token:
- from_token = yield self.event_sources.get_current_token()
+ from_token = await self.event_sources.get_current_token()
limit = pagination_config.limit
- room_ids, is_joined = yield self._get_room_ids(user, explicit_room_id)
+ room_ids, is_joined = await self._get_room_ids(user, explicit_room_id)
is_peeking = not is_joined
- @defer.inlineCallbacks
- def check_for_updates(before_token, after_token):
+ async def check_for_updates(before_token, after_token):
if not after_token.is_after(before_token):
return EventStreamResult([], (from_token, from_token))
@@ -415,7 +412,7 @@ class Notifier(object):
if only_keys and name not in only_keys:
continue
- new_events, new_key = yield source.get_new_events(
+ new_events, new_key = await source.get_new_events(
user=user,
from_key=getattr(from_token, keyname),
limit=limit,
@@ -425,7 +422,7 @@ class Notifier(object):
)
if name == "room":
- new_events = yield filter_events_for_client(
+ new_events = await filter_events_for_client(
self.storage,
user.to_string(),
new_events,
@@ -461,7 +458,7 @@ class Notifier(object):
user_id_for_stream,
)
- result = yield self.wait_for_events(
+ result = await self.wait_for_events(
user_id_for_stream,
timeout,
check_for_updates,
|