summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py2
-rw-r--r--synapse/events/builder.py19
-rw-r--r--synapse/events/snapshot.py46
-rw-r--r--synapse/events/third_party_rules.py55
-rw-r--r--synapse/events/utils.py15
-rw-r--r--synapse/groups/attestations.py25
-rw-r--r--synapse/handlers/federation.py2
-rw-r--r--synapse/push/action_generator.py7
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py62
-rw-r--r--synapse/push/httppusher.py58
-rw-r--r--synapse/push/presentable_names.py15
-rw-r--r--synapse/push/push_tools.py22
-rw-r--r--synapse/push/pusherpool.py70
-rw-r--r--synapse/replication/http/federation.py4
-rw-r--r--synapse/replication/http/send_event.py2
-rw-r--r--synapse/rest/media/v1/_base.py8
-rw-r--r--synapse/rest/media/v1/media_repository.py105
-rw-r--r--synapse/rest/media/v1/media_storage.py52
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py10
-rw-r--r--synapse/rest/media/v1/storage_provider.py62
-rw-r--r--synapse/storage/data_stores/main/event_push_actions.py4
-rw-r--r--synapse/visibility.py30
22 files changed, 324 insertions, 351 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index b53e8451e5..2178e623da 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -82,7 +82,7 @@ class Auth(object):
 
     @defer.inlineCallbacks
     def check_from_context(self, room_version: str, event, context, do_sig_check=True):
-        prev_state_ids = yield context.get_prev_state_ids()
+        prev_state_ids = yield defer.ensureDeferred(context.get_prev_state_ids())
         auth_events_ids = yield self.compute_auth_events(
             event, prev_state_ids, for_verification=True
         )
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 0bb216419a..69b53ca2bc 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -17,8 +17,6 @@ from typing import Optional
 import attr
 from nacl.signing import SigningKey
 
