author    Richard van der Hoff <richard@matrix.org>  2020-08-28 15:59:57 +0100
committer Richard van der Hoff <richard@matrix.org>  2020-08-28 15:59:57 +0100
commit    5f224a4794cf5b25be8175b926bebc62994baf17
tree      dfd9c8c998a70c500e78f56bd49cf0c589c25b77 /synapse/storage
parent    Merge branch 'develop' into matrix-org-hotfixes
parent    Only return devices with keys from `/federation/v1/user/devices/` (#8198)

Merge branch 'develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage')
 synapse/storage/database.py                            |  70
 synapse/storage/databases/main/appservice.py           |  21
 synapse/storage/databases/main/deviceinbox.py          |  12
 synapse/storage/databases/main/devices.py              |   8
 synapse/storage/databases/main/e2e_room_keys.py        |  30
 synapse/storage/databases/main/event_federation.py     |  71
 synapse/storage/databases/main/group_server.py         | 245
 synapse/storage/databases/main/keys.py                 |  50
 synapse/storage/databases/main/media_repository.py     |  22
 synapse/storage/databases/main/monthly_active_users.py |  15
 synapse/storage/databases/main/openid.py               |   6
 synapse/storage/databases/main/profile.py              |  10
 synapse/storage/databases/main/registration.py         |  58
 synapse/storage/databases/main/room.py                 |  16
 synapse/storage/databases/main/state.py                |  19
 synapse/storage/databases/main/state_deltas.py         |  21
 synapse/storage/databases/main/stats.py                |  90
 synapse/storage/databases/main/stream.py               |  11
 synapse/storage/databases/main/transactions.py         |  57
 synapse/storage/databases/state/store.py               |  26
 synapse/storage/state.py                               |  16
21 files changed, 510 insertions, 364 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 2f6f49a4bf..7ab370efef 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -28,7 +28,6 @@ from typing import (
     Optional,
     Tuple,
     TypeVar,
-    Union,
     overload,
 )
 
@@ -615,6 +614,7 @@ class DatabasePool(object):
         """Runs a single query for a result set.
 
         Args:
+            desc: description of the transaction, for logging and metrics
             decoder - The function which can resolve the cursor results to
                 something meaningful.
             query - The query string to execute
@@ -650,7 +650,7 @@ class DatabasePool(object):
             or_ignore: bool stating whether an exception should be raised
                 when a conflicting row already exists. If True, False will be
                 returned by the function instead
-            desc: string giving a description of the transaction
+            desc: description of the transaction, for logging and metrics
 
         Returns:
              Whether the row was inserted or not. Only useful when `or_ignore` is True
@@ -687,7 +687,7 @@ class DatabasePool(object):
         Args:
             table: string giving the table name
             values: dict of new column names and values for them
-            desc: string giving a description of the transaction
+            desc: description of the transaction, for logging and metrics
         """
         await self.runInteraction(desc, self.simple_insert_many_txn, table, values)
 
@@ -701,7 +701,6 @@ class DatabasePool(object):
             txn: The transaction to use.
             table: string giving the table name
             values: dict of new column names and values for them
-            desc: string giving a description of the transaction
         """
         if not values:
             return
@@ -756,6 +755,7 @@ class DatabasePool(object):
             keyvalues: The unique key columns and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
+            desc: description of the transaction, for logging and metrics
             lock: True to lock the table when doing the upsert.
         Returns:
             Native upserts always return None. Emulated upserts return True if a
@@ -1082,6 +1082,7 @@ class DatabasePool(object):
             retcols: list of strings giving the names of the columns to return
             allow_none: If true, return None instead of failing if the SELECT
                 statement returns no rows
+            desc: description of the transaction, for logging and metrics
         """
         return await self.runInteraction(
             desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none
@@ -1167,6 +1168,7 @@ class DatabasePool(object):
             table: table name
             keyvalues: column names and values to select the rows with
             retcol: column whose value we wish to retrieve.
+            desc: description of the transaction, for logging and metrics
 
         Returns:
             Results in a list
@@ -1191,6 +1193,7 @@ class DatabasePool(object):
                 column names and values to select the rows with, or None to not
                 apply a WHERE clause.
             retcols: the names of the columns to return
+            desc: description of the transaction, for logging and metrics
 
         Returns:
             A list of dictionaries.
@@ -1244,14 +1247,16 @@ class DatabasePool(object):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
-        Filters rows by if value of `column` is in `iterable`.
+        Filters rows by whether the value of `column` is in `iterable`.
 
         Args:
             table: string giving the table name
             column: column name to test for inclusion against `iterable`
             iterable: list
-            keyvalues: dict of column names and values to select the rows with
             retcols: list of strings giving the names of the columns to return
+            keyvalues: dict of column names and values to select the rows with
+            desc: description of the transaction, for logging and metrics
+            batch_size: the number of rows for each select query
         """
         results = []  # type: List[Dict[str, Any]]
 
@@ -1292,7 +1297,7 @@ class DatabasePool(object):
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
-        Filters rows by if value of `column` is in `iterable`.
+        Filters rows by whether the value of `column` is in `iterable`.
 
         Args:
             txn: Transaction object
@@ -1368,6 +1373,7 @@ class DatabasePool(object):
             table: string giving the table name
             keyvalues: dict of column names and values to select the row with
             updatevalues: dict giving column names and values to update
+            desc: description of the transaction, for logging and metrics
         """
         await self.runInteraction(
             desc, self.simple_update_one_txn, table, keyvalues, updatevalues
@@ -1427,6 +1433,7 @@ class DatabasePool(object):
         Args:
             table: string giving the table name
             keyvalues: dict of column names and values to select the row with
+            desc: description of the transaction, for logging and metrics
         """
         await self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues)
 
@@ -1452,13 +1459,38 @@ class DatabasePool(object):
         if txn.rowcount > 1:
             raise StoreError(500, "More than one row matched (%s)" % (table,))
 
-    def simple_delete(self, table: str, keyvalues: Dict[str, Any], desc: str):
-        return self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
+    async def simple_delete(
+        self, table: str, keyvalues: Dict[str, Any], desc: str
+    ) -> int:
+        """Executes a DELETE query on the named table.
+
+        Filters rows by the key-value pairs.
+
+        Args:
+            table: string giving the table name
+            keyvalues: dict of column names and values to select the row with
+            desc: description of the transaction, for logging and metrics
+
+        Returns:
+            The number of deleted rows.
+        """
+        return await self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
 
     @staticmethod
     def simple_delete_txn(
         txn: LoggingTransaction, table: str, keyvalues: Dict[str, Any]
     ) -> int:
+        """Executes a DELETE query on the named table.
+
+        Filters rows by the key-value pairs.
+
+        Args:
+            table: string giving the table name
+            keyvalues: dict of column names and values to select the row with
+
+        Returns:
+            The number of deleted rows.
+        """
         sql = "DELETE FROM %s WHERE %s" % (
             table,
             " AND ".join("%s = ?" % (k,) for k in keyvalues),
@@ -1475,6 +1507,20 @@ class DatabasePool(object):
         keyvalues: Dict[str, Any],
         desc: str,
     ) -> int:
+        """Executes a DELETE query on the named table.
+
+        Filters rows by whether the value of `column` is in `iterable`.
+
+        Args:
+            table: string giving the table name
+            column: column name to test for inclusion against `iterable`
+            iterable: list
+            keyvalues: dict of column names and values to select the rows with
+            desc: description of the transaction, for logging and metrics
+
+        Returns:
+            The number of deleted rows.
+        """
         return await self.runInteraction(
             desc, self.simple_delete_many_txn, table, column, iterable, keyvalues
         )
@@ -1655,7 +1701,7 @@ class DatabasePool(object):
         term: Optional[str],
         col: str,
         retcols: Iterable[str],
-    ) -> Union[List[Dict[str, Any]], int]:
+    ) -> Optional[List[Dict[str, Any]]]:
         """Executes a SELECT query on the named table, which may return zero or
         more rows, returning the result as a list of dicts.
 
@@ -1667,14 +1713,14 @@ class DatabasePool(object):
             retcols: the names of the columns to return
 
         Returns:
-            0 if no term is given, otherwise a list of dictionaries.
+            None if no term is given, otherwise a list of dictionaries.
         """
         if term:
             sql = "SELECT %s FROM %s WHERE %s LIKE ?" % (", ".join(retcols), table, col)
             termvalues = ["%%" + term + "%%"]
             txn.execute(sql, termvalues)
         else:
-            return 0
+            return None
 
         return cls.cursor_to_dict(txn)
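
The database.py hunks above standardise every `desc` docstring ("description of the transaction, for logging and metrics") and turn `simple_delete` into a coroutine that reports its row count. A minimal caller sketch (the store class, table and method names are invented for illustration; only the helper signatures come from this diff):

    from synapse.storage._base import SQLBaseStore

    class ExampleWorkerStore(SQLBaseStore):
        async def purge_widgets(self, user_id: str) -> int:
            # simple_delete must now be awaited and resolves to the
            # number of deleted rows; `desc` names the transaction for
            # logging and metrics.
            return await self.db_pool.simple_delete(
                table="example_widgets",  # hypothetical table
                keyvalues={"user_id": user_id},
                desc="purge_widgets",
            )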
 
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 77723f7d4d..454c0bc50c 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -161,20 +161,18 @@ class ApplicationServiceTransactionWorkerStore(
             return result.get("state")
         return None
 
-    def set_appservice_state(self, service, state):
+    async def set_appservice_state(self, service, state) -> None:
         """Set the application service state.
 
         Args:
             service(ApplicationService): The service whose state to set.
             state(ApplicationServiceState): The connectivity state to apply.
-        Returns:
-            An Awaitable which resolves when the state was set successfully.
         """
-        return self.db_pool.simple_upsert(
+        await self.db_pool.simple_upsert(
             "application_services_state", {"as_id": service.id}, {"state": state}
         )
 
-    def create_appservice_txn(self, service, events):
+    async def create_appservice_txn(self, service, events):
         """Atomically creates a new transaction for this application service
         with the given list of events.
 
@@ -211,20 +209,17 @@ class ApplicationServiceTransactionWorkerStore(
             )
             return AppServiceTransaction(service=service, id=new_txn_id, events=events)
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "create_appservice_txn", _create_appservice_txn
         )
 
-    def complete_appservice_txn(self, txn_id, service):
+    async def complete_appservice_txn(self, txn_id, service) -> None:
         """Completes an application service transaction.
 
         Args:
             txn_id(str): The transaction ID being completed.
             service(ApplicationService): The application service which was sent
             this transaction.
-        Returns:
-            A Deferred which resolves if this transaction was stored
-            successfully.
         """
         txn_id = int(txn_id)
 
@@ -260,7 +255,7 @@ class ApplicationServiceTransactionWorkerStore(
                 {"txn_id": txn_id, "as_id": service.id},
             )
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "complete_appservice_txn", _complete_appservice_txn
         )
 
@@ -314,13 +309,13 @@ class ApplicationServiceTransactionWorkerStore(
         else:
             return int(last_txn_id[0])  # select 'last_txn' col
 
-    def set_appservice_last_pos(self, pos):
+    async def set_appservice_last_pos(self, pos) -> None:
         def set_appservice_last_pos_txn(txn):
             txn.execute(
                 "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
             )
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "set_appservice_last_pos", set_appservice_last_pos_txn
         )
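
The `set_appservice_last_pos` change just above is the commit's recurring shape in miniature: a method that handed back the Deferred from `runInteraction` becomes a coroutine that awaits it, so docstrings no longer need a "Returns: A Deferred which resolves..." clause. Side by side (bodies copied from the hunk, shown outside their class for brevity):

    # Before: the caller receives the Deferred from runInteraction.
    def set_appservice_last_pos(self, pos):
        def set_appservice_last_pos_txn(txn):
            txn.execute(
                "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
            )

        return self.db_pool.runInteraction(
            "set_appservice_last_pos", set_appservice_last_pos_txn
        )

    # After: the caller awaits and gets a plain None.
    async def set_appservice_last_pos(self, pos) -> None:
        def set_appservice_last_pos_txn(txn):
            txn.execute(
                "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
            )

        await self.db_pool.runInteraction(
            "set_appservice_last_pos", set_appservice_last_pos_txn
        )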
 
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index bb85637a95..0044433110 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -190,15 +190,15 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         )
 
     @trace
-    def delete_device_msgs_for_remote(self, destination, up_to_stream_id):
+    async def delete_device_msgs_for_remote(
+        self, destination: str, up_to_stream_id: int
+    ) -> None:
         """Used to delete messages when the remote destination acknowledges
         their receipt.
 
         Args:
-            destination(str): The destination server_name
-            up_to_stream_id(int): Where to delete messages up to.
-        Returns:
-            A deferred that resolves when the messages have been deleted.
+            destination: The destination server_name
+            up_to_stream_id: Where to delete messages up to.
         """
 
         def delete_messages_for_remote_destination_txn(txn):
@@ -209,7 +209,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             )
             txn.execute(sql, (destination, up_to_stream_id))
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
         )
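
A call-site sketch for the now-typed deletion; the store handle, destination and stream ID below are placeholders:

    # e.g. once the remote server has acknowledged receipt of a
    # to-device transaction:
    await store.delete_device_msgs_for_remote(
        destination="remote.example.com",
        up_to_stream_id=1234,
    )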
 
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index a811a39eb5..def96637a2 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -498,9 +498,7 @@ class DeviceWorkerStore(SQLBaseStore):
     ) -> Tuple[int, List[JsonDict]]:
         now_stream_id = self._device_list_id_gen.get_current_token()
 
-        devices = self._get_e2e_device_keys_txn(
-            txn, [(user_id, None)], include_all_devices=True
-        )
+        devices = self._get_e2e_device_keys_txn(txn, [(user_id, None)])
 
         if devices:
             user_devices = devices[user_id]
@@ -716,11 +714,11 @@ class DeviceWorkerStore(SQLBaseStore):
 
         return {row["user_id"] for row in rows}
 
-    def mark_remote_user_device_cache_as_stale(self, user_id: str):
+    async def mark_remote_user_device_cache_as_stale(self, user_id: str) -> None:
         """Records that the server has reason to believe the cache of the devices
         for the remote users is out of date.
         """
-        return self.db_pool.simple_upsert(
+        await self.db_pool.simple_upsert(
             table="device_lists_remote_resync",
             keyvalues={"user_id": user_id},
             values={},
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index 82f9d870fd..12cecceec2 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -151,7 +151,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
         return sessions
 
-    def get_e2e_room_keys_multi(self, user_id, version, room_keys):
+    async def get_e2e_room_keys_multi(self, user_id, version, room_keys):
         """Get multiple room keys at a time.  The difference between this function and
         get_e2e_room_keys is that this function can be used to retrieve
         multiple specific keys at a time, whereas get_e2e_room_keys is used for
@@ -166,10 +166,10 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 that we want to query
 
         Returns:
-           Deferred[dict[str, dict[str, dict]]]: a map of room IDs to session IDs to room key
+           dict[str, dict[str, dict]]: a map of room IDs to session IDs to room key
         """
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_e2e_room_keys_multi",
             self._get_e2e_room_keys_multi_txn,
             user_id,
@@ -283,7 +283,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             raise StoreError(404, "No current backup version")
         return row[0]
 
-    def get_e2e_room_keys_version_info(self, user_id, version=None):
+    async def get_e2e_room_keys_version_info(self, user_id, version=None):
         """Get info metadata about a version of our room_keys backup.
 
         Args:
@@ -293,7 +293,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         Raises:
             StoreError: with code 404 if there are no e2e_room_keys_versions present
         Returns:
-            A deferred dict giving the info metadata for this backup version, with
+            A dict giving the info metadata for this backup version, with
             fields including:
                 version(str)
                 algorithm(str)
@@ -324,12 +324,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 result["etag"] = 0
             return result
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
         )
 
     @trace
-    def create_e2e_room_keys_version(self, user_id, info):
+    async def create_e2e_room_keys_version(self, user_id: str, info: dict) -> str:
         """Atomically creates a new version of this user's e2e_room_keys store
         with the given version info.
 
@@ -338,7 +338,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             info(dict): the info about the backup version to be created
 
         Returns:
-            A deferred string for the newly created version ID
+            The newly created version ID
         """
 
         def _create_e2e_room_keys_version_txn(txn):
@@ -365,7 +365,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
 
             return new_version
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
         )
 
@@ -403,13 +403,15 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             )
 
     @trace
-    def delete_e2e_room_keys_version(self, user_id, version=None):
+    async def delete_e2e_room_keys_version(
+        self, user_id: str, version: Optional[str] = None
+    ) -> None:
         """Delete a given backup version of the user's room keys.
         Doesn't delete their actual key data.
 
         Args:
-            user_id(str): the user whose backup version we're deleting
-            version(str): Optional. the version ID of the backup version we're deleting
+            user_id: the user whose backup version we're deleting
+            version: Optional. the version ID of the backup version we're deleting
                 If missing, we delete the current backup version info.
         Raises:
             StoreError: with code 404 if there are no e2e_room_keys_versions present,
@@ -430,13 +432,13 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 keyvalues={"user_id": user_id, "version": this_version},
             )
 
-            return self.db_pool.simple_update_one_txn(
+            self.db_pool.simple_update_one_txn(
                 txn,
                 table="e2e_room_keys_versions",
                 keyvalues={"user_id": user_id, "version": this_version},
                 updatevalues={"deleted": 1},
             )
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "delete_e2e_room_keys_version", _delete_e2e_room_keys_version_txn
         )
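
A consumer sketch for `get_e2e_room_keys_multi`, which the hunk above documents as resolving to a map of room IDs to session IDs to room keys (the store handle and names are placeholders):

    # `room_keys` selects which sessions to fetch; its exact shape is
    # described in the Args section elided from this hunk.
    keys = await store.get_e2e_room_keys_multi(user_id, version, room_keys)
    for room_id, sessions in keys.items():
        for session_id, room_key in sessions.items():
            handle_key(room_id, session_id, room_key)  # hypothetical handler
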
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 6e5761c7b7..0b69aa6a94 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -59,7 +59,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             include_given: include the given events in result
 
         Returns:
-            list of event_ids
+            An awaitable which resolves to a list of event_ids
         """
         return await self.db_pool.runInteraction(
             "get_auth_chain_ids",
@@ -95,7 +95,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         return list(results)
 
-    def get_auth_chain_difference(self, state_sets: List[Set[str]]):
+    async def get_auth_chain_difference(self, state_sets: List[Set[str]]) -> Set[str]:
         """Given sets of state events figure out the auth chain difference (as
         per state res v2 algorithm).
 
@@ -104,10 +104,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         chain.
 
         Returns:
-            Deferred[Set[str]]
+            The set of the difference in auth chains.
         """
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_auth_chain_difference",
             self._get_auth_chain_difference_txn,
             state_sets,
@@ -252,8 +252,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
-    def get_oldest_events_with_depth_in_room(self, room_id):
-        return self.db_pool.runInteraction(
+    async def get_oldest_events_with_depth_in_room(self, room_id):
+        return await self.db_pool.runInteraction(
             "get_oldest_events_with_depth_in_room",
             self.get_oldest_events_with_depth_in_room_txn,
             room_id,
@@ -293,7 +293,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         else:
             return max(row["depth"] for row in rows)
 
-    def get_prev_events_for_room(self, room_id: str):
+    async def get_prev_events_for_room(self, room_id: str) -> List[str]:
         """
         Gets a subset of the current forward extremities in the given room.
 
@@ -301,14 +301,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         events which refer to hundreds of prev_events.
 
         Args:
-            room_id (str): room_id
+            room_id: room_id
 
         Returns:
-            Deferred[List[str]]: the event ids of the forward extremites
+            The event ids of the forward extremities.
 
         """
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_prev_events_for_room", self._get_prev_events_for_room_txn, room_id
         )
 
@@ -328,17 +328,19 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         return [row[0] for row in txn]
 
-    def get_rooms_with_many_extremities(self, min_count, limit, room_id_filter):
+    async def get_rooms_with_many_extremities(
+        self, min_count: int, limit: int, room_id_filter: Iterable[str]
+    ) -> List[str]:
         """Get the top rooms with at least N extremities.
 
         Args:
-            min_count (int): The minimum number of extremities
-            limit (int): The maximum number of rooms to return.
-            room_id_filter (iterable[str]): room_ids to exclude from the results
+            min_count: The minimum number of extremities
+            limit: The maximum number of rooms to return.
+            room_id_filter: room_ids to exclude from the results
 
         Returns:
-            Deferred[list]: At most `limit` room IDs that have at least
-            `min_count` extremities, sorted by extremity count.
+            At most `limit` room IDs that have at least `min_count` extremities,
+            sorted by extremity count.
         """
 
         def _get_rooms_with_many_extremities_txn(txn):
@@ -363,7 +365,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             txn.execute(sql, query_args)
             return [room_id for room_id, in txn]
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_rooms_with_many_extremities", _get_rooms_with_many_extremities_txn
         )
 
@@ -376,10 +378,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             desc="get_latest_event_ids_in_room",
         )
 
-    def get_min_depth(self, room_id):
-        """ For hte given room, get the minimum depth we have seen for it.
+    async def get_min_depth(self, room_id: str) -> int:
+        """For the given room, get the minimum depth we have seen for it.
         """
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_min_depth", self._get_min_depth_interaction, room_id
         )
 
@@ -394,7 +396,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         return int(min_depth) if min_depth is not None else None
 
-    def get_forward_extremeties_for_room(self, room_id, stream_ordering):
+    async def get_forward_extremeties_for_room(
+        self, room_id: str, stream_ordering: int
+    ) -> List[str]:
         """For a given room_id and stream_ordering, return the forward
        extremities of the room at that point in "time".
 
@@ -402,11 +406,11 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         stream_orderings from that point.
 
         Args:
-            room_id (str):
-            stream_ordering (int):
+            room_id:
+            stream_ordering:
 
         Returns:
-            deferred, which resolves to a list of event_ids
+            A list of event_ids
         """
         # We want to make the cache more effective, so we clamp to the last
         # change before the given ordering.
@@ -422,10 +426,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
         if last_change > self.stream_ordering_month_ago:
             stream_ordering = min(last_change, stream_ordering)
 
-        return self._get_forward_extremeties_for_room(room_id, stream_ordering)
+        return await self._get_forward_extremeties_for_room(room_id, stream_ordering)
 
     @cached(max_entries=5000, num_args=2)
-    def _get_forward_extremeties_for_room(self, room_id, stream_ordering):
+    async def _get_forward_extremeties_for_room(self, room_id, stream_ordering):
         """For a given room_id and stream_ordering, return the forward
        extremities of the room at that point in "time".
 
@@ -450,19 +454,18 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             txn.execute(sql, (stream_ordering, room_id))
             return [event_id for event_id, in txn]
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
         )
 
-    async def get_backfill_events(self, room_id, event_list, limit):
+    async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
         """Get a list of Events for a given topic that occurred before (and
         including) the events in event_list. Return a list of max size `limit`
 
         Args:
-            txn
-            room_id (str)
-            event_list (list)
-            limit (int)
+            room_id
+            event_list
+            limit
         """
         event_ids = await self.db_pool.runInteraction(
             "get_backfill_events",
@@ -631,8 +634,8 @@ class EventFederationStore(EventFederationWorkerStore):
             _delete_old_forward_extrem_cache_txn,
         )
 
-    def clean_room_for_join(self, room_id):
-        return self.db_pool.runInteraction(
+    async def clean_room_for_join(self, room_id):
+        return await self.db_pool.runInteraction(
             "clean_room_for_join", self._clean_room_for_join_txn, room_id
         )
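
`get_auth_chain_difference` now advertises a plain `Set[str]`. Independent of the SQL, the quantity it computes (per state resolution v2) is the union of the state sets' full auth chains minus their intersection; a self-contained toy model, assuming the chains are already materialised as sets of event IDs:

    from typing import List, Set

    def auth_chain_difference(full_auth_chains: List[Set[str]]) -> Set[str]:
        union = set().union(*full_auth_chains)              # in at least one chain
        intersection = set.intersection(*full_auth_chains)  # in every chain
        return union - intersection

    # Two state sets whose auth chains share only event "A":
    assert auth_chain_difference([{"A", "B"}, {"A", "C"}]) == {"B", "C"}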
 
diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
index e3ead71853..ccfbb2135e 100644
--- a/synapse/storage/databases/main/group_server.py
+++ b/synapse/storage/databases/main/group_server.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Any, Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple, Union
 
 from synapse.api.errors import SynapseError
 from synapse.storage._base import SQLBaseStore, db_to_json
@@ -70,7 +70,9 @@ class GroupServerWorkerStore(SQLBaseStore):
             desc="get_invited_users_in_group",
         )
 
-    def get_rooms_in_group(self, group_id: str, include_private: bool = False):
+    async def get_rooms_in_group(
+        self, group_id: str, include_private: bool = False
+    ) -> List[Dict[str, Union[str, bool]]]:
         """Retrieve the rooms that belong to a given group. Does not return rooms that
         lack members.
 
@@ -79,8 +81,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             include_private: Whether to return private rooms in results
 
         Returns:
-            Deferred[List[Dict[str, str|bool]]]: A list of dictionaries, each in the
-            form of:
+            A list of dictionaries, each in the form of:
 
             {
               "room_id": "!a_room_id:example.com",  # The ID of the room
@@ -117,13 +118,13 @@ class GroupServerWorkerStore(SQLBaseStore):
                 for room_id, is_public in txn
             ]
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_rooms_in_group", _get_rooms_in_group_txn
         )
 
-    def get_rooms_for_summary_by_category(
+    async def get_rooms_for_summary_by_category(
         self, group_id: str, include_private: bool = False,
-    ):
+    ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
         """Get the rooms and categories that should be included in a summary request
 
         Args:
@@ -131,7 +132,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             include_private: Whether to return private rooms in results
 
         Returns:
-            Deferred[Tuple[List, Dict]]: A tuple containing:
+            A tuple containing:
 
                 * A list of dictionaries with the keys:
                     * "room_id": str, the room ID
@@ -207,7 +208,7 @@ class GroupServerWorkerStore(SQLBaseStore):
 
             return rooms, categories
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_rooms_for_summary", _get_rooms_for_summary_txn
         )
 
@@ -281,10 +282,11 @@ class GroupServerWorkerStore(SQLBaseStore):
             desc="get_local_groups_for_room",
         )
 
-    def get_users_for_summary_by_role(self, group_id, include_private=False):
+    async def get_users_for_summary_by_role(self, group_id, include_private=False):
         """Get the users and roles that should be included in a summary request
 
-        Returns ([users], [roles])
+        Returns:
+            ([users], [roles])
         """
 
         def _get_users_for_summary_txn(txn):
@@ -338,7 +340,7 @@ class GroupServerWorkerStore(SQLBaseStore):
 
             return users, roles
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_users_for_summary_by_role", _get_users_for_summary_txn
         )
 
@@ -376,7 +378,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             allow_none=True,
         )
 
-    def get_users_membership_info_in_group(self, group_id, user_id):
+    async def get_users_membership_info_in_group(self, group_id, user_id):
         """Get a dict describing the membership of a user in a group.
 
         Example if joined:
@@ -387,7 +389,8 @@ class GroupServerWorkerStore(SQLBaseStore):
                 "is_privileged": False,
             }
 
-        Returns an empty dict if the user is not join/invite/etc
+        Returns:
+            An empty dict if the user is not joined/invited/etc.
         """
 
         def _get_users_membership_in_group_txn(txn):
@@ -419,7 +422,7 @@ class GroupServerWorkerStore(SQLBaseStore):
 
             return {}
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_users_membership_info_in_group", _get_users_membership_in_group_txn
         )
 
@@ -433,7 +436,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             desc="get_publicised_groups_for_user",
         )
 
-    def get_attestations_need_renewals(self, valid_until_ms):
+    async def get_attestations_need_renewals(self, valid_until_ms):
         """Get all attestations that need to be renewed until givent time
         """
 
@@ -445,7 +448,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             txn.execute(sql, (valid_until_ms,))
             return self.db_pool.cursor_to_dict(txn)
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_attestations_need_renewals", _get_attestations_need_renewals_txn
         )
 
@@ -475,7 +478,7 @@ class GroupServerWorkerStore(SQLBaseStore):
             desc="get_joined_groups",
         )
 
-    def get_all_groups_for_user(self, user_id, now_token):
+    async def get_all_groups_for_user(self, user_id, now_token):
         def _get_all_groups_for_user_txn(txn):
             sql = """
                 SELECT group_id, type, membership, u.content
@@ -495,7 +498,7 @@ class GroupServerWorkerStore(SQLBaseStore):
                 for row in txn
             ]
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_all_groups_for_user", _get_all_groups_for_user_txn
         )
 
@@ -600,8 +603,27 @@ class GroupServerStore(GroupServerWorkerStore):
             desc="set_group_join_policy",
         )
 
-    def add_room_to_summary(self, group_id, room_id, category_id, order, is_public):
-        return self.db_pool.runInteraction(
+    async def add_room_to_summary(
+        self,
+        group_id: str,
+        room_id: str,
+        category_id: str,
+        order: int,
+        is_public: Optional[bool],
+    ) -> None:
+        """Add (or update) room's entry in summary.
+
+        Args:
+            group_id
+            room_id
+            category_id: If not None then adds the category to the end of
+                the summary if it's not already there.
+            order: If not None inserts the room at that position, e.g. an order
+                of 1 will put the room first. Otherwise, the room gets added to
+                the end.
+            is_public
+        """
+        await self.db_pool.runInteraction(
             "add_room_to_summary",
             self._add_room_to_summary_txn,
             group_id,
@@ -612,18 +634,26 @@ class GroupServerStore(GroupServerWorkerStore):
         )
 
     def _add_room_to_summary_txn(
-        self, txn, group_id, room_id, category_id, order, is_public
-    ):
+        self,
+        txn,
+        group_id: str,
+        room_id: str,
+        category_id: str,
+        order: int,
+        is_public: Optional[bool],
+    ) -> None:
         """Add (or update) room's entry in summary.
 
         Args:
-            group_id (str)
-            room_id (str)
-            category_id (str): If not None then adds the category to the end of
-                the summary if its not already there. [Optional]
-            order (int): If not None inserts the room at that position, e.g.
-                an order of 1 will put the room first. Otherwise, the room gets
-                added to the end.
+            txn
+            group_id
+            room_id
+            category_id: If not None then adds the category to the end of
+                the summary if it's not already there.
+            order: If not None inserts the room at that position, e.g. an order
+                of 1 will put the room first. Otherwise, the room gets added to
+                the end.
+            is_public
         """
         room_in_group = self.db_pool.simple_select_one_onecol_txn(
             txn,
@@ -728,11 +758,13 @@ class GroupServerStore(GroupServerWorkerStore):
                 },
             )
 
-    def remove_room_from_summary(self, group_id, room_id, category_id):
+    async def remove_room_from_summary(
+        self, group_id: str, room_id: str, category_id: str
+    ) -> int:
         if category_id is None:
             category_id = _DEFAULT_CATEGORY_ID
 
-        return self.db_pool.simple_delete(
+        return await self.db_pool.simple_delete(
             table="group_summary_rooms",
             keyvalues={
                 "group_id": group_id,
@@ -742,7 +774,13 @@ class GroupServerStore(GroupServerWorkerStore):
             desc="remove_room_from_summary",
         )
 
-    def upsert_group_category(self, group_id, category_id, profile, is_public):
+    async def upsert_group_category(
+        self,
+        group_id: str,
+        category_id: str,
+        profile: Optional[JsonDict],
+        is_public: Optional[bool],
+    ) -> None:
         """Add/update room category for group
         """
         insertion_values = {}
@@ -758,7 +796,7 @@ class GroupServerStore(GroupServerWorkerStore):
         else:
             update_values["is_public"] = is_public
 
-        return self.db_pool.simple_upsert(
+        await self.db_pool.simple_upsert(
             table="group_room_categories",
             keyvalues={"group_id": group_id, "category_id": category_id},
             values=update_values,
@@ -766,14 +804,20 @@ class GroupServerStore(GroupServerWorkerStore):
             desc="upsert_group_category",
         )
 
-    def remove_group_category(self, group_id, category_id):
-        return self.db_pool.simple_delete(
+    async def remove_group_category(self, group_id: str, category_id: str) -> int:
+        return await self.db_pool.simple_delete(
             table="group_room_categories",
             keyvalues={"group_id": group_id, "category_id": category_id},
             desc="remove_group_category",
         )
 
-    def upsert_group_role(self, group_id, role_id, profile, is_public):
+    async def upsert_group_role(
+        self,
+        group_id: str,
+        role_id: str,
+        profile: Optional[JsonDict],
+        is_public: Optional[bool],
+    ) -> None:
         """Add/remove user role
         """
         insertion_values = {}
@@ -789,7 +833,7 @@ class GroupServerStore(GroupServerWorkerStore):
         else:
             update_values["is_public"] = is_public
 
-        return self.db_pool.simple_upsert(
+        await self.db_pool.simple_upsert(
             table="group_roles",
             keyvalues={"group_id": group_id, "role_id": role_id},
             values=update_values,
@@ -797,15 +841,34 @@ class GroupServerStore(GroupServerWorkerStore):
             desc="upsert_group_role",
         )
 
-    def remove_group_role(self, group_id, role_id):
-        return self.db_pool.simple_delete(
+    async def remove_group_role(self, group_id: str, role_id: str) -> int:
+        return await self.db_pool.simple_delete(
             table="group_roles",
             keyvalues={"group_id": group_id, "role_id": role_id},
             desc="remove_group_role",
         )
 
-    def add_user_to_summary(self, group_id, user_id, role_id, order, is_public):
-        return self.db_pool.runInteraction(
+    async def add_user_to_summary(
+        self,
+        group_id: str,
+        user_id: str,
+        role_id: str,
+        order: int,
+        is_public: Optional[bool],
+    ) -> None:
+        """Add (or update) user's entry in summary.
+
+        Args:
+            group_id
+            user_id
+            role_id: If not None then adds the role to the end of the summary if
+                it's not already there.
+            order: If not None inserts the user at that position, e.g. an order
+                of 1 will put the user first. Otherwise, the user gets added to
+                the end.
+            is_public
+        """
+        await self.db_pool.runInteraction(
             "add_user_to_summary",
             self._add_user_to_summary_txn,
             group_id,
@@ -816,18 +879,26 @@ class GroupServerStore(GroupServerWorkerStore):
         )
 
     def _add_user_to_summary_txn(
-        self, txn, group_id, user_id, role_id, order, is_public
+        self,
+        txn,
+        group_id: str,
+        user_id: str,
+        role_id: str,
+        order: int,
+        is_public: Optional[bool],
     ):
         """Add (or update) user's entry in summary.
 
         Args:
-            group_id (str)
-            user_id (str)
-            role_id (str): If not None then adds the role to the end of
-                the summary if its not already there. [Optional]
-            order (int): If not None inserts the user at that position, e.g.
-                an order of 1 will put the user first. Otherwise, the user gets
-                added to the end.
+            txn
+            group_id
+            user_id
+            role_id: If not None then adds the role to the end of the summary if
+                it's not already there.
+            order: If not None inserts the user at that position, e.g. an order
+                of 1 will put the user first. Otherwise, the user gets added to
+                the end.
+            is_public
         """
         user_in_group = self.db_pool.simple_select_one_onecol_txn(
             txn,
@@ -928,46 +999,47 @@ class GroupServerStore(GroupServerWorkerStore):
                 },
             )
 
-    def remove_user_from_summary(self, group_id, user_id, role_id):
+    async def remove_user_from_summary(
+        self, group_id: str, user_id: str, role_id: str
+    ) -> int:
         if role_id is None:
             role_id = _DEFAULT_ROLE_ID
 
-        return self.db_pool.simple_delete(
+        return await self.db_pool.simple_delete(
             table="group_summary_users",
             keyvalues={"group_id": group_id, "role_id": role_id, "user_id": user_id},
             desc="remove_user_from_summary",
         )
 
-    def add_group_invite(self, group_id, user_id):
+    async def add_group_invite(self, group_id: str, user_id: str) -> None:
         """Record that the group server has invited a user
         """
-        return self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             table="group_invites",
             values={"group_id": group_id, "user_id": user_id},
             desc="add_group_invite",
         )
 
-    def add_user_to_group(
+    async def add_user_to_group(
         self,
-        group_id,
-        user_id,
-        is_admin=False,
-        is_public=True,
-        local_attestation=None,
-        remote_attestation=None,
-    ):
+        group_id: str,
+        user_id: str,
+        is_admin: bool = False,
+        is_public: bool = True,
+        local_attestation: Optional[dict] = None,
+        remote_attestation: Optional[dict] = None,
+    ) -> None:
         """Add a user to the group server.
 
         Args:
-            group_id (str)
-            user_id (str)
-            is_admin (bool)
-            is_public (bool)
-            local_attestation (dict): The attestation the GS created to give
-                to the remote server. Optional if the user and group are on the
-                same server
-            remote_attestation (dict): The attestation given to GS by remote
+            group_id
+            user_id
+            is_admin
+            is_public
+            local_attestation: The attestation the GS created to give to the remote
                 server. Optional if the user and group are on the same server
+            remote_attestation: The attestation given to GS by remote server.
+                Optional if the user and group are on the same server
         """
 
         def _add_user_to_group_txn(txn):
@@ -1010,9 +1082,9 @@ class GroupServerStore(GroupServerWorkerStore):
                     },
                 )
 
-        return self.db_pool.runInteraction("add_user_to_group", _add_user_to_group_txn)
+        await self.db_pool.runInteraction("add_user_to_group", _add_user_to_group_txn)
 
-    def remove_user_from_group(self, group_id, user_id):
+    async def remove_user_from_group(self, group_id: str, user_id: str) -> None:
         def _remove_user_from_group_txn(txn):
             self.db_pool.simple_delete_txn(
                 txn,
@@ -1040,12 +1112,14 @@ class GroupServerStore(GroupServerWorkerStore):
                 keyvalues={"group_id": group_id, "user_id": user_id},
             )
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "remove_user_from_group", _remove_user_from_group_txn
         )
 
-    def add_room_to_group(self, group_id, room_id, is_public):
-        return self.db_pool.simple_insert(
+    async def add_room_to_group(
+        self, group_id: str, room_id: str, is_public: bool
+    ) -> None:
+        await self.db_pool.simple_insert(
             table="group_rooms",
             values={"group_id": group_id, "room_id": room_id, "is_public": is_public},
             desc="add_room_to_group",
@@ -1061,7 +1135,7 @@ class GroupServerStore(GroupServerWorkerStore):
             desc="update_room_in_group_visibility",
         )
 
-    def remove_room_from_group(self, group_id, room_id):
+    async def remove_room_from_group(self, group_id: str, room_id: str) -> None:
         def _remove_room_from_group_txn(txn):
             self.db_pool.simple_delete_txn(
                 txn,
@@ -1075,7 +1149,7 @@ class GroupServerStore(GroupServerWorkerStore):
                 keyvalues={"group_id": group_id, "room_id": room_id},
             )
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "remove_room_from_group", _remove_room_from_group_txn
         )
 
@@ -1250,16 +1324,16 @@ class GroupServerStore(GroupServerWorkerStore):
             desc="update_remote_attestion",
         )
 
-    def remove_attestation_renewal(self, group_id, user_id):
+    async def remove_attestation_renewal(self, group_id: str, user_id: str) -> int:
         """Remove an attestation that we thought we should renew, but actually
         shouldn't. Ideally this would never get called as we would never
         incorrectly try and do attestations for local users on local groups.
 
         Args:
-            group_id (str)
-            user_id (str)
+            group_id
+            user_id
         """
-        return self.db_pool.simple_delete(
+        return await self.db_pool.simple_delete(
             table="group_attestations_renewals",
             keyvalues={"group_id": group_id, "user_id": user_id},
             desc="remove_attestation_renewal",
@@ -1268,14 +1342,11 @@ class GroupServerStore(GroupServerWorkerStore):
     def get_group_stream_token(self):
         return self._group_updates_id_gen.get_current_token()
 
-    def delete_group(self, group_id):
+    async def delete_group(self, group_id: str) -> None:
         """Deletes a group fully from the database.
 
         Args:
-            group_id (str)
-
-        Returns:
-            Deferred
+            group_id: The group ID to delete.
         """
 
         def _delete_group_txn(txn):
@@ -1299,4 +1370,4 @@ class GroupServerStore(GroupServerWorkerStore):
                     txn, table=table, keyvalues={"group_id": group_id}
                 )
 
-        return self.db_pool.runInteraction("delete_group", _delete_group_txn)
+        await self.db_pool.runInteraction("delete_group", _delete_group_txn)
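
A call-site sketch for the summary helpers above; the group and room IDs are invented. `remove_room_from_summary` now resolves to the row count from `simple_delete`, which a caller can use to tell whether anything was actually removed:

    removed = await store.remove_room_from_summary(
        group_id="+group:example.com",
        room_id="!room:example.com",
        category_id=None,  # None targets the default category, per the hunk
    )
    if removed == 0:
        pass  # the room was not in that category to begin with
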
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index fadcad51e7..ad43bb05ab 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -16,7 +16,7 @@
 
 import itertools
 import logging
-from typing import Iterable, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple
 
 from signedjson.key import decode_verify_key_bytes
 
@@ -42,16 +42,17 @@ class KeyStore(SQLBaseStore):
     @cachedList(
         cached_method_name="_get_server_verify_key", list_name="server_name_and_key_ids"
     )
-    def get_server_verify_keys(self, server_name_and_key_ids):
+    async def get_server_verify_keys(
+        self, server_name_and_key_ids: Iterable[Tuple[str, str]]
+    ) -> Dict[Tuple[str, str], Optional[FetchKeyResult]]:
         """
         Args:
-            server_name_and_key_ids (iterable[Tuple[str, str]]):
+            server_name_and_key_ids:
                 iterable of (server_name, key-id) tuples to fetch keys for
 
         Returns:
-            Deferred: resolves to dict[Tuple[str, str], FetchKeyResult|None]:
-                map from (server_name, key_id) -> FetchKeyResult, or None if the key is
-                unknown
+            A map from (server_name, key_id) -> FetchKeyResult, or None if the
+            key is unknown
         """
         keys = {}
 
@@ -87,7 +88,7 @@ class KeyStore(SQLBaseStore):
                 _get_keys(txn, batch)
             return keys
 
-        return self.db_pool.runInteraction("get_server_verify_keys", _txn)
+        return await self.db_pool.runInteraction("get_server_verify_keys", _txn)
 
     async def store_server_verify_keys(
         self,
@@ -140,22 +141,28 @@ class KeyStore(SQLBaseStore):
         for i in invalidations:
             invalidate((i,))
 
-    def store_server_keys_json(
-        self, server_name, key_id, from_server, ts_now_ms, ts_expires_ms, key_json_bytes
-    ):
+    async def store_server_keys_json(
+        self,
+        server_name: str,
+        key_id: str,
+        from_server: str,
+        ts_now_ms: int,
+        ts_expires_ms: int,
+        key_json_bytes: bytes,
+    ) -> None:
         """Stores the JSON bytes for a set of keys from a server
         The JSON should be signed by the originating server, the intermediate
         server, and by this server. Updates the value for the
         (server_name, key_id, from_server) triplet if one already existed.
         Args:
-            server_name (str): The name of the server.
-            key_id (str): The identifer of the key this JSON is for.
-            from_server (str): The server this JSON was fetched from.
-            ts_now_ms (int): The time now in milliseconds.
-            ts_valid_until_ms (int): The time when this json stops being valid.
-            key_json (bytes): The encoded JSON.
+            server_name: The name of the server.
+            key_id: The identifier of the key this JSON is for.
+            from_server: The server this JSON was fetched from.
+            ts_now_ms: The time now in milliseconds.
+            ts_expires_ms: The time when this JSON stops being valid.
+            key_json_bytes: The encoded JSON.
         """
-        return self.db_pool.simple_upsert(
+        await self.db_pool.simple_upsert(
             table="server_keys_json",
             keyvalues={
                 "server_name": server_name,
@@ -173,7 +180,9 @@ class KeyStore(SQLBaseStore):
             desc="store_server_keys_json",
         )
 
-    def get_server_keys_json(self, server_keys):
+    async def get_server_keys_json(
+        self, server_keys: Iterable[Tuple[str, Optional[str], Optional[str]]]
+    ) -> Dict[Tuple[str, Optional[str], Optional[str]], List[dict]]:
         """Retrive the key json for a list of server_keys and key ids.
         If no keys are found for a given server, key_id and source then
         that server, key_id, and source triplet entry will be an empty list.
@@ -182,8 +191,7 @@ class KeyStore(SQLBaseStore):
         Args:
             server_keys (list): List of (server_name, key_id, source) triplets.
         Returns:
-            Deferred[dict[Tuple[str, str, str|None], list[dict]]]:
-                Dict mapping (server_name, key_id, source) triplets to lists of dicts
+            A mapping from (server_name, key_id, source) triplets to a list of dicts
         """
 
         def _get_server_keys_json_txn(txn):
@@ -209,6 +217,6 @@ class KeyStore(SQLBaseStore):
                 results[(server_name, key_id, from_server)] = rows
             return results
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_server_keys_json", _get_server_keys_json_txn
         )
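
A lookup sketch for `get_server_verify_keys`; the server names and key IDs are invented, and it assumes `FetchKeyResult` exposes a `valid_until_ts` attribute:

    keys = await store.get_server_verify_keys(
        [
            ("remote.example.com", "ed25519:abc123"),
            ("other.example.com", "ed25519:def456"),
        ]
    )
    for (server_name, key_id), result in keys.items():
        if result is None:
            continue  # key unknown, per the docstring
        check_validity(server_name, key_id, result.valid_until_ts)  # hypothetical
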
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 8361dd63d9..3919ecad69 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -60,7 +60,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="get_local_media",
         )
 
-    def store_local_media(
+    async def store_local_media(
         self,
         media_id,
         media_type,
@@ -69,8 +69,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         media_length,
         user_id,
         url_cache=None,
-    ):
-        return self.db_pool.simple_insert(
+    ) -> None:
+        await self.db_pool.simple_insert(
             "local_media_repository",
             {
                 "media_id": media_id,
@@ -141,10 +141,10 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
 
         return self.db_pool.runInteraction("get_url_cache", get_url_cache_txn)
 
-    def store_url_cache(
+    async def store_url_cache(
         self, url, response_code, etag, expires_ts, og, media_id, download_ts
     ):
-        return self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             "local_media_repository_url_cache",
             {
                 "url": url,
@@ -172,7 +172,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="get_local_media_thumbnails",
         )
 
-    def store_local_thumbnail(
+    async def store_local_thumbnail(
         self,
         media_id,
         thumbnail_width,
@@ -181,7 +181,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         thumbnail_method,
         thumbnail_length,
     ):
-        return self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             "local_media_repository_thumbnails",
             {
                 "media_id": media_id,
@@ -212,7 +212,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="get_cached_remote_media",
         )
 
-    def store_cached_remote_media(
+    async def store_cached_remote_media(
         self,
         origin,
         media_id,
@@ -222,7 +222,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         upload_name,
         filesystem_id,
     ):
-        return self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             "remote_media_cache",
             {
                 "media_origin": origin,
@@ -288,7 +288,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="get_remote_media_thumbnails",
         )
 
-    def store_remote_media_thumbnail(
+    async def store_remote_media_thumbnail(
         self,
         origin,
         media_id,
@@ -299,7 +299,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         thumbnail_method,
         thumbnail_length,
     ):
-        return self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             "remote_media_cache_thumbnails",
             {
                 "media_origin": origin,
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 66953ffc26..e3ac512f11 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import List
+from typing import Dict, List
 
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, make_in_list_sql_clause
@@ -33,11 +33,11 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
         self.hs = hs
 
     @cached(num_args=0)
-    def get_monthly_active_count(self):
+    async def get_monthly_active_count(self) -> int:
         """Generates current count of monthly active users
 
         Returns:
-            Defered[int]: Number of current monthly active users
+            Number of current monthly active users
         """
 
         def _count_users(txn):
@@ -46,10 +46,10 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
             (count,) = txn.fetchone()
             return count
 
-        return self.db_pool.runInteraction("count_users", _count_users)
+        return await self.db_pool.runInteraction("count_users", _count_users)
 
     @cached(num_args=0)
-    def get_monthly_active_count_by_service(self):
+    async def get_monthly_active_count_by_service(self) -> Dict[str, int]:
         """Generates current count of monthly active users broken down by service.
         A service is typically an appservice but also includes native matrix users.
         Since the `monthly_active_users` table is populated from the `user_ips` table
@@ -57,8 +57,7 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
         method to return anything other than native matrix users.
 
         Returns:
-            Deferred[dict]: dict that includes a mapping between app_service_id
-                and the number of occurrences.
+            A mapping between app_service_id and the number of occurrences.
 
         """
 
@@ -74,7 +73,7 @@ class MonthlyActiveUsersWorkerStore(SQLBaseStore):
             result = txn.fetchall()
             return dict(result)
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "count_users_by_service", _count_users_by_service
         )
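Note that `get_monthly_active_count` keeps its `@cached(num_args=0)` decorator after becoming a coroutine; Synapse's descriptor caches the eventual result rather than re-running the query. A much-simplified sketch of that idea, caching a single value just as `num_args=0` does (this is not the real `@cached` implementation):

```python
import asyncio


def cached_async(func):
    # Simplified stand-in for Synapse's @cached descriptor: memoises the
    # awaited result so later calls skip the database round-trip.
    sentinel = object()
    value = sentinel

    async def wrapper(*args):
        nonlocal value
        if value is sentinel:
            value = await func(*args)
        return value

    return wrapper


@cached_async
async def get_monthly_active_count() -> int:
    print("hitting the database")
    await asyncio.sleep(0)  # stand-in for the runInteraction round-trip
    return 42


async def main():
    print(await get_monthly_active_count())  # runs the query
    print(await get_monthly_active_count())  # served from the cache


asyncio.run(main())
```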
 
diff --git a/synapse/storage/databases/main/openid.py b/synapse/storage/databases/main/openid.py
index dcd1ff911a..4db8949da7 100644
--- a/synapse/storage/databases/main/openid.py
+++ b/synapse/storage/databases/main/openid.py
@@ -2,8 +2,10 @@ from synapse.storage._base import SQLBaseStore
 
 
 class OpenIdStore(SQLBaseStore):
-    def insert_open_id_token(self, token, ts_valid_until_ms, user_id):
-        return self.db_pool.simple_insert(
+    async def insert_open_id_token(
+        self, token: str, ts_valid_until_ms: int, user_id: str
+    ) -> None:
+        await self.db_pool.simple_insert(
             table="open_id_tokens",
             values={
                 "token": token,
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 858fd92420..301875a672 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -66,8 +66,8 @@ class ProfileWorkerStore(SQLBaseStore):
             desc="get_from_remote_profile_cache",
         )
 
-    def create_profile(self, user_localpart):
-        return self.db_pool.simple_insert(
+    async def create_profile(self, user_localpart: str) -> None:
+        await self.db_pool.simple_insert(
             table="profiles", values={"user_id": user_localpart}, desc="create_profile"
         )
 
@@ -93,13 +93,15 @@ class ProfileWorkerStore(SQLBaseStore):
 
 
 class ProfileStore(ProfileWorkerStore):
-    def add_remote_profile_cache(self, user_id, displayname, avatar_url):
+    async def add_remote_profile_cache(
+        self, user_id: str, displayname: str, avatar_url: str
+    ) -> None:
         """Ensure we are caching the remote user's profiles.
 
         This should only be called when `is_subscribed_remote_profile_for_user`
         would return true for the user.
         """
-        return self.db_pool.simple_upsert(
+        await self.db_pool.simple_upsert(
             table="remote_profile_cache",
             keyvalues={"user_id": user_id},
             values={
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 48bda66f3e..12689f4308 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -17,7 +17,7 @@
 
 import logging
 import re
-from typing import Any, Awaitable, Dict, List, Optional
+from typing import Any, Dict, List, Optional
 
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
@@ -529,43 +529,42 @@ class RegistrationWorkerStore(SQLBaseStore):
             "user_get_threepids",
         )
 
-    def user_delete_threepid(self, user_id, medium, address):
-        return self.db_pool.simple_delete(
+    async def user_delete_threepid(self, user_id, medium, address) -> None:
+        await self.db_pool.simple_delete(
             "user_threepids",
             keyvalues={"user_id": user_id, "medium": medium, "address": address},
             desc="user_delete_threepid",
         )
 
-    def user_delete_threepids(self, user_id: str):
+    async def user_delete_threepids(self, user_id: str) -> None:
         """Delete all threepid this user has bound
 
         Args:
              user_id: The user id to delete all threepids of
 
         """
-        return self.db_pool.simple_delete(
+        await self.db_pool.simple_delete(
             "user_threepids",
             keyvalues={"user_id": user_id},
             desc="user_delete_threepids",
         )
 
-    def add_user_bound_threepid(self, user_id, medium, address, id_server):
+    async def add_user_bound_threepid(
+        self, user_id: str, medium: str, address: str, id_server: str
+    ) -> None:
         """The server proxied a bind request to the given identity server on
         behalf of the given user. We need to remember this in case the user
         asks us to unbind the threepid.
 
         Args:
-            user_id (str)
-            medium (str)
-            address (str)
-            id_server (str)
-
-        Returns:
-            Awaitable
+            user_id
+            medium
+            address
+            id_server
         """
         # We need to use an upsert, in case the user had already bound the
         # threepid
-        return self.db_pool.simple_upsert(
+        await self.db_pool.simple_upsert(
             table="user_threepid_id_server",
             keyvalues={
                 "user_id": user_id,
@@ -598,21 +597,20 @@ class RegistrationWorkerStore(SQLBaseStore):
             desc="user_get_bound_threepids",
         )
 
-    def remove_user_bound_threepid(self, user_id, medium, address, id_server):
+    async def remove_user_bound_threepid(
+        self, user_id: str, medium: str, address: str, id_server: str
+    ) -> None:
         """The server proxied an unbind request to the given identity server on
         behalf of the given user, so we remove the mapping of threepid to
         identity server.
 
         Args:
-            user_id (str)
-            medium (str)
-            address (str)
-            id_server (str)
-
-        Returns:
-            Deferred
+            user_id
+            medium
+            address
+            id_server
         """
-        return self.db_pool.simple_delete(
+        await self.db_pool.simple_delete(
             table="user_threepid_id_server",
             keyvalues={
                 "user_id": user_id,
@@ -1083,9 +1081,9 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
         self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
-    def record_user_external_id(
+    async def record_user_external_id(
         self, auth_provider: str, external_id: str, user_id: str
-    ) -> Awaitable:
+    ) -> None:
         """Record a mapping from an external user id to a mxid
 
         Args:
@@ -1093,7 +1091,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
             external_id: id on that system
             user_id: complete mxid that it is mapped to
         """
-        return self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             table="user_external_ids",
             values={
                 "auth_provider": auth_provider,
@@ -1237,25 +1235,25 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
         return res if res else False
 
-    def add_user_pending_deactivation(self, user_id):
+    async def add_user_pending_deactivation(self, user_id: str) -> None:
         """
         Adds a user to the table of users who need to be parted from all the rooms they're
         in
         """
-        return self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             "users_pending_deactivation",
             values={"user_id": user_id},
             desc="add_user_pending_deactivation",
         )
 
-    def del_user_pending_deactivation(self, user_id):
+    async def del_user_pending_deactivation(self, user_id: str) -> None:
         """
         Removes the given user from the table of users who need to be parted from all the
         rooms they're in, effectively marking that user as fully deactivated.
         """
         # XXX: This should be simple_delete_one but we failed to put a unique index on
         # the table, so somehow duplicate entries have ended up in it.
-        return self.db_pool.simple_delete(
+        await self.db_pool.simple_delete(
             "users_pending_deactivation",
             keyvalues={"user_id": user_id},
             desc="del_user_pending_deactivation",
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 66d7135413..a92641c339 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -27,7 +27,7 @@ from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.databases.main.search import SearchStore
-from synapse.types import ThirdPartyInstanceID
+from synapse.types import JsonDict, ThirdPartyInstanceID
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 
@@ -1296,11 +1296,17 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
 
         return self.db_pool.runInteraction("get_rooms", f)
 
-    def add_event_report(
-        self, room_id, event_id, user_id, reason, content, received_ts
-    ):
+    async def add_event_report(
+        self,
+        room_id: str,
+        event_id: str,
+        user_id: str,
+        reason: str,
+        content: JsonDict,
+        received_ts: int,
+    ) -> None:
         next_id = self._event_reports_id_gen.get_next()
-        return self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             table="event_reports",
             values={
                 "id": next_id,
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 458f169617..5c6168e301 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -27,6 +27,7 @@ from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.state import StateFilter
+from synapse.types import StateMap
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import cached, cachedList
 
@@ -163,15 +164,15 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         return create_event
 
     @cached(max_entries=100000, iterable=True)
-    def get_current_state_ids(self, room_id):
+    async def get_current_state_ids(self, room_id: str) -> StateMap[str]:
         """Get the current state event ids for a room based on the
         current_state_events table.
 
         Args:
-            room_id (str)
+            room_id: The room to get the state IDs of.
 
         Returns:
-            deferred: dict of (type, state_key) -> event_id
+            The current state of the room.
         """
 
         def _get_current_state_ids_txn(txn):
@@ -184,14 +185,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
             return {(intern_string(r[0]), intern_string(r[1])): r[2] for r in txn}
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_current_state_ids", _get_current_state_ids_txn
         )
 
     # FIXME: how should this be cached?
-    def get_filtered_current_state_ids(
+    async def get_filtered_current_state_ids(
         self, room_id: str, state_filter: StateFilter = StateFilter.all()
-    ):
+    ) -> StateMap[str]:
         """Get the current state event of a given type for a room based on the
         current_state_events table.  This may not be as up-to-date as the result
         of doing a fresh state resolution as per state_handler.get_current_state
@@ -202,14 +203,14 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
                 from the database.
 
         Returns:
-            defer.Deferred[StateMap[str]]: Map from type/state_key to event ID.
+            Map from type/state_key to event ID.
         """
 
         where_clause, where_args = state_filter.make_sql_filter_clause()
 
         if not where_clause:
             # We delegate to the cached version
-            return self.get_current_state_ids(room_id)
+            return await self.get_current_state_ids(room_id)
 
         def _get_filtered_current_state_ids_txn(txn):
             results = {}
@@ -231,7 +232,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
             return results
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_filtered_current_state_ids", _get_filtered_current_state_ids_txn
         )
 
diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py
index 0d963c98ff..356623fc6e 100644
--- a/synapse/storage/databases/main/state_deltas.py
+++ b/synapse/storage/databases/main/state_deltas.py
@@ -14,8 +14,7 @@
 # limitations under the License.
 
 import logging
-
-from twisted.internet import defer
+from typing import Any, Dict, List, Tuple
 
 from synapse.storage._base import SQLBaseStore
 
@@ -23,7 +22,9 @@ logger = logging.getLogger(__name__)
 
 
 class StateDeltasStore(SQLBaseStore):
-    def get_current_state_deltas(self, prev_stream_id: int, max_stream_id: int):
+    async def get_current_state_deltas(
+        self, prev_stream_id: int, max_stream_id: int
+    ) -> Tuple[int, List[Dict[str, Any]]]:
         """Fetch a list of room state changes since the given stream id
 
         Each entry in the result contains the following fields:
@@ -37,12 +38,12 @@ class StateDeltasStore(SQLBaseStore):
                 if it's new state.
 
         Args:
-            prev_stream_id (int): point to get changes since (exclusive)
-            max_stream_id (int): the point that we know has been correctly persisted
+            prev_stream_id: point to get changes since (exclusive)
+            max_stream_id: the point that we know has been correctly persisted
                - ie, an upper limit to return changes from.
 
         Returns:
-            Deferred[tuple[int, list[dict]]: A tuple consisting of:
+            A tuple consisting of:
                - the stream id which these results go up to
                - list of current_state_delta_stream rows. If it is empty, we are
                  up to date.
@@ -58,7 +59,7 @@ class StateDeltasStore(SQLBaseStore):
             # if the CSDs haven't changed between prev_stream_id and now, we
             # know for certain that they haven't changed between prev_stream_id and
             # max_stream_id.
-            return defer.succeed((max_stream_id, []))
+            return (max_stream_id, [])
 
         def get_current_state_deltas_txn(txn):
             # First we calculate the max stream id that will give us less than
@@ -102,7 +103,7 @@ class StateDeltasStore(SQLBaseStore):
             txn.execute(sql, (prev_stream_id, clipped_stream_id))
             return clipped_stream_id, self.db_pool.cursor_to_dict(txn)
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_current_state_deltas", get_current_state_deltas_txn
         )
 
@@ -114,8 +115,8 @@ class StateDeltasStore(SQLBaseStore):
             retcol="COALESCE(MAX(stream_id), -1)",
         )
 
-    def get_max_stream_id_in_current_state_deltas(self):
-        return self.db_pool.runInteraction(
+    async def get_max_stream_id_in_current_state_deltas(self) -> int:
+        return await self.db_pool.runInteraction(
             "get_max_stream_id_in_current_state_deltas",
             self._get_max_stream_id_in_current_state_deltas_txn,
         )
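One detail worth noting in `get_current_state_deltas` above: the early-exit path previously had to wrap its value in `defer.succeed(...)` so that both branches returned a Deferred. Inside an `async def` that wrapping disappears, because the coroutine itself is the awaitable. A sketch of the shape, with the stream-change cache check replaced by a stand-in constant:

```python
import asyncio


async def get_current_state_deltas(prev_stream_id: int, max_stream_id: int):
    known_unchanged_after = 100  # stand-in for the stream-change cache check
    if prev_stream_id >= known_unchanged_after:
        # Previously: return defer.succeed((max_stream_id, []))
        return max_stream_id, []
    # ... otherwise the real method runs the database query ...
    return max_stream_id, [{"room_id": "!room:example.org"}]


async def main():
    print(await get_current_state_deltas(150, 200))  # fast path: (200, [])


asyncio.run(main())
```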
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 9fe97af56a..9b9bc304a8 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -15,8 +15,9 @@
 # limitations under the License.
 
 import logging
+from collections import Counter
 from itertools import chain
-from typing import Tuple
+from typing import Any, Dict, List, Optional, Tuple
 
 from twisted.internet.defer import DeferredLock
 
@@ -222,11 +223,11 @@ class StatsStore(StateDeltasStore):
             desc="stats_incremental_position",
         )
 
-    def update_room_state(self, room_id, fields):
+    async def update_room_state(self, room_id: str, fields: Dict[str, Any]) -> None:
         """
         Args:
-            room_id (str)
-            fields (dict[str:Any])
+            room_id
+            fields
         """
 
         # For whatever reason some of the fields may contain null bytes, which
@@ -244,28 +245,30 @@ class StatsStore(StateDeltasStore):
             if field and "\0" in field:
                 fields[col] = None
 
-        return self.db_pool.simple_upsert(
+        await self.db_pool.simple_upsert(
             table="room_stats_state",
             keyvalues={"room_id": room_id},
             values=fields,
             desc="update_room_state",
         )
 
-    def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
+    async def get_statistics_for_subject(
+        self, stats_type: str, stats_id: str, start: int, size: int = 100
+    ) -> List[dict]:
         """
         Get statistics for a given subject.
 
         Args:
-            stats_type (str): The type of subject
-            stats_id (str): The ID of the subject (e.g. room_id or user_id)
-            start (int): Pagination start. Number of entries, not timestamp.
-            size (int): How many entries to return.
+            stats_type: The type of subject
+            stats_id: The ID of the subject (e.g. room_id or user_id)
+            start: Pagination start. Number of entries, not timestamp.
+            size: How many entries to return.
 
         Returns:
-            Deferred[list[dict]], where the dict has the keys of
+            A list of dicts, where each dict has the keys of
             ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
         """
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_statistics_for_subject",
             self._get_statistics_for_subject_txn,
             stats_type,
@@ -319,18 +322,17 @@ class StatsStore(StateDeltasStore):
             allow_none=True,
         )
 
-    def bulk_update_stats_delta(self, ts, updates, stream_id):
+    async def bulk_update_stats_delta(
+        self, ts: int, updates: Dict[str, Dict[str, Dict[str, Counter]]], stream_id: int
+    ) -> None:
         """Bulk update stats tables for a given stream_id and updates the stats
         incremental position.
 
         Args:
-            ts (int): Current timestamp in ms
-            updates(dict[str, dict[str, dict[str, Counter]]]): The updates to
-                commit as a mapping stats_type -> stats_id -> field -> delta.
-            stream_id (int): Current position.
-
-        Returns:
-            Deferred
+            ts: Current timestamp in ms
+            updates: The updates to commit as a mapping of
+                stats_type -> stats_id -> field -> delta.
+            stream_id: Current position.
         """
 
         def _bulk_update_stats_delta_txn(txn):
@@ -355,38 +357,37 @@ class StatsStore(StateDeltasStore):
                 updatevalues={"stream_id": stream_id},
             )
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "bulk_update_stats_delta", _bulk_update_stats_delta_txn
         )
 
-    def update_stats_delta(
+    async def update_stats_delta(
         self,
-        ts,
-        stats_type,
-        stats_id,
-        fields,
-        complete_with_stream_id,
-        absolute_field_overrides=None,
-    ):
+        ts: int,
+        stats_type: str,
+        stats_id: str,
+        fields: Dict[str, int],
+        complete_with_stream_id: Optional[int],
+        absolute_field_overrides: Optional[Dict[str, int]] = None,
+    ) -> None:
         """
         Updates the statistics for a subject, with a delta (difference/relative
         change).
 
         Args:
-            ts (int): timestamp of the change
-            stats_type (str): "room" or "user" – the kind of subject
-            stats_id (str): the subject's ID (room ID or user ID)
-            fields (dict[str, int]): Deltas of stats values.
-            complete_with_stream_id (int, optional):
+            ts: timestamp of the change
+            stats_type: "room" or "user" – the kind of subject
+            stats_id: the subject's ID (room ID or user ID)
+            fields: Deltas of stats values.
+            complete_with_stream_id:
                 If supplied, converts an incomplete row into a complete row,
                 with the supplied stream_id marked as the stream_id where the
                 row was completed.
-            absolute_field_overrides (dict[str, int]): Current stats values
-                (i.e. not deltas) of absolute fields.
-                Does not work with per-slice fields.
+            absolute_field_overrides: Current stats values (i.e. not deltas) of
+                absolute fields. Does not work with per-slice fields.
         """
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "update_stats_delta",
             self._update_stats_delta_txn,
             ts,
@@ -646,19 +647,20 @@ class StatsStore(StateDeltasStore):
                     txn, into_table, all_dest_keyvalues, src_row
                 )
 
-    def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
+    async def get_changes_room_total_events_and_bytes(
+        self, min_pos: int, max_pos: int
+    ) -> Dict[str, Dict[str, int]]:
         """Fetches the counts of events in the given range of stream IDs.
 
         Args:
-            min_pos (int)
-            max_pos (int)
+            min_pos
+            max_pos
 
         Returns:
-            Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field
-            changes.
+            Mapping of room ID to field changes.
         """
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "stats_incremental_total_events_and_bytes",
             self.get_changes_room_total_events_and_bytes_txn,
             min_pos,
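The null-byte scrubbing in `update_room_state` is easy to miss in the hunks above: Postgres rejects NUL (`\0`) bytes in text columns, so any offending field is nulled out before the upsert. Extracted as a standalone snippet (the field names are illustrative):

```python
fields = {"name": "My Room", "topic": "broken\0topic", "avatar": None}

# Mirror of the guard in update_room_state: drop values containing NUL bytes,
# which Postgres will not accept in text columns.
for col in ("name", "topic", "avatar"):
    field = fields.get(col)
    if field and "\0" in field:
        fields[col] = None

print(fields)  # {'name': 'My Room', 'topic': None, 'avatar': None}
```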
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 497f607703..24f44a7e36 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -539,7 +539,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return rows, token
 
-    def get_room_event_before_stream_ordering(self, room_id: str, stream_ordering: int):
+    async def get_room_event_before_stream_ordering(
+        self, room_id: str, stream_ordering: int
+    ) -> Tuple[int, int, str]:
         """Gets details of the first event in a room at or before a stream ordering
 
         Args:
@@ -547,8 +549,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             stream_ordering:
 
         Returns:
-            Deferred[(int, int, str)]:
-                (stream ordering, topological ordering, event_id)
+            A tuple of (stream ordering, topological ordering, event_id)
         """
 
         def _f(txn):
@@ -563,7 +564,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             txn.execute(sql, (room_id, stream_ordering))
             return txn.fetchone()
 
-        return self.db_pool.runInteraction("get_room_event_before_stream_ordering", _f)
+        return await self.db_pool.runInteraction(
+            "get_room_event_before_stream_ordering", _f
+        )
 
     async def get_room_events_max_id(self, room_id: Optional[str] = None) -> str:
         """Returns the current token for rooms stream.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 52668dbdf9..5b31aab700 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -15,12 +15,14 @@
 
 import logging
 from collections import namedtuple
+from typing import Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
+from synapse.types import JsonDict
 from synapse.util.caches.expiringcache import ExpiringCache
 
 db_binary_type = memoryview
@@ -55,21 +57,23 @@ class TransactionStore(SQLBaseStore):
             expiry_ms=5 * 60 * 1000,
         )
 
-    def get_received_txn_response(self, transaction_id, origin):
+    async def get_received_txn_response(
+        self, transaction_id: str, origin: str
+    ) -> Optional[Tuple[int, JsonDict]]:
         """For an incoming transaction from a given origin, check if we have
         already responded to it. If so, return the response code and response
         body (as a dict).
 
         Args:
-            transaction_id (str)
-            origin(str)
+            transaction_id
+            origin
 
         Returns:
-            tuple: None if we have not previously responded to
-            this transaction or a 2-tuple of (int, dict)
+            None if we have not previously responded to this transaction or a
+            2-tuple of (int, dict)
         """
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_received_txn_response",
             self._get_received_txn_response,
             transaction_id,
@@ -98,20 +102,21 @@ class TransactionStore(SQLBaseStore):
         else:
             return None
 
-    def set_received_txn_response(self, transaction_id, origin, code, response_dict):
-        """Persist the response we returened for an incoming transaction, and
+    async def set_received_txn_response(
+        self, transaction_id: str, origin: str, code: int, response_dict: JsonDict
+    ) -> None:
+        """Persist the response we returned for an incoming transaction, and
         should return for subsequent transactions with the same transaction_id
         and origin.
 
         Args:
-            txn
-            transaction_id (str)
-            origin (str)
-            code (int)
-            response_json (str)
+            transaction_id: The incoming transaction ID.
+            origin: The origin server.
+            code: The response code.
+            response_dict: The response, to be encoded into JSON.
         """
 
-        return self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             table="received_transactions",
             values={
                 "transaction_id": transaction_id,
@@ -164,21 +169,25 @@ class TransactionStore(SQLBaseStore):
         else:
             return None
 
-    def set_destination_retry_timings(
-        self, destination, failure_ts, retry_last_ts, retry_interval
-    ):
+    async def set_destination_retry_timings(
+        self,
+        destination: str,
+        failure_ts: Optional[int],
+        retry_last_ts: int,
+        retry_interval: int,
+    ) -> None:
         """Sets the current retry timings for a given destination.
         Both timings should be zero if retrying is no longer occurring.
 
         Args:
-            destination (str)
-            failure_ts (int|None) - when the server started failing (ms since epoch)
-            retry_last_ts (int) - time of last retry attempt in unix epoch ms
-            retry_interval (int) - how long until next retry in ms
+            destination
+            failure_ts: when the server started failing (ms since epoch)
+            retry_last_ts: time of last retry attempt in unix epoch ms
+            retry_interval: how long until next retry in ms
         """
 
         self._destination_retry_cache.pop(destination, None)
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "set_destination_retry_timings",
             self._set_destination_retry_timings,
             destination,
@@ -254,13 +263,13 @@ class TransactionStore(SQLBaseStore):
             "cleanup_transactions", self._cleanup_transactions
         )
 
-    def _cleanup_transactions(self):
+    async def _cleanup_transactions(self) -> None:
         now = self._clock.time_msec()
         month_ago = now - 30 * 24 * 60 * 60 * 1000
 
         def _cleanup_transactions_txn(txn):
             txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "_cleanup_transactions", _cleanup_transactions_txn
         )
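`set_destination_retry_timings` pops the in-memory `_destination_retry_cache` entry before touching the database, so a reader arriving mid-write misses the cache and falls through to the table rather than being served the stale entry. A toy sketch of that ordering, with plain dicts standing in for the `ExpiringCache` and the `destinations` table:

```python
import asyncio

_destination_retry_cache = {}  # stand-in for the ExpiringCache
_destinations_table = {}  # stand-in for the destinations table


async def set_destination_retry_timings(
    destination, failure_ts, retry_last_ts, retry_interval
) -> None:
    # Invalidate first: concurrent readers repopulate from the database
    # instead of seeing the stale cached value.
    _destination_retry_cache.pop(destination, None)
    await asyncio.sleep(0)  # stand-in for the runInteraction round-trip
    _destinations_table[destination] = (failure_ts, retry_last_ts, retry_interval)


async def main():
    _destination_retry_cache["example.org"] = "stale"
    await set_destination_retry_timings("example.org", None, 1000, 2000)
    print(_destination_retry_cache.get("example.org"))  # None: stale entry gone
    print(_destinations_table["example.org"])  # (None, 1000, 2000)


asyncio.run(main())
```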
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 7f104ad936..e924f1ca3b 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -17,8 +17,6 @@ import logging
 from collections import namedtuple
 from typing import Dict, Iterable, List, Set, Tuple
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool
@@ -103,7 +101,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         )
 
     @cached(max_entries=10000, iterable=True)
-    def get_state_group_delta(self, state_group):
+    async def get_state_group_delta(self, state_group):
         """Given a state group try to return a previous group and a delta between
         the old and the new.
 
@@ -135,7 +133,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
                 {(row["type"], row["state_key"]): row["event_id"] for row in delta_ids},
             )
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_state_group_delta", _get_state_group_delta_txn
         )
 
@@ -367,9 +365,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
                 fetched_keys=non_member_types,
             )
 
-    def store_state_group(
+    async def store_state_group(
         self, event_id, room_id, prev_group, delta_ids, current_state_ids
-    ):
+    ) -> int:
         """Store a new set of state, returning a newly assigned state group.
 
         Args:
@@ -383,7 +381,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
                 to event_id.
 
         Returns:
-            Deferred[int]: The state group ID
+            The state group ID
         """
 
         def _store_state_group_txn(txn):
@@ -484,11 +482,13 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
             return state_group
 
-        return self.db_pool.runInteraction("store_state_group", _store_state_group_txn)
+        return await self.db_pool.runInteraction(
+            "store_state_group", _store_state_group_txn
+        )
 
-    def purge_unreferenced_state_groups(
+    async def purge_unreferenced_state_groups(
         self, room_id: str, state_groups_to_delete
-    ) -> defer.Deferred:
+    ) -> None:
         """Deletes no longer referenced state groups and de-deltas any state
         groups that reference them.
 
@@ -499,7 +499,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
                 to delete.
         """
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "purge_unreferenced_state_groups",
             self._purge_unreferenced_state_groups,
             room_id,
@@ -594,7 +594,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
         return {row["state_group"]: row["prev_state_group"] for row in rows}
 
-    def purge_room_state(self, room_id, state_groups_to_delete):
+    async def purge_room_state(self, room_id, state_groups_to_delete):
         """Deletes all record of a room from state tables
 
         Args:
@@ -602,7 +602,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             state_groups_to_delete (list[int]): State groups to delete
         """
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "purge_room_state",
             self._purge_room_state_txn,
             room_id,
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 534883361f..96a1b59d64 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -333,7 +333,7 @@ class StateGroupStorage(object):
     def __init__(self, hs, stores):
         self.stores = stores
 
-    def get_state_group_delta(self, state_group: int):
+    async def get_state_group_delta(self, state_group: int):
         """Given a state group try to return a previous group and a delta between
         the old and the new.
 
@@ -341,11 +341,11 @@ class StateGroupStorage(object):
             state_group: The state group used to retrieve state deltas.
 
         Returns:
-            Deferred[Tuple[Optional[int], Optional[StateMap[str]]]]:
+            Tuple[Optional[int], Optional[StateMap[str]]]:
                 (prev_group, delta_ids)
         """
 
-        return self.stores.state.get_state_group_delta(state_group)
+        return await self.stores.state.get_state_group_delta(state_group)
 
     async def get_state_groups_ids(
         self, _room_id: str, event_ids: Iterable[str]
@@ -525,7 +525,7 @@ class StateGroupStorage(object):
             state_filter: The state filter used to fetch state from the database.
 
         Returns:
-            A deferred dict from (type, state_key) -> state_event
+            A dict from (type, state_key) -> state_event
         """
         state_map = await self.get_state_ids_for_events([event_id], state_filter)
         return state_map[event_id]
@@ -546,14 +546,14 @@ class StateGroupStorage(object):
         """
         return self.stores.state._get_state_for_groups(groups, state_filter)
 
-    def store_state_group(
+    async def store_state_group(
         self,
         event_id: str,
         room_id: str,
         prev_group: Optional[int],
         delta_ids: Optional[dict],
         current_state_ids: dict,
-    ):
+    ) -> int:
         """Store a new set of state, returning a newly assigned state group.
 
         Args:
@@ -567,8 +567,8 @@ class StateGroupStorage(object):
                 to event_id.
 
         Returns:
-            Deferred[int]: The state group ID
+            The state group ID
         """
-        return self.stores.state.store_state_group(
+        return await self.stores.state.store_state_group(
             event_id, room_id, prev_group, delta_ids, current_state_ids
         )
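Finally, the `StateGroupStorage` wrapper in `synapse/storage/state.py` shows how the conversion propagates upward: each layer that used to forward a Deferred now awaits the layer below, which is what lets the declared return types (`-> int` here) stay accurate end to end. A skeletal sketch with stand-in method bodies:

```python
import asyncio


class StateStoreSketch:
    # Stand-in for StateGroupDataStore.store_state_group.
    def __init__(self):
        self._next_group = 0

    async def store_state_group(
        self, event_id, room_id, prev_group, delta_ids, current_state_ids
    ) -> int:
        self._next_group += 1
        return self._next_group


class StateGroupStorageSketch:
    # Stand-in for the StateGroupStorage wrapper layer.
    def __init__(self, state_store):
        self.state = state_store

    async def store_state_group(
        self, event_id, room_id, prev_group, delta_ids, current_state_ids
    ) -> int:
        # Awaiting (rather than forwarding the awaitable) keeps the declared
        # `-> int` honest at every layer of the wrapper.
        return await self.state.store_state_group(
            event_id, room_id, prev_group, delta_ids, current_state_ids
        )


async def main():
    storage = StateGroupStorageSketch(StateStoreSketch())
    print(await storage.store_state_group("$ev", "!room:example.org", None, None, {}))


asyncio.run(main())
```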