diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 4ada6f5563..8a9e06efcf 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -332,8 +332,7 @@ class DatabasePool(object):
"""
return self._db_pool.running
- @defer.inlineCallbacks
- def _check_safe_to_upsert(self):
+ async def _check_safe_to_upsert(self):
"""
Is it safe to use native UPSERT?
@@ -342,7 +341,7 @@ class DatabasePool(object):
If the background updates have not completed, wait 15 sec and check again.
"""
- updates = yield self.simple_select_list(
+ updates = await self.simple_select_list(
"background_updates",
keyvalues=None,
retcols=["update_name"],
@@ -614,8 +613,7 @@ class DatabasePool(object):
# "Simple" SQL API methods that operate on a single table with no JOINs,
# no complex WHERE clauses, just a dict of values for columns.
- @defer.inlineCallbacks
- def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"):
+ async def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"):
"""Executes an INSERT query on the named table.
Args:
@@ -631,7 +629,7 @@ class DatabasePool(object):
`or_ignore` is True
"""
try:
- yield self.runInteraction(desc, self.simple_insert_txn, table, values)
+ await self.runInteraction(desc, self.simple_insert_txn, table, values)
except self.engine.module.IntegrityError:
# We have to do or_ignore flag at this layer, since we can't reuse
# a cursor after we receive an error from the db.
@@ -684,8 +682,7 @@ class DatabasePool(object):
txn.executemany(sql, vals)
- @defer.inlineCallbacks
- def simple_upsert(
+ async def simple_upsert(
self,
table,
keyvalues,
@@ -714,14 +711,14 @@ class DatabasePool(object):
inserting
lock (bool): True to lock the table when doing the upsert.
Returns:
- Deferred(None or bool): Native upserts always return None. Emulated
+ None or bool: Native upserts always return None. Emulated
upserts return True if a new entry was created, False if an existing
one was updated.
"""
attempts = 0
while True:
try:
- result = yield self.runInteraction(
+ return await self.runInteraction(
desc,
self.simple_upsert_txn,
table,
@@ -730,7 +727,6 @@ class DatabasePool(object):
insertion_values,
lock=lock,
)
- return result
except self.engine.module.IntegrityError as e:
attempts += 1
if attempts >= 5:
@@ -1121,8 +1117,7 @@ class DatabasePool(object):
return cls.cursor_to_dict(txn)
- @defer.inlineCallbacks
- def simple_select_many_batch(
+ async def simple_select_many_batch(
self,
table,
column,
@@ -1156,7 +1151,7 @@ class DatabasePool(object):
it_list[i : i + batch_size] for i in range(0, len(it_list), batch_size)
]
for chunk in chunks:
- rows = yield self.runInteraction(
+ rows = await self.runInteraction(
desc,
self.simple_select_many_txn,
table,
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 5cf1a88399..02568a2391 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -169,7 +169,7 @@ class ApplicationServiceTransactionWorkerStore(
service(ApplicationService): The service whose state to set.
state(ApplicationServiceState): The connectivity state to apply.
Returns:
- A Deferred which resolves when the state was set successfully.
+ An Awaitable which resolves when the state was set successfully.
"""
return self.db_pool.simple_upsert(
"application_services_state", {"as_id": service.id}, {"state": state}
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 5687448e3d..8c63a0dc4d 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -847,13 +847,15 @@ class EventsWorkerStore(SQLBaseStore):
"""Given a list of event ids, check if we have already processed and
stored them as non outliers.
"""
- rows = yield self.db_pool.simple_select_many_batch(
- table="events",
- retcols=("event_id",),
- column="event_id",
- iterable=list(event_ids),
- keyvalues={"outlier": False},
- desc="have_events_in_timeline",
+ rows = yield defer.ensureDeferred(
+ self.db_pool.simple_select_many_batch(
+ table="events",
+ retcols=("event_id",),
+ column="event_id",
+ iterable=list(event_ids),
+ keyvalues={"outlier": False},
+ desc="have_events_in_timeline",
+ )
)
return {r["event_id"] for r in rows}
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index de50fa6e94..068ad22b30 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -17,9 +17,7 @@
import logging
import re
-from typing import Dict, List, Optional
-
-from twisted.internet.defer import Deferred
+from typing import Awaitable, Dict, List, Optional
from synapse.api.constants import UserTypes
from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
@@ -563,7 +561,7 @@ class RegistrationWorkerStore(SQLBaseStore):
id_server (str)
Returns:
- Deferred
+ Awaitable
"""
# We need to use an upsert, in case they user had already bound the
# threepid
@@ -1084,7 +1082,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
def record_user_external_id(
self, auth_provider: str, external_id: str, user_id: str
- ) -> Deferred:
+ ) -> Awaitable:
"""Record a mapping from an external user id to a mxid
Args:
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 1cc8c08ed0..161edbeccb 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -767,13 +767,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return set(room_ids)
- def get_membership_from_event_ids(
+ async def get_membership_from_event_ids(
self, member_event_ids: Iterable[str]
) -> List[dict]:
"""Get user_id and membership of a set of event IDs.
"""
- return self.db_pool.simple_select_many_batch(
+ return await self.db_pool.simple_select_many_batch(
table="room_memberships",
column="event_id",
iterable=member_event_ids,
|