-from twisted.internet import defer
-
 from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import UnsupportedRoomVersionError
 from synapse.api.room_versions import (
@@ -95,31 +93,30 @@ class EventBuilder(object):
     def is_state(self):
         return self._state_key is not None
 
-    @defer.inlineCallbacks
-    def build(self, prev_event_ids):
+    async def build(self, prev_event_ids):
         """Transform into a fully signed and hashed event
 
         Args:
             prev_event_ids (list[str]): The event IDs to use as the prev events
 
         Returns:
-            Deferred[FrozenEvent]
+            FrozenEvent
         """
 
-        state_ids = yield defer.ensureDeferred(
-            self._state.get_current_state_ids(self.room_id, prev_event_ids)
+        state_ids = await self._state.get_current_state_ids(
+            self.room_id, prev_event_ids
         )
-        auth_ids = yield self._auth.compute_auth_events(self, state_ids)
+        auth_ids = await self._auth.compute_auth_events(self, state_ids)
 
         format_version = self.room_version.event_format
         if format_version == EventFormatVersions.V1:
-            auth_events = yield self._store.add_event_hashes(auth_ids)
-            prev_events = yield self._store.add_event_hashes(prev_event_ids)
+            auth_events = await self._store.add_event_hashes(auth_ids)
+            prev_events = await self._store.add_event_hashes(prev_event_ids)
         else:
             auth_events = auth_ids
             prev_events = prev_event_ids
 
-        old_depth = yield self._store.get_max_depth_of(prev_event_ids)
+        old_depth = await self._store.get_max_depth_of(prev_event_ids)
         depth = old_depth + 1
 
         # we cap depth of generated events, to ensure that they are not
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index f94cdcbaba..cca93e3a46 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -12,17 +12,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Optional, Union
+from typing import TYPE_CHECKING, Optional, Union
 
 import attr
 from frozendict import frozendict
 
-from twisted.internet import defer
-
 from synapse.appservice import ApplicationService
+from synapse.events import EventBase
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.types import StateMap
 
+if TYPE_CHECKING:
+    from synapse.storage.data_stores.main import DataStore
+
 
 @attr.s(slots=True)
 class EventContext:
@@ -129,8 +131,7 @@ class EventContext:
             delta_ids=delta_ids,
         )
 
-    @defer.inlineCallbacks
-    def serialize(self, event, store):
+    async def serialize(self, event: EventBase, store: "DataStore") -> dict:
         """Converts self to a type that can be serialized as JSON, and then
         deserialized by `deserialize`
 
@@ -146,7 +147,7 @@ class EventContext:
         # the prev_state_ids, so if we're a state event we include the event
         # id that we replaced in the state.
         if event.is_state():
-            prev_state_ids = yield self.get_prev_state_ids()
+            prev_state_ids = await self.get_prev_state_ids()
             prev_state_id = prev_state_ids.get((event.type, event.state_key))
         else:
             prev_state_id = None
@@ -214,8 +215,7 @@ class EventContext:
 
         return self._state_group
 
-    @defer.inlineCallbacks
-    def get_current_state_ids(self):
+    async def get_current_state_ids(self) -> Optional[StateMap[str]]:
         """
         Gets the room state map, including this event - ie, the state in ``state_group``
 
@@ -224,32 +224,31 @@ class EventContext:
         ``rejected`` is set.
 
         Returns:
-            Deferred[dict[(str, str), str]|None]: Returns None if state_group
-                is None, which happens when the associated event is an outlier.
+            Returns None if state_group is None, which happens when the associated
+            event is an outlier.
 
-                Maps a (type, state_key) to the event ID of the state event matching
-                this tuple.
+            Maps a (type, state_key) to the event ID of the state event matching
+            this tuple.
         """
         if self.rejected:
             raise RuntimeError("Attempt to access state_ids of rejected event")
 
-        yield self._ensure_fetched()
+        await self._ensure_fetched()
         return self._current_state_ids
 
-    @defer.inlineCallbacks
-    def get_prev_state_ids(self):
+    async def get_prev_state_ids(self):
         """
         Gets the room state map, excluding this event.
 
         For a non-state event, this will be the same as get_current_state_ids().
 
         Returns:
-            Deferred[dict[(str, str), str]|None]: Returns None if state_group
+            dict[(str, str), str]|None: Returns None if state_group
                 is None, which happens when the associated event is an outlier.
                 Maps a (type, state_key) to the event ID of the state event matching
                 this tuple.
         """
-        yield self._ensure_fetched()
+        await self._ensure_fetched()
         return self._prev_state_ids
 
     def get_cached_current_state_ids(self):
@@ -269,8 +268,8 @@ class EventContext:
 
         return self._current_state_ids
 
-    def _ensure_fetched(self):
-        return defer.succeed(None)
+    async def _ensure_fetched(self):
+        return None
 
 
 @attr.s(slots=True)
@@ -303,21 +302,20 @@ class _AsyncEventContextImpl(EventContext):
     _event_state_key = attr.ib(default=None)
     _fetching_state_deferred = attr.ib(default=None)
 
-    def _ensure_fetched(self):
+    async def _ensure_fetched(self):
         if not self._fetching_state_deferred:
             self._fetching_state_deferred = run_in_background(self._fill_out_state)
 
-        return make_deferred_yieldable(self._fetching_state_deferred)
+        return await make_deferred_yieldable(self._fetching_state_deferred)
 
-    @defer.inlineCallbacks
-    def _fill_out_state(self):
+    async def _fill_out_state(self):
         """Called to populate the _current_state_ids and _prev_state_ids
         attributes by loading from the database.
         """
         if self.state_group is None:
             return
 
-        self._current_state_ids = yield self._storage.state.get_state_ids_for_group(
+        self._current_state_ids = await self._storage.state.get_state_ids_for_group(
             self.state_group
         )
         if self._event_state_key is not None:
diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py
index 459132d388..2956a64234 100644
--- a/synapse/events/third_party_rules.py
+++ b/synapse/events/third_party_rules.py
@@ -13,7 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
+from synapse.types import Requester
 
 
 class ThirdPartyEventRules(object):
@@ -39,76 +41,79 @@ class ThirdPartyEventRules(object):
                 config=config, http_client=hs.get_simple_http_client()
             )
 
-    @defer.inlineCallbacks
-    def check_event_allowed(self, event, context):
+    async def check_event_allowed(
+        self, event: EventBase, context: EventContext
+    ) -> bool:
         """Check if a provided event should be allowed in the given context.
 
         Args:
-            event (synapse.events.EventBase): The event to be checked.
-            context (synapse.events.snapshot.EventContext): The context of the event.
+            event: The event to be checked.
+            context: The context of the event.
 
         Returns:
-            defer.Deferred[bool]: True if the event should be allowed, False if not.
+            True if the event should be allowed, False if not.
         """
         if self.third_party_rules is None:
             return True
 
-        prev_state_ids = yield context.get_prev_state_ids()
+        prev_state_ids = await context.get_prev_state_ids()
 
         # Retrieve the state events from the database.
         state_events = {}
         for key, event_id in prev_state_ids.items():
-            state_events[key] = yield self.store.get_event(event_id, allow_none=True)
+            state_events[key] = await self.store.get_event(event_id, allow_none=True)
 
-        ret = yield self.third_party_rules.check_event_allowed(event, state_events)
+        ret = await self.third_party_rules.check_event_allowed(event, state_events)
         return ret
 
-    @defer.inlineCallbacks
-    def on_create_room(self, requester, config, is_requester_admin):
+    async def on_create_room(
+        self, requester: Requester, config: dict, is_requester_admin: bool
+    ) -> bool:
         """Intercept requests to create room to allow, deny or update the
         request config.
 
         Args:
-            requester (Requester)
-            config (dict): The creation config from the client.
-            is_requester_admin (bool): If the requester is an admin
+            requester
+            config: The creation config from the client.
+            is_requester_admin: If the requester is an admin
 
         Returns:
-            defer.Deferred[bool]: Whether room creation is allowed or denied.
+            Whether room creation is allowed or denied.
         """
 
         if self.third_party_rules is None:
             return True
 
-        ret = yield self.third_party_rules.on_create_room(
+        ret = await self.third_party_rules.on_create_room(
             requester, config, is_requester_admin
         )
         return ret
 
-    @defer.inlineCallbacks
-    def check_threepid_can_be_invited(self, medium, address, room_id):
+    async def check_threepid_can_be_invited(
+        self, medium: str, address: str, room_id: str
+    ) -> bool:
         """Check if a provided 3PID can be invited in the given room.
 
         Args:
-            medium (str): The 3PID's medium.
-            address (str): The 3PID's address.
-            room_id (str): The room we want to invite the threepid to.
+            medium: The 3PID's medium.
+            address: The 3PID's address.
+            room_id: The room we want to invite the threepid to.
 
         Returns:
-            defer.Deferred[bool], True if the 3PID can be invited, False if not.
+            True if the 3PID can be invited, False if not.
         """
 
         if self.third_party_rules is None:
             return True
 
-        state_ids = yield self.store.get_filtered_current_state_ids(room_id)
-        room_state_events = yield self.store.get_events(state_ids.values())
+        state_ids = await self.store.get_filtered_current_state_ids(room_id)
+        room_state_events = await self.store.get_events(state_ids.values())
 
         state_events = {}
         for key, event_id in state_ids.items():
             state_events[key] = room_state_events[event_id]
 
-        ret = yield self.third_party_rules.check_threepid_can_be_invited(
+        ret = await self.third_party_rules.check_threepid_can_be_invited(
             medium, address, state_events
         )
         return ret
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 11f0d34ec8..2d42e268c6 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -18,8 +18,6 @@ from typing import Any, Mapping, Union
 
 from frozendict import frozendict
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes, RelationTypes
 from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import RoomVersion
@@ -337,8 +335,9 @@ class EventClientSerializer(object):
             hs.config.experimental_msc1849_support_enabled
         )
 
-    @defer.inlineCallbacks
-    def serialize_event(self, event, time_now, bundle_aggregations=True, **kwargs):
+    async def serialize_event(
+        self, event, time_now, bundle_aggregations=True, **kwargs
+    ):
         """Serializes a single event.
 
         Args:
@@ -348,7 +347,7 @@ class EventClientSerializer(object):
             **kwargs: Arguments to pass to `serialize_event`
 
         Returns:
-            Deferred[dict]: The serialized event
+            dict: The serialized event
         """
         # To handle the case of presence events and the like
         if not isinstance(event, EventBase):
@@ -363,8 +362,8 @@ class EventClientSerializer(object):
         if not event.internal_metadata.is_redacted() and (
             self.experimental_msc1849_support_enabled and bundle_aggregations
         ):
-            annotations = yield self.store.get_aggregation_groups_for_event(event_id)
-            references = yield self.store.get_relations_for_event(
+            annotations = await self.store.get_aggregation_groups_for_event(event_id)
+            references = await self.store.get_relations_for_event(
                 event_id, RelationTypes.REFERENCE, direction="f"
             )
 
@@ -378,7 +377,7 @@ class EventClientSerializer(object):
 
             edit = None
             if event.type == EventTypes.Message:
-                edit = yield self.store.get_applicable_edit(event_id)
+                edit = await self.store.get_applicable_edit(event_id)
 
             if edit:
                 # If there is an edit replace the content, preserving existing
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index dab13c243f..e674bf44a2 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -41,8 +41,6 @@ from typing import Tuple
 
 from signedjson.sign import sign_json
 
-from twisted.internet import defer
-
 from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import get_domain_from_id
@@ -72,8 +70,9 @@ class GroupAttestationSigning(object):
         self.server_name = hs.hostname
         self.signing_key = hs.signing_key
 
-    @defer.inlineCallbacks
-    def verify_attestation(self, attestation, group_id, user_id, server_name=None):
+    async def verify_attestation(
+        self, attestation, group_id, user_id, server_name=None
+    ):
         """Verifies that the given attestation matches the given parameters.
 
         An optional server_name can be supplied to explicitly set which server's
@@ -102,7 +101,7 @@ class GroupAttestationSigning(object):
         if valid_until_ms < now:
             raise SynapseError(400, "Attestation expired")
 
-        yield self.keyring.verify_json_for_server(
+        await self.keyring.verify_json_for_server(
             server_name, attestation, now, "Group attestation"
         )
 
@@ -142,8 +141,7 @@ class GroupAttestionRenewer(object):
                 self._start_renew_attestations, 30 * 60 * 1000
             )
 
-    @defer.inlineCallbacks
-    def on_renew_attestation(self, group_id, user_id, content):
+    async def on_renew_attestation(self, group_id, user_id, content):
         """When a remote updates an attestation
         """
         attestation = content["attestation"]
@@ -151,11 +149,11 @@ class GroupAttestionRenewer(object):
         if not self.is_mine_id(group_id) and not self.is_mine_id(user_id):
             raise SynapseError(400, "Neither user not group are on this server")
 
-        yield self.attestations.verify_attestation(
+        await self.attestations.verify_attestation(
             attestation, user_id=user_id, group_id=group_id
         )
 
-        yield self.store.update_remote_attestion(group_id, user_id, attestation)
+        await self.store.update_remote_attestion(group_id, user_id, attestation)
 
         return {}
 
@@ -172,8 +170,7 @@ class GroupAttestionRenewer(object):
             now + UPDATE_ATTESTATION_TIME_MS
         )
 
-        @defer.inlineCallbacks
-        def _renew_attestation(group_user: Tuple[str, str]):
+        async def _renew_attestation(group_user: Tuple[str, str]):
             group_id, user_id = group_user
             try:
                 if not self.is_mine_id(group_id):
@@ -186,16 +183,16 @@ class GroupAttestionRenewer(object):
                         user_id,
                         group_id,
                     )
-                    yield self.store.remove_attestation_renewal(group_id, user_id)
+                    await self.store.remove_attestation_renewal(group_id, user_id)
                     return
 
                 attestation = self.attestations.create_attestation(group_id, user_id)
 
