diff --git a/changelog.d/8014.misc b/changelog.d/8014.misc
new file mode 100644
index 0000000000..dfe4c03171
--- /dev/null
+++ b/changelog.d/8014.misc
@@ -0,0 +1 @@
+Convert various parts of the codebase to async/await.
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index f88bad5f25..ae6bd1d352 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -109,7 +109,7 @@ class InitialSyncHandler(BaseHandler):
rooms_ret = []
- now_token = await self.hs.get_event_sources().get_current_token()
+ now_token = self.hs.get_event_sources().get_current_token()
presence_stream = self.hs.get_event_sources().sources["presence"]
pagination_config = PaginationConfig(from_token=now_token)
@@ -360,7 +360,7 @@ class InitialSyncHandler(BaseHandler):
current_state.values(), time_now
)
- now_token = await self.hs.get_event_sources().get_current_token()
+ now_token = self.hs.get_event_sources().get_current_token()
limit = pagin_config.limit if pagin_config else None
if limit is None:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index da06582d4b..487420bb5d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -309,7 +309,7 @@ class PaginationHandler(object):
room_token = pagin_config.from_token.room_key
else:
pagin_config.from_token = (
- await self.hs.get_event_sources().get_current_token_for_pagination()
+ self.hs.get_event_sources().get_current_token_for_pagination()
)
room_token = pagin_config.from_token.room_key
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 0c5b99234d..a8545255b1 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,7 +22,7 @@ import logging
import math
import string
from collections import OrderedDict
-from typing import Optional, Tuple
+from typing import Awaitable, Optional, Tuple
from synapse.api.constants import (
EventTypes,
@@ -1041,7 +1041,7 @@ class RoomEventSource(object):
):
# We just ignore the key for now.
- to_key = await self.get_current_key()
+ to_key = self.get_current_key()
from_token = RoomStreamToken.parse(from_key)
if from_token.topological:
@@ -1081,10 +1081,10 @@ class RoomEventSource(object):
return (events, end_key)
- def get_current_key(self):
- return self.store.get_room_events_max_id()
+ def get_current_key(self) -> str:
+ return "s%d" % (self.store.get_room_max_stream_ordering(),)
- def get_current_key_for_room(self, room_id):
+ def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 9b312a1558..d58f9788c5 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -340,7 +340,7 @@ class SearchHandler(BaseHandler):
# If client has asked for "context" for each event (i.e. some surrounding
# events and state), fetch that
if event_context is not None:
- now_token = await self.hs.get_event_sources().get_current_token()
+ now_token = self.hs.get_event_sources().get_current_token()
contexts = {}
for event in allowed_events:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index eaa4eeadf7..5a19bac929 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -961,7 +961,7 @@ class SyncHandler(object):
# this is due to some of the underlying streams not supporting the ability
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
- now_token = await self.event_sources.get_current_token()
+ now_token = self.event_sources.get_current_token()
logger.debug(
"Calculating sync response for %r between %s and %s",
diff --git a/synapse/notifier.py b/synapse/notifier.py
index bd41f77852..22ab4a9da5 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -320,7 +320,7 @@ class Notifier(object):
"""
user_stream = self.user_to_user_stream.get(user_id)
if user_stream is None:
- current_token = await self.event_sources.get_current_token()
+ current_token = self.event_sources.get_current_token()
if room_ids is None:
room_ids = await self.store.get_rooms_for_user(user_id)
user_stream = _NotifierUserStream(
@@ -397,7 +397,7 @@ class Notifier(object):
"""
from_token = pagination_config.from_token
if not from_token:
- from_token = await self.event_sources.get_current_token()
+ from_token = self.event_sources.get_current_token()
limit = pagination_config.limit
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index 10d39b3699..f1334a6efc 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -39,6 +39,7 @@ what sort order was used:
import abc
import logging
from collections import namedtuple
+from typing import Optional
from twisted.internet import defer
@@ -557,19 +558,18 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return self.db.runInteraction("get_room_event_before_stream_ordering", _f)
- @defer.inlineCallbacks
- def get_room_events_max_id(self, room_id=None):
+ async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
"""Returns the current token for rooms stream.
By default, it returns the current global stream token. Specifying a
`room_id` causes it to return the current room specific topological
token.
"""
- token = yield self.get_room_max_stream_ordering()
+ token = self.get_room_max_stream_ordering()
if room_id is None:
return "s%d" % (token,)
else:
- topo = yield self.db.runInteraction(
+ topo = await self.db.runInteraction(
"_get_max_topological_txn", self._get_max_topological_txn, room_id
)
return "t%d-%d" % (topo, token)
diff --git a/synapse/streams/events.py b/synapse/streams/events.py
index 5d3eddcfdc..393e34b9fb 100644
--- a/synapse/streams/events.py
+++ b/synapse/streams/events.py
@@ -15,8 +15,6 @@
from typing import Any, Dict
-from twisted.internet import defer
-
from synapse.handlers.account_data import AccountDataEventSource
from synapse.handlers.presence import PresenceEventSource
from synapse.handlers.receipts import ReceiptEventSource
@@ -40,19 +38,18 @@ class EventSources(object):
} # type: Dict[str, Any]
self.store = hs.get_datastore()
- @defer.inlineCallbacks
- def get_current_token(self):
+ def get_current_token(self) -> StreamToken:
push_rules_key, _ = self.store.get_push_rules_stream_token()
to_device_key = self.store.get_to_device_stream_token()
device_list_key = self.store.get_device_stream_token()
groups_key = self.store.get_group_stream_token()
token = StreamToken(
- room_key=(yield self.sources["room"].get_current_key()),
- presence_key=(yield self.sources["presence"].get_current_key()),
- typing_key=(yield self.sources["typing"].get_current_key()),
- receipt_key=(yield self.sources["receipt"].get_current_key()),
- account_data_key=(yield self.sources["account_data"].get_current_key()),
+ room_key=self.sources["room"].get_current_key(),
+ presence_key=self.sources["presence"].get_current_key(),
+ typing_key=self.sources["typing"].get_current_key(),
+ receipt_key=self.sources["receipt"].get_current_key(),
+ account_data_key=self.sources["account_data"].get_current_key(),
push_rules_key=push_rules_key,
to_device_key=to_device_key,
device_list_key=device_list_key,
@@ -60,8 +57,7 @@ class EventSources(object):
)
return token
- @defer.inlineCallbacks
- def get_current_token_for_pagination(self):
+ def get_current_token_for_pagination(self) -> StreamToken:
"""Get the current token for a given room to be used to paginate
events.
@@ -69,10 +65,10 @@ class EventSources(object):
than `room`, since they are not used during pagination.
Returns:
- Deferred[StreamToken]
+ The current token for pagination.
"""
token = StreamToken(
- room_key=(yield self.sources["room"].get_current_key()),
+ room_key=self.sources["room"].get_current_key(),
presence_key=0,
typing_key=0,
receipt_key=0,
diff --git a/tests/server_notices/test_resource_limits_server_notices.py b/tests/server_notices/test_resource_limits_server_notices.py
index 99908edba3..7f70353b0d 100644
--- a/tests/server_notices/test_resource_limits_server_notices.py
+++ b/tests/server_notices/test_resource_limits_server_notices.py
@@ -275,7 +275,7 @@ class TestResourceLimitsServerNoticesWithRealRooms(unittest.HomeserverTestCase):
self.server_notices_manager.get_or_create_notice_room_for_user(self.user_id)
)
- token = self.get_success(self.event_source.get_current_token())
+ token = self.event_source.get_current_token()
events, _ = self.get_success(
self.store.get_recent_events_for_room(
room_id, limit=100, end_token=token.room_key
|