author     Patrick Cloke <clokep@users.noreply.github.com>  2020-08-17 12:18:01 -0400
committer  GitHub <noreply@github.com>  2020-08-17 12:18:01 -0400
commit     050e20e7ca56c3a5985fdcf64012800c153260f2 (patch)
tree       514cb57562e94f61c70a25c205113b728f0d3178 /synapse/storage
parent     Use the default templates when a custom template file cannot be found (#8037) (diff)
Convert some of the general database methods to async (#8100)
Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/database.py                      | 23
-rw-r--r--  synapse/storage/databases/main/appservice.py     |  2
-rw-r--r--  synapse/storage/databases/main/events_worker.py  | 16
-rw-r--r--  synapse/storage/databases/main/registration.py   |  8
-rw-r--r--  synapse/storage/databases/main/roommember.py     |  4
5 files changed, 24 insertions, 29 deletions
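
The change applies the usual Twisted migration from @defer.inlineCallbacks generators to native coroutines: the decorator is dropped, the method becomes async def, and each yield of a Deferred becomes an await. A minimal before/after sketch of that conversion, using hypothetical method and argument names rather than anything from the diff below:

from twisted.internet import defer

# Before: an inlineCallbacks generator that returns a Deferred.
@defer.inlineCallbacks
def get_widget(self, widget_id):
    row = yield self.runInteraction("get_widget", self._get_widget_txn, widget_id)
    return row

# After: a native coroutine. Callers that already use await need no change;
# callers still written with inlineCallbacks must wrap the coroutine in a
# Deferred (see the events_worker.py hunk further down).
async def get_widget(self, widget_id):
    return await self.runInteraction("get_widget", self._get_widget_txn, widget_id)
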
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 4ada6f5563..8a9e06efcf 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -332,8 +332,7 @@ class DatabasePool(object):
         """
         return self._db_pool.running
 
-    @defer.inlineCallbacks
-    def _check_safe_to_upsert(self):
+    async def _check_safe_to_upsert(self):
         """
         Is it safe to use native UPSERT?
 
@@ -342,7 +341,7 @@ class DatabasePool(object):
 
         If the background updates have not completed, wait 15 sec and check again.
         """
-        updates = yield self.simple_select_list(
+        updates = await self.simple_select_list(
             "background_updates",
             keyvalues=None,
             retcols=["update_name"],
@@ -614,8 +613,7 @@ class DatabasePool(object):
     # "Simple" SQL API methods that operate on a single table with no JOINs,
     # no complex WHERE clauses, just a dict of values for columns.
 
-    @defer.inlineCallbacks
-    def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"):
+    async def simple_insert(self, table, values, or_ignore=False, desc="simple_insert"):
         """Executes an INSERT query on the named table.
 
         Args:
@@ -631,7 +629,7 @@ class DatabasePool(object):
             `or_ignore` is True
         """
         try:
-            yield self.runInteraction(desc, self.simple_insert_txn, table, values)
+            await self.runInteraction(desc, self.simple_insert_txn, table, values)
         except self.engine.module.IntegrityError:
             # We have to do or_ignore flag at this layer, since we can't reuse
             # a cursor after we receive an error from the db.
@@ -684,8 +682,7 @@ class DatabasePool(object):
 
         txn.executemany(sql, vals)
 
-    @defer.inlineCallbacks
-    def simple_upsert(
+    async def simple_upsert(
         self,
         table,
         keyvalues,
@@ -714,14 +711,14 @@ class DatabasePool(object):
                 inserting
             lock (bool): True to lock the table when doing the upsert.
         Returns:
-            Deferred(None or bool): Native upserts always return None. Emulated
+            None or bool: Native upserts always return None. Emulated
             upserts return True if a new entry was created, False if an existing
             one was updated.
         """
         attempts = 0
         while True:
             try:
-                result = yield self.runInteraction(
+                return await self.runInteraction(
                     desc,
                     self.simple_upsert_txn,
                     table,
@@ -730,7 +727,6 @@ class DatabasePool(object):
                     insertion_values,
                     lock=lock,
                 )
-                return result
             except self.engine.module.IntegrityError as e:
                 attempts += 1
                 if attempts >= 5:
@@ -1121,8 +1117,7 @@ class DatabasePool(object):
 
         return cls.cursor_to_dict(txn)
 
-    @defer.inlineCallbacks
-    def simple_select_many_batch(
+    async def simple_select_many_batch(
         self,
         table,
         column,
@@ -1156,7 +1151,7 @@ class DatabasePool(object):
             it_list[i : i + batch_size] for i in range(0, len(it_list), batch_size)
         ]
         for chunk in chunks:
-            rows = yield self.runInteraction(
+            rows = await self.runInteraction(
                 desc,
                 self.simple_select_many_txn,
                 table,
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 5cf1a88399..02568a2391 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -169,7 +169,7 @@ class ApplicationServiceTransactionWorkerStore(
             service(ApplicationService): The service whose state to set.
             state(ApplicationServiceState): The connectivity state to apply.
         Returns:
-            A Deferred which resolves when the state was set successfully.
+            An Awaitable which resolves when the state was set successfully.
         """
         return self.db_pool.simple_upsert(
             "application_services_state", {"as_id": service.id}, {"state": state}
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 5687448e3d..8c63a0dc4d 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -847,13 +847,15 @@ class EventsWorkerStore(SQLBaseStore):
         """Given a list of event ids, check if we have already processed and
         stored them as non outliers.
         """
-        rows = yield self.db_pool.simple_select_many_batch(
-            table="events",
-            retcols=("event_id",),
-            column="event_id",
-            iterable=list(event_ids),
-            keyvalues={"outlier": False},
-            desc="have_events_in_timeline",
+        rows = yield defer.ensureDeferred(
+            self.db_pool.simple_select_many_batch(
+                table="events",
+                retcols=("event_id",),
+                column="event_id",
+                iterable=list(event_ids),
+                keyvalues={"outlier": False},
+                desc="have_events_in_timeline",
+            )
         )
 
         return {r["event_id"] for r in rows}
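
The surrounding method in events_worker.py is still an @defer.inlineCallbacks generator, and inlineCallbacks can only yield Deferreds, not coroutine objects, so the now-async simple_select_many_batch is wrapped in defer.ensureDeferred at the call site. A minimal sketch of that bridging pattern, with hypothetical names:

from twisted.internet import defer

@defer.inlineCallbacks
def legacy_caller(self, ids):
    # inlineCallbacks cannot yield a coroutine directly; ensureDeferred turns
    # the coroutine returned by the async method into a Deferred it can yield.
    rows = yield defer.ensureDeferred(self.some_async_query(ids))
    return rows
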
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index de50fa6e94..068ad22b30 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -17,9 +17,7 @@
 
 import logging
 import re
-from typing import Dict, List, Optional
-
-from twisted.internet.defer import Deferred
+from typing import Awaitable, Dict, List, Optional
 
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
@@ -563,7 +561,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             id_server (str)
 
         Returns:
-            Deferred
+            Awaitable
         """
         # We need to use an upsert, in case the user had already bound the
         # threepid
@@ -1084,7 +1082,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
     def record_user_external_id(
         self, auth_provider: str, external_id: str, user_id: str
-    ) -> Deferred:
+    ) -> Awaitable:
         """Record a mapping from an external user id to a mxid
 
         Args:
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 1cc8c08ed0..161edbeccb 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -767,13 +767,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return set(room_ids)
 
-    def get_membership_from_event_ids(
+    async def get_membership_from_event_ids(
         self, member_event_ids: Iterable[str]
     ) -> List[dict]:
         """Get user_id and membership of a set of event IDs.
         """
 
-        return self.db_pool.simple_select_many_batch(
+        return await self.db_pool.simple_select_many_batch(
             table="room_memberships",
             column="event_id",
             iterable=member_event_ids,