-                yield self.transport_client.renew_group_attestation(
+                await self.transport_client.renew_group_attestation(
                     destination, group_id, user_id, content={"attestation": attestation}
                 )
 
-                yield self.store.update_attestation_renewal(
+                await self.store.update_attestation_renewal(
                     group_id, user_id, attestation
                 )
             except (RequestSendFailed, HttpResponseException) as e:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f5f683bfd4..0d7d1adcea 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2470,7 +2470,7 @@ class FederationHandler(BaseHandler):
         }
 
         current_state_ids = await context.get_current_state_ids()
-        current_state_ids = dict(current_state_ids)
+        current_state_ids = dict(current_state_ids)  # type: ignore
 
         current_state_ids.update(state_updates)
 
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 1ffd5e2df3..0d23142653 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from twisted.internet import defer
-
 from synapse.util.metrics import Measure
 
 from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
@@ -37,7 +35,6 @@ class ActionGenerator(object):
         # event stream, so we just run the rules for a client with no profile
         # tag (ie. we just need all the users).
 
-    @defer.inlineCallbacks
-    def handle_push_actions_for_event(self, event, context):
+    async def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "action_for_event_by_user"):
-            yield self.bulk_evaluator.action_for_event_by_user(event, context)
+            await self.bulk_evaluator.action_for_event_by_user(event, context)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 472ddf9f7d..04b9d8ac82 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -19,8 +19,6 @@ from collections import namedtuple
 
 from prometheus_client import Counter
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes, Membership
 from synapse.event_auth import get_user_power_level
 from synapse.state import POWER_KEY
@@ -70,8 +68,7 @@ class BulkPushRuleEvaluator(object):
             resizable=False,
         )
 
-    @defer.inlineCallbacks
-    def _get_rules_for_event(self, event, context):
+    async def _get_rules_for_event(self, event, context):
         """This gets the rules for all users in the room at the time of the event,
         as well as the push rules for the invitee if the event is an invite.
 
@@ -79,19 +76,19 @@ class BulkPushRuleEvaluator(object):
             dict of user_id -> push_rules
         """
         room_id = event.room_id
-        rules_for_room = yield self._get_rules_for_room(room_id)
+        rules_for_room = await self._get_rules_for_room(room_id)
 
-        rules_by_user = yield rules_for_room.get_rules(event, context)
+        rules_by_user = await rules_for_room.get_rules(event, context)
 
         # if this event is an invite event, we may need to run rules for the user
         # who's been invited, otherwise they won't get told they've been invited
         if event.type == "m.room.member" and event.content["membership"] == "invite":
             invited = event.state_key
             if invited and self.hs.is_mine_id(invited):
-                has_pusher = yield self.store.user_has_pusher(invited)
+                has_pusher = await self.store.user_has_pusher(invited)
                 if has_pusher:
                     rules_by_user = dict(rules_by_user)
-                    rules_by_user[invited] = yield self.store.get_push_rules_for_user(
+                    rules_by_user[invited] = await self.store.get_push_rules_for_user(
                         invited
                     )
 
@@ -114,20 +111,19 @@ class BulkPushRuleEvaluator(object):
             self.room_push_rule_cache_metrics,
         )
 
-    @defer.inlineCallbacks
-    def _get_power_levels_and_sender_level(self, event, context):
-        prev_state_ids = yield context.get_prev_state_ids()
+    async def _get_power_levels_and_sender_level(self, event, context):
+        prev_state_ids = await context.get_prev_state_ids()
         pl_event_id = prev_state_ids.get(POWER_KEY)
         if pl_event_id:
             # fastpath: if there's a power level event, that's all we need, and
             # not having a power level event is an extreme edge case
-            pl_event = yield self.store.get_event(pl_event_id)
+            pl_event = await self.store.get_event(pl_event_id)
             auth_events = {POWER_KEY: pl_event}
         else:
-            auth_events_ids = yield self.auth.compute_auth_events(
+            auth_events_ids = await self.auth.compute_auth_events(
                 event, prev_state_ids, for_verification=False
             )
-            auth_events = yield self.store.get_events(auth_events_ids)
+            auth_events = await self.store.get_events(auth_events_ids)
             auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
 
         sender_level = get_user_power_level(event.sender, auth_events)
@@ -136,23 +132,19 @@ class BulkPushRuleEvaluator(object):
 
         return pl_event.content if pl_event else {}, sender_level
 
-    @defer.inlineCallbacks
-    def action_for_event_by_user(self, event, context):
+    async def action_for_event_by_user(self, event, context) -> None:
         """Given an event and context, evaluate the push rules and insert the
         results into the event_push_actions_staging table.
-
-        Returns:
-            Deferred
         """
-        rules_by_user = yield self._get_rules_for_event(event, context)
+        rules_by_user = await self._get_rules_for_event(event, context)
         actions_by_user = {}
 
-        room_members = yield self.store.get_joined_users_from_context(event, context)
+        room_members = await self.store.get_joined_users_from_context(event, context)
 
         (
             power_levels,
             sender_power_level,
-        ) = yield self._get_power_levels_and_sender_level(event, context)
+        ) = await self._get_power_levels_and_sender_level(event, context)
 
         evaluator = PushRuleEvaluatorForEvent(
             event, len(room_members), sender_power_level, power_levels
@@ -165,7 +157,7 @@ class BulkPushRuleEvaluator(object):
                 continue
 
             if not event.is_state():
-                is_ignored = yield self.store.is_ignored_by(event.sender, uid)
+                is_ignored = await self.store.is_ignored_by(event.sender, uid)
                 if is_ignored:
                     continue
 
@@ -197,7 +189,7 @@ class BulkPushRuleEvaluator(object):
         # Mark in the DB staging area the push actions for users who should be
         # notified for this event. (This will then get handled when we persist
         # the event)
-        yield self.store.add_push_actions_to_staging(event.event_id, actions_by_user)
+        await self.store.add_push_actions_to_staging(event.event_id, actions_by_user)
 
 
 def _condition_checker(evaluator, conditions, uid, display_name, cache):
@@ -274,8 +266,7 @@ class RulesForRoom(object):
         # to self around in the callback.
         self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id)
 
-    @defer.inlineCallbacks
-    def get_rules(self, event, context):
+    async def get_rules(self, event, context):
         """Given an event context return the rules for all users who are
         currently in the room.
         """
@@ -286,7 +277,7 @@ class RulesForRoom(object):
             self.room_push_rule_cache_metrics.inc_hits()
             return self.rules_by_user
 
-        with (yield self.linearizer.queue(())):
+        with (await self.linearizer.queue(())):
             if state_group and self.state_group == state_group:
                 logger.debug("Using cached rules for %r", self.room_id)
                 self.room_push_rule_cache_metrics.inc_hits()
@@ -304,9 +295,7 @@ class RulesForRoom(object):
 
                 push_rules_delta_state_cache_metric.inc_hits()
             else:
-                current_state_ids = yield defer.ensureDeferred(
-                    context.get_current_state_ids()
-                )
+                current_state_ids = await context.get_current_state_ids()
                 push_rules_delta_state_cache_metric.inc_misses()
 
             push_rules_state_size_counter.inc(len(current_state_ids))
@@ -353,7 +342,7 @@ class RulesForRoom(object):
                 # If we have some memebr events we haven't seen, look them up
                 # and fetch push rules for them if appropriate.
                 logger.debug("Found new member events %r", missing_member_event_ids)
-                yield self._update_rules_with_member_event_ids(
+                await self._update_rules_with_member_event_ids(
                     ret_rules_by_user, missing_member_event_ids, state_group, event
                 )
             else:
@@ -371,8 +360,7 @@ class RulesForRoom(object):
             )
         return ret_rules_by_user
 
-    @defer.inlineCallbacks
-    def _update_rules_with_member_event_ids(
+    async def _update_rules_with_member_event_ids(
         self, ret_rules_by_user, member_event_ids, state_group, event
     ):
         """Update the partially filled rules_by_user dict by fetching rules for
@@ -388,7 +376,7 @@ class RulesForRoom(object):
         """
         sequence = self.sequence
 
-        rows = yield self.store.get_membership_from_event_ids(member_event_ids.values())
+        rows = await self.store.get_membership_from_event_ids(member_event_ids.values())
 
         members = {row["event_id"]: (row["user_id"], row["membership"]) for row in rows}
 
@@ -410,7 +398,7 @@ class RulesForRoom(object):
 
         logger.debug("Joined: %r", interested_in_user_ids)
 
-        if_users_with_pushers = yield self.store.get_if_users_have_pushers(
+        if_users_with_pushers = await self.store.get_if_users_have_pushers(
             interested_in_user_ids, on_invalidate=self.invalidate_all_cb
         )
 
@@ -420,7 +408,7 @@ class RulesForRoom(object):
 
         logger.debug("With pushers: %r", user_ids)
 
-        users_with_receipts = yield self.store.get_users_with_read_receipts_in_room(
+        users_with_receipts = await self.store.get_users_with_read_receipts_in_room(
             self.room_id, on_invalidate=self.invalidate_all_cb
         )
 
@@ -431,7 +419,7 @@ class RulesForRoom(object):
             if uid in interested_in_user_ids:
                 user_ids.add(uid)
 
-        rules_by_user = yield self.store.bulk_get_push_rules(
+        rules_by_user = await self.store.bulk_get_push_rules(
             user_ids, on_invalidate=self.invalidate_all_cb
         )
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 2fac07593b..4c469efb20 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -17,7 +17,6 @@ import logging
 
 from prometheus_client import Counter
 
-from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.api.constants import EventTypes
@@ -128,12 +127,11 @@ class HttpPusher(object):
         # but currently that's the only type of receipt anyway...
         run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
 
-    @defer.inlineCallbacks
-    def _update_badge(self):
+    async def _update_badge(self):
         # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
         # to be largely redundant. perhaps we can remove it.
-        badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
-        yield self._send_badge(badge)
+        badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+        await self._send_badge(badge)
 
     def on_timer(self):
         self._start_processing()
@@ -152,8 +150,7 @@ class HttpPusher(object):
 
         run_as_background_process("httppush.process", self._process)
 
-    @defer.inlineCallbacks
-    def _process(self):
+    async def _process(self):
         # we should never get here if we are already processing
         assert not self._is_processing
 
@@ -164,7 +161,7 @@ class HttpPusher(object):
             while True:
                 starting_max_ordering = self.max_stream_ordering
                 try:
-                    yield self._unsafe_process()
+                    await self._unsafe_process()
                 except Exception:
                     logger.exception("Exception processing notifs")
                 if self.max_stream_ordering == starting_max_ordering:
@@ -172,8 +169,7 @@ class HttpPusher(object):
         finally:
             self._is_processing = False
 
-    @defer.inlineCallbacks
-    def _unsafe_process(self):
+    async def _unsafe_process(self):
         """
         Looks for unset notifications and dispatch them, in order
         Never call this directly: use _process which will only allow this to
@@ -181,7 +177,7 @@ class HttpPusher(object):
         """
 
         fn = self.store.get_unread_push_actions_for_user_in_range_for_http
-        unprocessed = yield fn(
+        unprocessed = await fn(
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
 
@@ -203,13 +199,13 @@ class HttpPusher(object):
                     "app_display_name": self.app_display_name,
                 },
             ):
-                processed = yield self._process_one(push_action)
+                processed = await self._process_one(push_action)
 
             if processed:
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action["stream_ordering"]
-                pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success(
+                pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
                     self.app_id,
                     self.pushkey,
                     self.user_id,
@@ -224,14 +220,14 @@ class HttpPusher(object):
 
                 if self.failing_since:
                     self.failing_since = None
-                    yield self.store.update_pusher_failing_since(
+                    await self.store.update_pusher_failing_since(
                         self.app_id, self.pushkey, self.user_id, self.failing_since
                     )
             else:
                 http_push_failed_counter.inc()
                 if not self.failing_since:
                     self.failing_since = self.clock.time_msec()
-                    yield self.store.update_pusher_failing_since(
+                    await self.store.update_pusher_failing_since(
                         self.app_id, self.pushkey, self.user_id, self.failing_since
                     )
 
@@ -250,7 +246,7 @@ class HttpPusher(object):
                     )
                     self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                     self.last_stream_ordering = push_action["stream_ordering"]
-                    pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
+                    pusher_still_exists = await self.store.update_pusher_last_stream_ordering(
                         self.app_id,
                         self.pushkey,
                         self.user_id,
@@ -263,7 +259,7 @@ class HttpPusher(object):
                         return
 
                     self.failing_since = None
-                    yield self.store.update_pusher_failing_since(
+                    await self.store.update_pusher_failing_since(
                         self.app_id, self.pushkey, self.user_id, self.failing_since
                     )
                 else:
@@ -276,18 +272,17 @@ class HttpPusher(object):
                     )
                     break
 
-    @defer.inlineCallbacks
-    def _process_one(self, push_action):
+    async def _process_one(self, push_action):
         if "notify" not in push_action["actions"]:
             return True
 
         tweaks = push_rule_evaluator.tweaks_for_actions(push_action["actions"])
-        badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+        badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
 
-        event = yield self.store.get_event(push_action["event_id"], allow_none=True)
+        event = await self.store.get_event(push_action["event_id"], allow_none=True)
         if event is None:
             return True  # It's been redacted
-        rejected = yield self.dispatch_push(event, tweaks, badge)
+        rejected = await self.dispatch_push(event, tweaks, badge)
         if rejected is False:
             return False
 
@@ -301,11 +296,10 @@ class HttpPusher(object):
                     )
                 else:
                     logger.info("Pushkey %s was rejected: removing", pk)
-                    yield self.hs.remove_pusher(self.app_id, pk, self.user_id)
+                    await self.hs.remove_pusher(self.app_id, pk, self.user_id)
         return True
 
-    @defer.inlineCallbacks
-    def _build_notification_dict(self, event, tweaks, badge):
+    async def _build_notification_dict(self, event, tweaks, badge):
         priority = "low"
         if (
             event.type == EventTypes.Encrypted
@@ -335,7 +329,7 @@ class HttpPusher(object):
             }
             return d
 
-        ctx = yield push_tools.get_context_for_event(
+        ctx = await push_tools.get_context_for_event(
             self.storage, self.state_handler, event, self.user_id
         )
 
@@ -377,13 +371,12 @@ class HttpPusher(object):
 
         return d
 
-    @defer.inlineCallbacks
-    def dispatch_push(self, event, tweaks, badge):
-        notification_dict = yield self._build_notification_dict(event, tweaks, badge)
+    async def dispatch_push(self, event, tweaks, badge):
+        notification_dict = await self._build_notification_dict(event, tweaks, badge)
         if not notification_dict:
             return []
         try:
-            resp = yield self.http_client.post_json_get_json(
+            resp = await self.http_client.post_json_get_json(
                 self.url, notification_dict
             )
         except Exception as e:
@@ -400,8 +393,7 @@ class HttpPusher(object):
             rejected = resp["rejected"]
         return rejected
 
-    @defer.inlineCallbacks
-    def _send_badge(self, badge):
+    async def _send_badge(self, badge):
         """
         Args:
             badge (int): number of unread messages
@@ -424,7 +416,7 @@ class HttpPusher(object):
             }
         }
         try:
-            yield self.http_client.post_json_get_json(self.url, d)
+            await self.http_client.post_json_get_json(self.url, d)
             http_badges_processed_counter.inc()
         except Exception as e:
             logger.warning(
diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py
index 0644a13cfc..d8f4a453cd 100644
--- a/synapse/push/presentable_names.py
+++ b/synapse/push/presentable_names.py
@@ -16,8 +16,6 @@
 import logging
 import re
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes
 
 logger = logging.getLogger(__name__)
@@ -29,8 +27,7 @@ ALIAS_RE = re.compile(r"^#.*:.+$")
 ALL_ALONE = "Empty Room"
 
 
-@defer.inlineCallbacks
-def calculate_room_name(
+async def calculate_room_name(
     store,
     room_state_ids,
     user_id,
@@ -53,7 +50,7 @@ def calculate_room_name(
     """
     # does it have a name?
     if (EventTypes.Name, "") in room_state_ids:
-        m_room_name = yield store.get_event(
+        m_room_name = await store.get_event(
             room_state_ids[(EventTypes.Name, "")], allow_none=True
         )
         if m_room_name and m_room_name.content and m_room_name.content["name"]:
@@ -61,7 +58,7 @@ def calculate_room_name(
 
     # does it have a canonical alias?
     if (EventTypes.CanonicalAlias, "") in room_state_ids:
-        canon_alias = yield store.get_event(
+        canon_alias = await store.get_event(
             room_state_ids[(EventTypes.CanonicalAlias, "")], allow_none=True
         )
         if (
@@ -81,7 +78,7 @@ def calculate_room_name(
 
     my_member_event = None
     if (EventTypes.Member, user_id) in room_state_ids:
-        my_member_event = yield store.get_event(
+        my_member_event = await store.get_event(
             room_state_ids[(EventTypes.Member, user_id)], allow_none=True
         )
 
@@ -90,7 +87,7 @@ def calculate_room_name(
         and my_member_event.content["membership"] == "invite"
     ):
         if (EventTypes.Member, my_member_event.sender) in room_state_ids:
-            inviter_member_event = yield store.get_event(
+            inviter_member_event = await store.get_event(
                 room_state_ids[(EventTypes.Member, my_member_event.sender)],
                 allow_none=True,
             )
@@ -107,7 +104,7 @@ def calculate_room_name(
     # we're going to have to generate a name based on who's in the room,
     # so find out who is in the room that isn't the user.
     if EventTypes.Member in room_state_bytype_ids:
-        member_events = yield store.get_events(
+        member_events = await store.get_events(
             list(room_state_bytype_ids[EventTypes.Member].values())
         )
         all_members = [
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 5dae4648c0..d0145666bf 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -13,18 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
 from synapse.push.presentable_names import calculate_room_name, name_from_member_event
 from synapse.storage import Storage
 
 
-@defer.inlineCallbacks
-def get_badge_count(store, user_id):
-    invites = yield store.get_invited_rooms_for_local_user(user_id)
-    joins = yield store.get_rooms_for_user(user_id)
+async def get_badge_count(store, user_id):
+    invites = await store.get_invited_rooms_for_local_user(user_id)
+    joins = await store.get_rooms_for_user(user_id)
 
-    my_receipts_by_room = yield store.get_receipts_for_user(user_id, "m.read")
+    my_receipts_by_room = await store.get_receipts_for_user(user_id, "m.read")
 
     badge = len(invites)
 
@@ -32,7 +29,7 @@ def get_badge_count(store, user_id):
         if room_id in my_receipts_by_room:
             last_unread_event_id = my_receipts_by_room[room_id]
 
-            notifs = yield (
+            notifs = await (
                 store.get_unread_event_push_actions_by_room_for_user(
                     room_id, user_id, last_unread_event_id
                 )
@@ -43,23 +40,22 @@ def get_badge_count(store, user_id):
     return badge
 
 
-@defer.inlineCallbacks
-def get_context_for_event(storage: Storage, state_handler, ev, user_id):
+async def get_context_for_event(storage: Storage, state_handler, ev, user_id):
     ctx = {}
 
-    room_state_ids = yield storage.state.get_state_ids_for_event(ev.event_id)
+    room_state_ids = await storage.state.get_state_ids_for_event(ev.event_id)
 
     # we no longer bother setting room_alias, and make room_name the
     # human-readable name instead, be that m.room.name, an alias or
     # a list of people in the room
-    name = yield calculate_room_name(
+    name = await calculate_room_name(
         storage.main, room_state_ids, user_id, fallback_to_single_member=False
     )
     if name:
         ctx["name"] = name
 
     sender_state_event_id = room_state_ids[("m.room.member", ev.sender)]
-    sender_state_event = yield storage.main.get_event(sender_state_event_id)
+    sender_state_event = await storage.main.get_event(sender_state_event_id)
     ctx["sender_display_name"] = name_from_member_event(sender_state_event)
 
     return ctx
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 2456f12f46..3c3262a88c 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,8 +19,6 @@ from typing import TYPE_CHECKING, Dict, Union
 
 from prometheus_client import Gauge
 
-from twisted.internet import defer
-
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
 from synapse.push.emailpusher import EmailPusher
@@ -52,7 +50,7 @@ class PusherPool:
     Note that it is expected that each pusher will have its own 'processing' loop which
     will send out the notifications in the background, rather than blocking until the
     notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
-    Pusher.on_new_receipts are not expected to return deferreds.
+    Pusher.on_new_receipts are not expected to return awaitables.
     """
 
     def __init__(self, hs: "HomeServer"):
@@ -77,8 +75,7 @@ class PusherPool:
             return
         run_as_background_process("start_pushers", self._start_pushers)
 
-    @defer.inlineCallbacks
-    def add_pusher(
+    async def add_pusher(
         self,
         user_id,
         access_token,
@@ -94,7 +91,7 @@ class PusherPool:
         """Creates a new pusher and adds it to the pool
 
         Returns:
-            Deferred[EmailPusher|HttpPusher]
+            EmailPusher|HttpPusher
         """
 
         time_now_msec = self.clock.time_msec()
@@ -124,9 +121,9 @@ class PusherPool:
         # create the pusher setting last_stream_ordering to the current maximum
         # stream ordering in event_push_actions, so it will process
         # pushes from this point onwards.
-        last_stream_ordering = yield self.store.get_latest_push_action_stream_ordering()
+        last_stream_ordering = await self.store.get_latest_push_action_stream_ordering()
 
-        yield self.store.add_pusher(
+        await self.store.add_pusher(
             user_id=user_id,
             access_token=access_token,
             kind=kind,
@@ -140,15 +137,14 @@ class PusherPool:
             last_stream_ordering=last_stream_ordering,
             profile_tag=profile_tag,
         )
-        pusher = yield self.start_pusher_by_id(app_id, pushkey, user_id)
+        pusher = await self.start_pusher_by_id(app_id, pushkey, user_id)
 
         return pusher
 
-    @defer.inlineCallbacks
-    def remove_pushers_by_app_id_and_pushkey_not_user(
+    async def remove_pushers_by_app_id_and_pushkey_not_user(
         self, app_id, pushkey, not_user_id
     ):
-        to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
+        to_remove = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
         for p in to_remove:
             if p["user_name"] != not_user_id:
                 logger.info(
@@ -157,10 +153,9 @@ class PusherPool:
                     pushkey,
                     p["user_name"],
                 )
-                yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
+                await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    @defer.inlineCallbacks
-    def remove_pushers_by_access_token(self, user_id, access_tokens):
+    async def remove_pushers_by_access_token(self, user_id, access_tokens):
         """Remove the pushers for a given user corresponding to a set of
         access_tokens.
 
@@ -173,7 +168,7 @@ class PusherPool:
             return
 
         tokens = set(access_tokens)
-        for p in (yield self.store.get_pushers_by_user_id(user_id)):
+        for p in await self.store.get_pushers_by_user_id(user_id):
             if p["access_token"] in tokens:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
@@ -181,16 +176,15 @@ class PusherPool:
                     p["pushkey"],
                     p["user_name"],
                 )
-                yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
+                await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    @defer.inlineCallbacks
-    def on_new_notifications(self, min_stream_id, max_stream_id):
+    async def on_new_notifications(self, min_stream_id, max_stream_id):
         if not self.pushers:
             # nothing to do here.
             return
 
         try:
-            users_affected = yield self.store.get_push_action_users_in_range(
+            users_affected = await self.store.get_push_action_users_in_range(
                 min_stream_id, max_stream_id
             )
 
@@ -202,8 +196,7 @@ class PusherPool:
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
-    @defer.inlineCallbacks
-    def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+    async def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
         if not self.pushers:
             # nothing to do here.
             return
@@ -211,7 +204,7 @@ class PusherPool:
         try:
             # Need to subtract 1 from the minimum because the lower bound here
             # is not inclusive
-            users_affected = yield self.store.get_users_sent_receipts_between(
+            users_affected = await self.store.get_users_sent_receipts_between(
                 min_stream_id - 1, max_stream_id
             )
 
@@ -223,12 +216,11 @@ class PusherPool:
         except Exception:
             logger.exception("Exception in pusher on_new_receipts")
 
-    @defer.inlineCallbacks
-    def start_pusher_by_id(self, app_id, pushkey, user_id):
+    async def start_pusher_by_id(self, app_id, pushkey, user_id):
         """Look up the details for the given pusher, and start it
 
         Returns:
-            Deferred[EmailPusher|HttpPusher|None]: The pusher started, if any
+            EmailPusher|HttpPusher|None: The pusher started, if any
         """
         if not self._should_start_pushers:
             return
@@ -236,7 +228,7 @@ class PusherPool:
         if not self._pusher_shard_config.should_handle(self._instance_name, user_id):
             return
 
-        resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
+        resultlist = await self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
 
         pusher_dict = None
         for r in resultlist:
@@ -245,34 +237,29 @@ class PusherPool:
 
         pusher = None
         if pusher_dict:
-            pusher = yield self._start_pusher(pusher_dict)
+            pusher = await self._start_pusher(pusher_dict)
 
         return pusher
 
-    @defer.inlineCallbacks
-    def _start_pushers(self):
+    async def _start_pushers(self) -> None:
         """Start all the pushers
-
-        Returns:
-            Deferred
         """
-        pushers = yield self.store.get_all_pushers()
+        pushers = await self.store.get_all_pushers()
 
         # Stagger starting up the pushers so we don't completely drown the
         # process on start up.
-        yield concurrently_execute(self._start_pusher, pushers, 10)
+        await concurrently_execute(self._start_pusher, pushers, 10)
 
         logger.info("Started pushers")
 
-    @defer.inlineCallbacks
-    def _start_pusher(self, pusherdict):
+    async def _start_pusher(self, pusherdict):
         """Start the given pusher
 
         Args:
             pusherdict (dict): dict with the values pulled from the db table
 
         Returns:
-            Deferred[EmailPusher|HttpPusher]
+            EmailPusher|HttpPusher
         """
         if not self._pusher_shard_config.should_handle(
             self._instance_name, pusherdict["user_name"]
@@ -315,7 +302,7 @@ class PusherPool:
         user_id = pusherdict["user_name"]
         last_stream_ordering = pusherdict["last_stream_ordering"]
         if last_stream_ordering:
-            have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
+            have_notifs = await self.store.get_if_maybe_push_in_range_for_user(
                 user_id, last_stream_ordering
             )
         else:
@@ -327,8 +314,7 @@ class PusherPool:
 
         return p
 
-    @defer.inlineCallbacks
-    def remove_pusher(self, app_id, pushkey, user_id):
+    async def remove_pusher(self, app_id, pushkey, user_id):
         appid_pushkey = "%s:%s" % (app_id, pushkey)
 
         byuser = self.pushers.get(user_id, {})
@@ -340,6 +326,6 @@ class PusherPool:
 
             synapse_pushers.labels(type(pusher).__name__, pusher.app_id).dec()
 
-        yield self.store.delete_pusher_by_app_id_pushkey_user_id(
+        await self.store.delete_pusher_by_app_id_pushkey_user_id(
             app_id, pushkey, user_id
         )
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index c287c4e269..ca065e819e 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -78,7 +78,9 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         """
         event_payloads = []
         for event, context in event_and_contexts:
-            serialized_context = yield context.serialize(event, store)
+            serialized_context = yield defer.ensureDeferred(
+                context.serialize(event, store)
+            )
 
             event_payloads.append(
                 {
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index c981723c1a..b30e4d5039 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -77,7 +77,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             extra_users (list(UserID)): Any extra users to notify about event
         """
 
-        serialized_context = yield context.serialize(event, store)
+        serialized_context = yield defer.ensureDeferred(context.serialize(event, store))
 
         payload = {
             "event": event.get_pdu_json(),
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index 9a847130c0..20ddb9550b 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -17,7 +17,9 @@
 import logging
 import os
 import urllib
+from typing import Awaitable
 
+from twisted.internet.interfaces import IConsumer
 from twisted.protocols.basic import FileSender
 
 from synapse.api.errors import Codes, SynapseError, cs_error
@@ -240,14 +242,14 @@ class Responder(object):
     held can be cleaned up.
     """
 
-    def write_to_consumer(self, consumer):
+    def write_to_consumer(self, consumer: IConsumer) -> Awaitable:
         """Stream response into consumer
 
         Args:
-            consumer (IConsumer)
+            consumer: The consumer to stream into.
 
         Returns:
-            Deferred: Resolves once the response has finished being written
+            Resolves once the response has finished being written
         """
         pass
 
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 45628c07b4..6fb4039e98 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -18,10 +18,11 @@ import errno
 import logging
 import os
 import shutil
-from typing import Dict, Tuple
+from typing import IO, Dict, Optional, Tuple
 
 import twisted.internet.error
 import twisted.web.http
+from twisted.web.http import Request
 from twisted.web.resource import Resource
 
 from synapse.api.errors import (
@@ -40,6 +41,7 @@ from synapse.util.stringutils import random_string
 
 from ._base import (
     FileInfo,
+    Responder,
     get_filename_from_headers,
     respond_404,
     respond_with_responder,
@@ -135,19 +137,24 @@ class MediaRepository(object):
             self.recently_accessed_locals.add(media_id)
 
     async def create_content(
-        self, media_type, upload_name, content, content_length, auth_user
-    ):
+        self,
+        media_type: str,
+        upload_name: str,
+        content: IO,
+        content_length: int,
+        auth_user: str,
+    ) -> str:
         """Store uploaded content for a local user and return the mxc URL
 
         Args:
-            media_type(str): The content type of the file
-            upload_name(str): The name of the file
+            media_type: The content type of the file
+            upload_name: The name of the file
             content: A file like object that is the content to store
-            content_length(int): The length of the content
-            auth_user(str): The user_id of the uploader
+            content_length: The length of the content
+            auth_user: The user_id of the uploader
 
         Returns:
-            Deferred[str]: The mxc url of the stored content
+            The mxc url of the stored content
         """
         media_id = random_string(24)
 
@@ -170,19 +177,20 @@ class MediaRepository(object):
 
         return "mxc://%s/%s" % (self.server_name, media_id)
 
-    async def get_local_media(self, request, media_id, name):
+    async def get_local_media(
+        self, request: Request, media_id: str, name: Optional[str]
+    ) -> None:
         """Responds to reqests for local media, if exists, or returns 404.
 
         Args:
-            request(twisted.web.http.Request)
-            media_id (str): The media ID of the content. (This is the same as
+            request: The incoming request.
+            media_id: The media ID of the content. (This is the same as
                 the file_id for local content.)
-            name (str|None): Optional name that, if specified, will be used as
+            name: Optional name that, if specified, will be used as
                 the filename in the Content-Disposition header of the response.
 
         Returns:
-            Deferred: Resolves once a response has successfully been written
-                to request
+            Resolves once a response has successfully been written to request
         """
         media_info = await self.store.get_local_media(media_id)
         if not media_info or media_info["quarantined_by"]:
@@ -203,20 +211,20 @@ class MediaRepository(object):
             request, responder, media_type, media_length, upload_name
         )
 
-    async def get_remote_media(self, request, server_name, media_id, name):
+    async def get_remote_media(
+        self, request: Request, server_name: str, media_id: str, name: Optional[str]
+    ) -> None:
         """Respond to requests for remote media.
 
         Args:
-            request(twisted.web.http.Request)
-            server_name (str): Remote server_name where the media originated.
-            media_id (str): The media ID of the content (as defined by the
-                remote server).
-            name (str|None): Optional name that, if specified, will be used as
+            request: The incoming request.
+            server_name: Remote server_name where the media originated.
+            media_id: The media ID of the content (as defined by the remote server).
+            name: Optional name that, if specified, will be used as
                 the filename in the Content-Disposition header of the response.
 
         Returns:
-            Deferred: Resolves once a response has successfully been written
-                to request
+            Resolves once a response has successfully been written to request
         """
         if (
             self.federation_domain_whitelist is not None
@@ -245,17 +253,16 @@ class MediaRepository(object):
         else:
             respond_404(request)
 
-    async def get_remote_media_info(self, server_name, media_id):
+    async def get_remote_media_info(self, server_name: str, media_id: str) -> dict:
         """Gets the media info associated with the remote file, downloading
         if necessary.
 
         Args:
-            server_name (str): Remote server_name where the media originated.
-            media_id (str): The media ID of the content (as defined by the
-                remote server).
+            server_name: Remote server_name where the media originated.
+            media_id: The media ID of the content (as defined by the remote server).
 
         Returns:
-            Deferred[dict]: The media_info of the file
+            The media info of the file
         """
         if (
             self.federation_domain_whitelist is not None
@@ -278,7 +285,9 @@ class MediaRepository(object):
 
         return media_info
 
-    async def _get_remote_media_impl(self, server_name, media_id):
+    async def _get_remote_media_impl(
+        self, server_name: str, media_id: str
+    ) -> Tuple[Optional[Responder], dict]:
         """Looks for media in local cache, if not there then attempt to
         download from remote server.
 
@@ -288,7 +297,7 @@ class MediaRepository(object):
                 remote server).
 
         Returns:
-            Deferred[(Responder, media_info)]
+            A tuple of responder and the media info of the file.
         """
         media_info = await self.store.get_cached_remote_media(server_name, media_id)
 
@@ -319,19 +328,21 @@ class MediaRepository(object):
         responder = await self.media_storage.fetch_media(file_info)
         return responder, media_info
 
-    async def _download_remote_file(self, server_name, media_id, file_id):
+    async def _download_remote_file(
+        self, server_name: str, media_id: str, file_id: str
+    ) -> dict:
         """Attempt to download the remote file from the given server name,
         using the given file_id as the local id.
 
         Args:
-            server_name (str): Originating server
-            media_id (str): The media ID of the content (as defined by the
+            server_name: Originating server
+            media_id: The media ID of the content (as defined by the
                 remote server). This is different than the file_id, which is
                 locally generated.
-            file_id (str): Local file ID
+            file_id: Local file ID
 
         Returns:
-            Deferred[MediaInfo]
+            The media info of the file.
         """
 
         file_info = FileInfo(server_name=server_name, file_id=file_id)
@@ -549,25 +560,31 @@ class MediaRepository(object):
             return output_path
 
     async def _generate_thumbnails(
-        self, server_name, media_id, file_id, media_type, url_cache=False
-    ):
+        self,
+        server_name: Optional[str],
+        media_id: str,
+        file_id: str,
+        media_type: str,
+        url_cache: bool = False,
+    ) -> Optional[dict]:
         """Generate and store thumbnails for an image.
 
         Args:
-            server_name (str|None): The server name if remote media, else None if local
-            media_id (str): The media ID of the content. (This is the same as
+            server_name: The server name if remote media, else None if local
+            media_id: The media ID of the content. (This is the same as
                 the file_id for local content)
-            file_id (str): Local file ID
-            media_type (str): The content type of the file
-            url_cache (bool): If we are thumbnailing images downloaded for the URL cache,
+            file_id: Local file ID
+            media_type: The content type of the file
+            url_cache: If we are thumbnailing images downloaded for the URL cache,
                 used exclusively by the url previewer
 
         Returns:
-            Deferred[dict]: Dict with "width" and "height" keys of original image
+            Dict with "width" and "height" keys of original image or None if the
+            media cannot be thumbnailed.
         """
         requirements = self._get_thumbnail_requirements(media_type)
         if not requirements:
-            return
+            return None
 
         input_path = await self.media_storage.ensure_media_is_in_local_cache(
             FileInfo(server_name, file_id, url_cache=url_cache)
@@ -584,7 +601,7 @@ class MediaRepository(object):
                 m_height,
                 self.max_image_pixels,
             )
-            return
+            return None
 
         if thumbnailer.transpose_method is not None:
             m_width, m_height = await defer_to_thread(
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 66bc1c3360..858b6d3005 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -12,13 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import contextlib
 import inspect
 import logging
 import os
 import shutil
-from typing import Optional
+from typing import IO, TYPE_CHECKING, Any, Optional, Sequence
 
 from twisted.protocols.basic import FileSender
 
@@ -26,6 +25,12 @@ from synapse.logging.context import defer_to_thread, make_deferred_yieldable
 from synapse.util.file_consumer import BackgroundFileConsumer
 
 from ._base import FileInfo, Responder
+from .filepath import MediaFilePaths
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+    from .storage_provider import StorageProvider
 
 logger = logging.getLogger(__name__)
 
@@ -34,20 +39,25 @@ class MediaStorage(object):
     """Responsible for storing/fetching files from local sources.
 
     Args:
-        hs (synapse.server.Homeserver)
-        local_media_directory (str): Base path where we store media on disk
-        filepaths (MediaFilePaths)
-        storage_providers ([StorageProvider]): List of StorageProvider that are
-            used to fetch and store files.
+        hs
+        local_media_directory: Base path where we store media on disk
+        filepaths
+        storage_providers: List of StorageProvider that are used to fetch and store files.
     """
 
-    def __init__(self, hs, local_media_directory, filepaths, storage_providers):
+    def __init__(
+        self,
+        hs: "HomeServer",
+        local_media_directory: str,
+        filepaths: MediaFilePaths,
+        storage_providers: Sequence["StorageProvider"],
+    ):
         self.hs = hs
         self.local_media_directory = local_media_directory
         self.filepaths = filepaths
         self.storage_providers = storage_providers
 
-    async def store_file(self, source, file_info: FileInfo) -> str:
+    async def store_file(self, source: IO, file_info: FileInfo) -> str:
         """Write `source` to the on disk media store, and also any other
         configured storage providers
 
@@ -69,7 +79,7 @@ class MediaStorage(object):
         return fname
 
     @contextlib.contextmanager
-    def store_into_file(self, file_info):
+    def store_into_file(self, file_info: FileInfo):
         """Context manager used to get a file like object to write into, as
         described by file_info.
 
@@ -85,7 +95,7 @@ class MediaStorage(object):
         error.
 
         Args:
-            file_info (FileInfo): Info about the file to store
+            file_info: Info about the file to store
 
         Example:
 
@@ -143,9 +153,9 @@ class MediaStorage(object):
             return FileResponder(open(local_path, "rb"))
 
         for provider in self.storage_providers:
-            res = provider.fetch(path, file_info)
-            # Fetch is supposed to return an Awaitable, but guard against
-            # improper implementations.
+            res = provider.fetch(path, file_info)  # type: Any
+            # Fetch is supposed to return an Awaitable[Responder], but guard
+            # against improper implementations.
             if inspect.isawaitable(res):
                 res = await res
             if res:
@@ -174,9 +184,9 @@ class MediaStorage(object):
             os.makedirs(dirname)
 
         for provider in self.storage_providers:
-            res = provider.fetch(path, file_info)
-            # Fetch is supposed to return an Awaitable, but guard against
-            # improper implementations.
+            res = provider.fetch(path, file_info)  # type: Any
+            # Fetch is supposed to return an Awaitable[Responder], but guard
+            # against improper implementations.
             if inspect.isawaitable(res):
                 res = await res
             if res:
@@ -190,17 +200,11 @@ class MediaStorage(object):
 
         raise Exception("file could not be found")
 
-    def _file_info_to_path(self, file_info):
+    def _file_info_to_path(self, file_info: FileInfo) -> str:
         """Converts file_info into a relative path.
 
         The path is suitable for storing files under a directory, e.g. used to
         store files on local FS under the base media repository directory.
-
-        Args:
-            file_info (FileInfo)
-
-        Returns:
-            str
         """
         if file_info.url_cache:
             if file_info.thumbnail:
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 13d1a6d2ed..e12f65a206 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -231,16 +231,16 @@ class PreviewUrlResource(DirectServeJsonResource):
         og = await make_deferred_yieldable(defer.maybeDeferred(observable.observe))
         respond_with_json_bytes(request, 200, og, send_cors=True)
 
-    async def _do_preview(self, url, user, ts):
+    async def _do_preview(self, url: str, user: str, ts: int) -> bytes:
         """Check the db, and download the URL and build a preview
 
         Args:
-            url (str):
-            user (str):
-            ts (int):
+            url: The URL to preview.
+            user: The user requesting the preview.
+            ts: The timestamp requested for the preview.
 
         Returns:
-            Deferred[bytes]: json-encoded og data
+            json-encoded og data
         """
         # check the URL cache in the DB (which will also provide us with
         # historical previews, if we have any)
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 858680be26..a33f56e806 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -16,62 +16,62 @@
 import logging
 import os
 import shutil
-
-from twisted.internet import defer
+from typing import Optional
 
 from synapse.config._base import Config
 from synapse.logging.context import defer_to_thread, run_in_background
 
+from ._base import FileInfo, Responder
 from .media_storage import FileResponder
 
 logger = logging.getLogger(__name__)
 
 
-class StorageProvider(object):
+class StorageProvider:
     """A storage provider is a service that can store uploaded media and
     retrieve them.
     """
 
-    def store_file(self, path, file_info):
+    async def store_file(self, path: str, file_info: FileInfo):
         """Store the file described by file_info. The actual contents can be
         retrieved by reading the file in file_info.upload_path.
 
         Args:
-            path (str): Relative path of file in local cache
-            file_info (FileInfo)
-
-        Returns:
-            Deferred
+            path: Relative path of file in local cache
+            file_info: The metadata of the file.
         """
-        pass
 
-    def fetch(self, path, file_info):
+    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
         """Attempt to fetch the file described by file_info and stream it
         into writer.
 
         Args:
-            path (str): Relative path of file in local cache
-            file_info (FileInfo)
+            path: Relative path of file in local cache
+            file_info: The metadata of the file.
 
         Returns:
-            Deferred(Responder): Returns a Responder if the provider has the file,
-                otherwise returns None.
+            Returns a Responder if the provider has the file, otherwise returns None.
         """
-        pass
 
 
 class StorageProviderWrapper(StorageProvider):
     """Wraps a storage provider and provides various config options
 
     Args:
-        backend (StorageProvider)
-        store_local (bool): Whether to store new local files or not.
-        store_synchronous (bool): Whether to wait for file to be successfully
+        backend: The storage provider to wrap.
+        store_local: Whether to store new local files or not.
+        store_synchronous: Whether to wait for file to be successfully
             uploaded, or todo the upload in the background.
-        store_remote (bool): Whether remote media should be uploaded
+        store_remote: Whether remote media should be uploaded
     """
 
-    def __init__(self, backend, store_local, store_synchronous, store_remote):
+    def __init__(
+        self,
+        backend: StorageProvider,
+        store_local: bool,
+        store_synchronous: bool,
+        store_remote: bool,
+    ):
         self.backend = backend
         self.store_local = store_local
         self.store_synchronous = store_synchronous
@@ -80,15 +80,15 @@ class StorageProviderWrapper(StorageProvider):
     def __str__(self):
         return "StorageProviderWrapper[%s]" % (self.backend,)
 
-    def store_file(self, path, file_info):
+    async def store_file(self, path, file_info):
         if not file_info.server_name and not self.store_local:
-            return defer.succeed(None)
+            return None
 
         if file_info.server_name and not self.store_remote:
-            return defer.succeed(None)
+            return None
 
         if self.store_synchronous:
-            return self.backend.store_file(path, file_info)
+            return await self.backend.store_file(path, file_info)
         else:
             # TODO: Handle errors.
             def store():
@@ -98,10 +98,10 @@ class StorageProviderWrapper(StorageProvider):
                     logger.exception("Error storing file")
 
             run_in_background(store)
-            return defer.succeed(None)
+            return None
 
-    def fetch(self, path, file_info):
-        return self.backend.fetch(path, file_info)
+    async def fetch(self, path, file_info):
+        return await self.backend.fetch(path, file_info)
 
 
 class FileStorageProviderBackend(StorageProvider):
@@ -120,7 +120,7 @@ class FileStorageProviderBackend(StorageProvider):
     def __str__(self):
         return "FileStorageProviderBackend[%s]" % (self.base_directory,)
 
-    def store_file(self, path, file_info):
+    async def store_file(self, path, file_info):
         """See StorageProvider.store_file"""
 
         primary_fname = os.path.join(self.cache_directory, path)
@@ -130,11 +130,11 @@ class FileStorageProviderBackend(StorageProvider):
         if not os.path.exists(dirname):
             os.makedirs(dirname)
 
-        return defer_to_thread(
+        return await defer_to_thread(
             self.hs.get_reactor(), shutil.copyfile, primary_fname, backup_fname
         )
 
-    def fetch(self, path, file_info):
+    async def fetch(self, path, file_info):
         """See StorageProvider.fetch"""
 
         backup_fname = os.path.join(self.base_directory, path)
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 504babaa7e..18297cf3b8 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -411,7 +411,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             _get_if_maybe_push_in_range_for_user_txn,
         )
 
-    def add_push_actions_to_staging(self, event_id, user_id_actions):
+    async def add_push_actions_to_staging(self, event_id, user_id_actions):
         """Add the push actions for the event to the push action staging area.
 
         Args:
@@ -457,7 +457,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 ),
             )
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "add_push_actions_to_staging", _add_push_actions_to_staging_txn
         )
 
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 0f042c5696..e3da7744d2 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -16,8 +16,6 @@
 import logging
 import operator
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes, Membership
 from synapse.events.utils import prune_event
 from synapse.storage import Storage
@@ -39,8 +37,7 @@ MEMBERSHIP_PRIORITY = (
 )
 
 
-@defer.inlineCallbacks
-def filter_events_for_client(
+async def filter_events_for_client(
     storage: Storage,
     user_id,
     events,
@@ -67,19 +64,19 @@ def filter_events_for_client(
             also be called to check whether a user can see the state at a given point.
 
     Returns:
-        Deferred[list[synapse.events.EventBase]]
+        list[synapse.events.EventBase]
     """
     # Filter out events that have been soft failed so that we don't relay them
     # to clients.
     events = [e for e in events if not e.internal_metadata.is_soft_failed()]
 
     types = ((EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, user_id))
-    event_id_to_state = yield storage.state.get_state_for_events(
+    event_id_to_state = await storage.state.get_state_for_events(
         frozenset(e.event_id for e in events),
         state_filter=StateFilter.from_types(types),
     )
 
-    ignore_dict_content = yield storage.main.get_global_account_data_by_type_for_user(
+    ignore_dict_content = await storage.main.get_global_account_data_by_type_for_user(
         "m.ignored_user_list", user_id
     )
 
@@ -90,7 +87,7 @@ def filter_events_for_client(
         else []
     )
 
-    erased_senders = yield storage.main.are_users_erased((e.sender for e in events))
+    erased_senders = await storage.main.are_users_erased((e.sender for e in events))
 
     if filter_send_to_client:
         room_ids = {e.room_id for e in events}
@@ -99,7 +96,7 @@ def filter_events_for_client(
         for room_id in room_ids:
             retention_policies[
                 room_id
-            ] = yield storage.main.get_retention_policy_for_room(room_id)
+            ] = await storage.main.get_retention_policy_for_room(room_id)
 
     def allowed(event):
         """
@@ -254,8 +251,7 @@ def filter_events_for_client(
     return list(filtered_events)
 
 
-@defer.inlineCallbacks
-def filter_events_for_server(
+async def filter_events_for_server(
     storage: Storage,
     server_name,
     events,
@@ -277,7 +273,7 @@ def filter_events_for_server(
             backfill or not.
 
     Returns
-        Deferred[list[FrozenEvent]]
+        list[FrozenEvent]
     """
 
     def is_sender_erased(event, erased_senders):
@@ -321,7 +317,7 @@ def filter_events_for_server(
     # Lets check to see if all the events have a history visibility
     # of "shared" or "world_readable". If that's the case then we don't
     # need to check membership (as we know the server is in the room).
-    event_to_state_ids = yield storage.state.get_state_ids_for_events(
+    event_to_state_ids = await storage.state.get_state_ids_for_events(
         frozenset(e.event_id for e in events),
         state_filter=StateFilter.from_types(
             types=((EventTypes.RoomHistoryVisibility, ""),)
@@ -339,14 +335,14 @@ def filter_events_for_server(
     if not visibility_ids:
         all_open = True
     else:
-        event_map = yield storage.main.get_events(visibility_ids)
+        event_map = await storage.main.get_events(visibility_ids)
         all_open = all(
             e.content.get("history_visibility") in (None, "shared", "world_readable")
             for e in event_map.values()
         )
 
     if not check_history_visibility_only:
-        erased_senders = yield storage.main.are_users_erased((e.sender for e in events))
+        erased_senders = await storage.main.are_users_erased((e.sender for e in events))
     else:
         # We don't want to check whether users are erased, which is equivalent
         # to no users having been erased.
@@ -375,7 +371,7 @@ def filter_events_for_server(
 
     # first, for each event we're wanting to return, get the event_ids
     # of the history vis and membership state at those events.
-    event_to_state_ids = yield storage.state.get_state_ids_for_events(
+    event_to_state_ids = await storage.state.get_state_ids_for_events(
         frozenset(e.event_id for e in events),
         state_filter=StateFilter.from_types(
             types=((EventTypes.RoomHistoryVisibility, ""), (EventTypes.Member, None))
@@ -405,7 +401,7 @@ def filter_events_for_server(
             return False
         return state_key[idx + 1 :] == server_name
 
-    event_map = yield storage.main.get_events(
+    event_map = await storage.main.get_events(
         [e_id for e_id, key in event_id_to_state_key.items() if include(key[0], key[1])]
     )