summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorBrendan Abolivier <babolivier@matrix.org>2020-07-30 19:00:29 +0100
committerBrendan Abolivier <babolivier@matrix.org>2020-07-30 19:00:29 +0100
commit69158e554f30ac8b6b646a62fa496a2c0005dea6 (patch)
tree42fdb177abede9c0128906d4e6661cde0ee9cd6c /synapse/handlers
parentChangelog (diff)
parentUpdate workers docs (#7990) (diff)
downloadsynapse-69158e554f30ac8b6b646a62fa496a2c0005dea6.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into babolivier/new_push_rules
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/admin.py2
-rw-r--r--synapse/handlers/appservice.py10
-rw-r--r--synapse/handlers/auth.py7
-rw-r--r--synapse/handlers/deactivate_account.py22
-rw-r--r--synapse/handlers/e2e_keys.py24
-rw-r--r--synapse/handlers/federation.py24
-rw-r--r--synapse/handlers/groups_local.py35
-rw-r--r--synapse/handlers/identity.py271
-rw-r--r--synapse/handlers/message.py307
-rw-r--r--synapse/handlers/presence.py47
-rw-r--r--synapse/handlers/room.py13
-rw-r--r--synapse/handlers/room_member.py37
-rw-r--r--synapse/handlers/saml_handler.py3
-rw-r--r--synapse/handlers/search.py7
-rw-r--r--synapse/handlers/sync.py10
-rw-r--r--synapse/handlers/ui_auth/checkers.py35
16 files changed, 455 insertions, 399 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index f3c0aeceb6..506bb2b275 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -72,7 +72,7 @@ class AdminHandler(BaseHandler):
             writer (ExfiltrationWriter)
 
         Returns:
-            defer.Deferred: Resolves when all data for a user has been written.
+            Resolves when all data for a user has been written.
             The returned value is that returned by `writer.finished()`.
         """
         # Get all rooms the user is in or has been in
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 92d4c6e16c..fbc56c351b 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -27,7 +27,6 @@ from synapse.metrics import (
     event_processing_loop_room_count,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.util import log_failure
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -100,10 +99,11 @@ class ApplicationServicesHandler(object):
 
                         if not self.started_scheduler:
 
-                            def start_scheduler():
-                                return self.scheduler.start().addErrback(
-                                    log_failure, "Application Services Failure"
-                                )
+                            async def start_scheduler():
+                                try:
+                                    return self.scheduler.start()
+                                except Exception:
+                                    logger.error("Application Services Failure")
 
                             run_as_background_process("as_scheduler", start_scheduler)
                             self.started_scheduler = True
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index a162392e4c..c7d921c21a 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -13,6 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import inspect
 import logging
 import time
 import unicodedata
@@ -863,11 +864,15 @@ class AuthHandler(BaseHandler):
         # see if any of our auth providers want to know about this
         for provider in self.password_providers:
             if hasattr(provider, "on_logged_out"):
-                await provider.on_logged_out(
+                # This might return an awaitable, if it does block the log out
+                # until it completes.
+                result = provider.on_logged_out(
                     user_id=str(user_info["user"]),
                     device_id=user_info["device_id"],
                     access_token=access_token,
                 )
+                if inspect.isawaitable(result):
+                    await result
 
         # delete pushers associated with this access token
         if user_info["token_id"] is not None:
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 696d85b5f9..25169157c1 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -30,6 +30,7 @@ class DeactivateAccountHandler(BaseHandler):
 
     def __init__(self, hs):
         super(DeactivateAccountHandler, self).__init__(hs)
+        self.hs = hs
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
         self._room_member_handler = hs.get_room_member_handler()
@@ -222,13 +223,26 @@ class DeactivateAccountHandler(BaseHandler):
         """
         Activate an account that was previously deactivated.
 
-        This simply marks the user as activate in the database and does not
-        attempt to rejoin rooms, re-add threepids, etc.
+        This marks the user as active and not erased in the database, but does
+        not attempt to rejoin rooms, re-add threepids, etc.
+
+        If enabled, the user will be re-added to the user directory.
 
         The user will also need a password hash set to actually login.
 
         Args:
-            user_id: ID of user to be deactivated
+            user_id: ID of user to be re-activated
         """
-        # Mark the user as activate.
+        # Add the user to the directory, if necessary.
+        user = UserID.from_string(user_id)
+        if self.hs.config.user_directory_search_all_users:
+            profile = await self.store.get_profileinfo(user.localpart)
+            await self.user_directory_handler.handle_local_profile_change(
+                user_id, profile
+            )
+
+        # Ensure the user is not marked as erased.
+        await self.store.mark_user_not_erased(user_id)
+
+        # Mark the user as active.
         await self.store.set_user_deactivated_status(user_id, False)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 361dd64cd2..84169c1022 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -16,10 +16,11 @@
 # limitations under the License.
 
 import logging
+from typing import Dict, List, Optional, Tuple
 
 import attr
 from canonicaljson import encode_canonical_json, json
-from signedjson.key import decode_verify_key_bytes
+from signedjson.key import VerifyKey, decode_verify_key_bytes
 from signedjson.sign import SignatureVerifyException, verify_signed_json
 from unpaddedbase64 import decode_base64
 
@@ -265,7 +266,9 @@ class E2eKeysHandler(object):
 
         return ret
 
-    async def get_cross_signing_keys_from_cache(self, query, from_user_id):
+    async def get_cross_signing_keys_from_cache(
+        self, query, from_user_id
+    ) -> Dict[str, Dict[str, dict]]:
         """Get cross-signing keys for users from the database
 
         Args:
@@ -277,8 +280,7 @@ class E2eKeysHandler(object):
                 can see.
 
         Returns:
-            defer.Deferred[dict[str, dict[str, dict]]]: map from
-                (master_keys|self_signing_keys|user_signing_keys) -> user_id -> key
+            A map from (master_keys|self_signing_keys|user_signing_keys) -> user_id -> key
         """
         master_keys = {}
         self_signing_keys = {}
@@ -312,16 +314,17 @@ class E2eKeysHandler(object):
         }
 
     @trace
-    async def query_local_devices(self, query):
+    async def query_local_devices(
+        self, query: Dict[str, Optional[List[str]]]
+    ) -> Dict[str, Dict[str, dict]]:
         """Get E2E device keys for local users
 
         Args:
-            query (dict[string, list[string]|None): map from user_id to a list
+            query: map from user_id to a list
                  of devices to query (None for all devices)
 
         Returns:
-            defer.Deferred: (resolves to dict[string, dict[string, dict]]):
-                 map from user_id -> device_id -> device details
+            A map from user_id -> device_id -> device details
         """
         set_tag("local_query", query)
         local_query = []
@@ -1004,7 +1007,7 @@ class E2eKeysHandler(object):
 
     async def _retrieve_cross_signing_keys_for_remote_user(
         self, user: UserID, desired_key_type: str,
-    ):
+    ) -> Tuple[Optional[dict], Optional[str], Optional[VerifyKey]]:
         """Queries cross-signing keys for a remote user and saves them to the database
 
         Only the key specified by `key_type` will be returned, while all retrieved keys
@@ -1015,8 +1018,7 @@ class E2eKeysHandler(object):
             desired_key_type: The type of key to receive. One of "master", "self_signing"
 
         Returns:
-            Deferred[Tuple[Optional[Dict], Optional[str], Optional[VerifyKey]]]: A tuple
-            of the retrieved key content, the key's ID and the matching VerifyKey.
+            A tuple of the retrieved key content, the key's ID and the matching VerifyKey.
             If the key cannot be retrieved, all values in the tuple will instead be None.
         """
         try:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 71ac5dca99..0d7d1adcea 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1394,7 +1394,7 @@ class FederationHandler(BaseHandler):
             # it's just a best-effort thing at this point. We do want to do
             # them roughly in order, though, otherwise we'll end up making
             # lots of requests for missing prev_events which we do actually
-            # have. Hence we fire off the deferred, but don't wait for it.
+            # have. Hence we fire off the background task, but don't wait for it.
 
             run_in_background(self._handle_queued_pdus, room_queue)
 
@@ -1887,9 +1887,6 @@ class FederationHandler(BaseHandler):
             origin, event, state=state, auth_events=auth_events, backfilled=backfilled
         )
 
-        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
-        # hack around with a try/finally instead.
-        success = False
         try:
             if (
                 not event.internal_metadata.is_outlier()
@@ -1903,12 +1900,11 @@ class FederationHandler(BaseHandler):
             await self.persist_events_and_notify(
                 [(event, context)], backfilled=backfilled
             )
-            success = True
-        finally:
-            if not success:
-                run_in_background(
-                    self.store.remove_push_actions_from_staging, event.event_id
-                )
+        except Exception:
+            run_in_background(
+                self.store.remove_push_actions_from_staging, event.event_id
+            )
+            raise
 
         return context
 
@@ -2474,7 +2470,7 @@ class FederationHandler(BaseHandler):
         }
 
         current_state_ids = await context.get_current_state_ids()
-        current_state_ids = dict(current_state_ids)
+        current_state_ids = dict(current_state_ids)  # type: ignore
 
         current_state_ids.update(state_updates)
 
@@ -2994,7 +2990,9 @@ class FederationHandler(BaseHandler):
         else:
             user_joined_room(self.distributor, user, room_id)
 
-    async def get_room_complexity(self, remote_room_hosts, room_id):
+    async def get_room_complexity(
+        self, remote_room_hosts: List[str], room_id: str
+    ) -> Optional[dict]:
         """
         Fetch the complexity of a remote room over federation.
 
@@ -3003,7 +3001,7 @@ class FederationHandler(BaseHandler):
             room_id (str): The room ID to ask about.
 
         Returns:
-            Deferred[dict] or Deferred[None]: Dict contains the complexity
+            Dict contains the complexity
             metric versions, while None means we could not fetch the complexity.
         """
 
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index ecdb12a7bf..0e2656ccb3 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -23,39 +23,32 @@ logger = logging.getLogger(__name__)
 
 
 def _create_rerouter(func_name):
-    """Returns a function that looks at the group id and calls the function
+    """Returns an async function that looks at the group id and calls the function
     on federation or the local group server if the group is local
     """
 
-    def f(self, group_id, *args, **kwargs):
+    async def f(self, group_id, *args, **kwargs):
         if self.is_mine_id(group_id):
-            return getattr(self.groups_server_handler, func_name)(
+            return await getattr(self.groups_server_handler, func_name)(
                 group_id, *args, **kwargs
             )
         else:
             destination = get_domain_from_id(group_id)
-            d = getattr(self.transport_client, func_name)(
-                destination, group_id, *args, **kwargs
-            )
 
-            # Capture errors returned by the remote homeserver and
-            # re-throw specific errors as SynapseErrors. This is so
-            # when the remote end responds with things like 403 Not
-            # In Group, we can communicate that to the client instead
-            # of a 500.
-            def http_response_errback(failure):
-                failure.trap(HttpResponseException)
-                e = failure.value
+            try:
+                return await getattr(self.transport_client, func_name)(
+                    destination, group_id, *args, **kwargs
+                )
+            except HttpResponseException as e:
+                # Capture errors returned by the remote homeserver and
+                # re-throw specific errors as SynapseErrors. This is so
+                # when the remote end responds with things like 403 Not
+                # In Group, we can communicate that to the client instead
+                # of a 500.
                 raise e.to_synapse_error()
-
-            def request_failed_errback(failure):
-                failure.trap(RequestSendFailed)
+            except RequestSendFailed:
                 raise SynapseError(502, "Failed to contact group server")
 
-            d.addErrback(http_response_errback)
-            d.addErrback(request_failed_errback)
-            return d
-
     return f
 
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 701233ebb4..0bd2c3e37a 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -19,6 +19,7 @@
 
 import logging
 import urllib.parse
+from typing import Awaitable, Callable, Dict, List, Optional, Tuple
 
 from canonicaljson import json
 from signedjson.key import decode_verify_key_bytes
@@ -36,6 +37,7 @@ from synapse.api.errors import (
 )
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.http.client import SimpleHttpClient
+from synapse.types import JsonDict, Requester
 from synapse.util.hash import sha256_and_url_safe_base64
 from synapse.util.stringutils import assert_valid_client_secret, random_string
 
@@ -59,23 +61,23 @@ class IdentityHandler(BaseHandler):
         self.federation_http_client = hs.get_http_client()
         self.hs = hs
 
-    async def threepid_from_creds(self, id_server, creds):
+    async def threepid_from_creds(
+        self, id_server: str, creds: Dict[str, str]
+    ) -> Optional[JsonDict]:
         """
         Retrieve and validate a threepid identifier from a "credentials" dictionary against a
         given identity server
 
         Args:
-            id_server (str): The identity server to validate 3PIDs against. Must be a
+            id_server: The identity server to validate 3PIDs against. Must be a
                 complete URL including the protocol (http(s)://)
-
-            creds (dict[str, str]): Dictionary containing the following keys:
+            creds: Dictionary containing the following keys:
                 * client_secret|clientSecret: A unique secret str provided by the client
                 * sid: The ID of the validation session
 
         Returns:
-            Deferred[dict[str,str|int]|None]: A dictionary consisting of response params to
-                the /getValidated3pid endpoint of the Identity Service API, or None if the
-                threepid was not found
+            A dictionary consisting of response params to the /getValidated3pid
+            endpoint of the Identity Service API, or None if the threepid was not found
         """
         client_secret = creds.get("client_secret") or creds.get("clientSecret")
         if not client_secret:
@@ -119,26 +121,27 @@ class IdentityHandler(BaseHandler):
         return None
 
     async def bind_threepid(
-        self, client_secret, sid, mxid, id_server, id_access_token=None, use_v2=True
-    ):
+        self,
+        client_secret: str,
+        sid: str,
+        mxid: str,
+        id_server: str,
+        id_access_token: Optional[str] = None,
+        use_v2: bool = True,
+    ) -> JsonDict:
         """Bind a 3PID to an identity server
 
         Args:
-            client_secret (str): A unique secret provided by the client
-
-            sid (str): The ID of the validation session
-
-            mxid (str): The MXID to bind the 3PID to
-
-            id_server (str): The domain of the identity server to query
-
-            id_access_token (str): The access token to authenticate to the identity
+            client_secret: A unique secret provided by the client
+            sid: The ID of the validation session
+            mxid: The MXID to bind the 3PID to
+            id_server: The domain of the identity server to query
+            id_access_token: The access token to authenticate to the identity
                 server with, if necessary. Required if use_v2 is true
-
-            use_v2 (bool): Whether to use v2 Identity Service API endpoints. Defaults to True
+            use_v2: Whether to use v2 Identity Service API endpoints. Defaults to True
 
         Returns:
-            Deferred[dict]: The response from the identity server
+            The response from the identity server
         """
         logger.debug("Proxying threepid bind request for %s to %s", mxid, id_server)
 
@@ -151,7 +154,7 @@ class IdentityHandler(BaseHandler):
         bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
         if use_v2:
             bind_url = "https://%s/_matrix/identity/v2/3pid/bind" % (id_server,)
-            headers["Authorization"] = create_id_access_token_header(id_access_token)
+            headers["Authorization"] = create_id_access_token_header(id_access_token)  # type: ignore
         else:
             bind_url = "https://%s/_matrix/identity/api/v1/3pid/bind" % (id_server,)
 
@@ -187,20 +190,20 @@ class IdentityHandler(BaseHandler):
         )
         return res
 
-    async def try_unbind_threepid(self, mxid, threepid):
+    async def try_unbind_threepid(self, mxid: str, threepid: dict) -> bool:
         """Attempt to remove a 3PID from an identity server, or if one is not provided, all
         identity servers we're aware the binding is present on
 
         Args:
-            mxid (str): Matrix user ID of binding to be removed
-            threepid (dict): Dict with medium & address of binding to be
+            mxid: Matrix user ID of binding to be removed
+            threepid: Dict with medium & address of binding to be
                 removed, and an optional id_server.
 
         Raises:
             SynapseError: If we failed to contact the identity server
 
         Returns:
-            Deferred[bool]: True on success, otherwise False if the identity
+            True on success, otherwise False if the identity
             server doesn't support unbinding (or no identity server found to
             contact).
         """
@@ -223,19 +226,21 @@ class IdentityHandler(BaseHandler):
 
         return changed
 
-    async def try_unbind_threepid_with_id_server(self, mxid, threepid, id_server):
+    async def try_unbind_threepid_with_id_server(
+        self, mxid: str, threepid: dict, id_server: str
+    ) -> bool:
         """Removes a binding from an identity server
 
         Args:
-            mxid (str): Matrix user ID of binding to be removed
-            threepid (dict): Dict with medium & address of binding to be removed
-            id_server (str): Identity server to unbind from
+            mxid: Matrix user ID of binding to be removed
+            threepid: Dict with medium & address of binding to be removed
+            id_server: Identity server to unbind from
 
         Raises:
             SynapseError: If we failed to contact the identity server
 
         Returns:
-            Deferred[bool]: True on success, otherwise False if the identity
+            True on success, otherwise False if the identity
             server doesn't support unbinding
         """
         url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
@@ -287,23 +292,23 @@ class IdentityHandler(BaseHandler):
 
     async def send_threepid_validation(
         self,
-        email_address,
-        client_secret,
-        send_attempt,
-        send_email_func,
-        next_link=None,
-    ):
+        email_address: str,
+        client_secret: str,
+        send_attempt: int,
+        send_email_func: Callable[[str, str, str, str], Awaitable],
+        next_link: Optional[str] = None,
+    ) -> str:
         """Send a threepid validation email for password reset or
         registration purposes
 
         Args:
-            email_address (str): The user's email address
-            client_secret (str): The provided client secret
-            send_attempt (int): Which send attempt this is
-            send_email_func (func): A function that takes an email address, token,
-                                    client_secret and session_id, sends an email
-                                    and returns a Deferred.
-            next_link (str|None): The URL to redirect the user to after validation
+            email_address: The user's email address
+            client_secret: The provided client secret
+            send_attempt: Which send attempt this is
+            send_email_func: A function that takes an email address, token,
+                             client_secret and session_id, sends an email
+                             and returns an Awaitable.
+            next_link: The URL to redirect the user to after validation
 
         Returns:
             The new session_id upon success
@@ -372,17 +377,22 @@ class IdentityHandler(BaseHandler):
         return session_id
 
     async def requestEmailToken(
-        self, id_server, email, client_secret, send_attempt, next_link=None
-    ):
+        self,
+        id_server: str,
+        email: str,
+        client_secret: str,
+        send_attempt: int,
+        next_link: Optional[str] = None,
+    ) -> JsonDict:
         """
         Request an external server send an email on our behalf for the purposes of threepid
         validation.
 
         Args:
-            id_server (str): The identity server to proxy to
-            email (str): The email to send the message to
-            client_secret (str): The unique client_secret sends by the user
-            send_attempt (int): Which attempt this is
+            id_server: The identity server to proxy to
+            email: The email to send the message to
+            client_secret: The unique client_secret sends by the user
+            send_attempt: Which attempt this is
             next_link: A link to redirect the user to once they submit the token
 
         Returns:
@@ -419,22 +429,22 @@ class IdentityHandler(BaseHandler):
 
     async def requestMsisdnToken(
         self,
-        id_server,
-        country,
-        phone_number,
-        client_secret,
-        send_attempt,
-        next_link=None,
-    ):
+        id_server: str,
+        country: str,
+        phone_number: str,
+        client_secret: str,
+        send_attempt: int,
+        next_link: Optional[str] = None,
+    ) -> JsonDict:
         """
         Request an external server send an SMS message on our behalf for the purposes of
         threepid validation.
         Args:
-            id_server (str): The identity server to proxy to
-            country (str): The country code of the phone number
-            phone_number (str): The number to send the message to
-            client_secret (str): The unique client_secret sends by the user
-            send_attempt (int): Which attempt this is
+            id_server: The identity server to proxy to
+            country: The country code of the phone number
+            phone_number: The number to send the message to
+            client_secret: The unique client_secret sends by the user
+            send_attempt: Which attempt this is
             next_link: A link to redirect the user to once they submit the token
 
         Returns:
@@ -480,17 +490,18 @@ class IdentityHandler(BaseHandler):
         )
         return data
 
-    async def validate_threepid_session(self, client_secret, sid):
+    async def validate_threepid_session(
+        self, client_secret: str, sid: str
+    ) -> Optional[JsonDict]:
         """Validates a threepid session with only the client secret and session ID
         Tries validating against any configured account_threepid_delegates as well as locally.
 
         Args:
-            client_secret (str): A secret provided by the client
-
-            sid (str): The ID of the session
+            client_secret: A secret provided by the client
+            sid: The ID of the session
 
         Returns:
-            Dict[str, str|int] if validation was successful, otherwise None
+            The json response if validation was successful, otherwise None
         """
         # XXX: We shouldn't need to keep wrapping and unwrapping this value
         threepid_creds = {"client_secret": client_secret, "sid": sid}
@@ -523,23 +534,22 @@ class IdentityHandler(BaseHandler):
 
         return validation_session
 
-    async def proxy_msisdn_submit_token(self, id_server, client_secret, sid, token):
+    async def proxy_msisdn_submit_token(
+        self, id_server: str, client_secret: str, sid: str, token: str
+    ) -> JsonDict:
         """Proxy a POST submitToken request to an identity server for verification purposes
 
         Args:
-            id_server (str): The identity server URL to contact
-
-            client_secret (str): Secret provided by the client
-
-            sid (str): The ID of the session
-
-            token (str): The verification token
+            id_server: The identity server URL to contact
+            client_secret: Secret provided by the client
+            sid: The ID of the session
+            token: The verification token
 
         Raises:
             SynapseError: If we failed to contact the identity server
 
         Returns:
-            Deferred[dict]: The response dict from the identity server
+            The response dict from the identity server
         """
         body = {"client_secret": client_secret, "sid": sid, "token": token}
 
@@ -554,19 +564,25 @@ class IdentityHandler(BaseHandler):
             logger.warning("Error contacting msisdn account_threepid_delegate: %s", e)
             raise SynapseError(400, "Error contacting the identity server")
 
-    async def lookup_3pid(self, id_server, medium, address, id_access_token=None):
+    async def lookup_3pid(
+        self,
+        id_server: str,
+        medium: str,
+        address: str,
+        id_access_token: Optional[str] = None,
+    ) -> Optional[str]:
         """Looks up a 3pid in the passed identity server.
 
         Args:
-            id_server (str): The server name (including port, if required)
+            id_server: The server name (including port, if required)
                 of the identity server to use.
-            medium (str): The type of the third party identifier (e.g. "email").
-            address (str): The third party identifier (e.g. "foo@example.com").
-            id_access_token (str|None): The access token to authenticate to the identity
+            medium: The type of the third party identifier (e.g. "email").
+            address: The third party identifier (e.g. "foo@example.com").
+            id_access_token: The access token to authenticate to the identity
                 server with
 
         Returns:
-            str|None: the matrix ID of the 3pid, or None if it is not recognized.
+            the matrix ID of the 3pid, or None if it is not recognized.
         """
         if id_access_token is not None:
             try:
@@ -591,17 +607,19 @@ class IdentityHandler(BaseHandler):
 
         return await self._lookup_3pid_v1(id_server, medium, address)
 
-    async def _lookup_3pid_v1(self, id_server, medium, address):
+    async def _lookup_3pid_v1(
+        self, id_server: str, medium: str, address: str
+    ) -> Optional[str]:
         """Looks up a 3pid in the passed identity server using v1 lookup.
 
         Args:
-            id_server (str): The server name (including port, if required)
+            id_server: The server name (including port, if required)
                 of the identity server to use.
-            medium (str): The type of the third party identifier (e.g. "email").
-            address (str): The third party identifier (e.g. "foo@example.com").
+            medium: The type of the third party identifier (e.g. "email").
+            address: The third party identifier (e.g. "foo@example.com").
 
         Returns:
-            str: the matrix ID of the 3pid, or None if it is not recognized.
+            the matrix ID of the 3pid, or None if it is not recognized.
         """
         try:
             data = await self.blacklisting_http_client.get_json(
@@ -621,18 +639,20 @@ class IdentityHandler(BaseHandler):
 
         return None
 
-    async def _lookup_3pid_v2(self, id_server, id_access_token, medium, address):
+    async def _lookup_3pid_v2(
+        self, id_server: str, id_access_token: str, medium: str, address: str
+    ) -> Optional[str]:
         """Looks up a 3pid in the passed identity server using v2 lookup.
 
         Args:
-            id_server (str): The server name (including port, if required)
+            id_server: The server name (including port, if required)
                 of the identity server to use.
-            id_access_token (str): The access token to authenticate to the identity server with
-            medium (str): The type of the third party identifier (e.g. "email").
-            address (str): The third party identifier (e.g. "foo@example.com").
+            id_access_token: The access token to authenticate to the identity server with
+            medium: The type of the third party identifier (e.g. "email").
+            address: The third party identifier (e.g. "foo@example.com").
 
         Returns:
-            Deferred[str|None]: the matrix ID of the 3pid, or None if it is not recognised.
+            the matrix ID of the 3pid, or None if it is not recognised.
         """
         # Check what hashing details are supported by this identity server
         try:
@@ -757,49 +777,48 @@ class IdentityHandler(BaseHandler):
 
     async def ask_id_server_for_third_party_invite(
         self,
-        requester,
-        id_server,
-        medium,
-        address,
-        room_id,
-        inviter_user_id,
-        room_alias,
-        room_avatar_url,
-        room_join_rules,
-        room_name,
-        inviter_display_name,
-        inviter_avatar_url,
-        id_access_token=None,
-    ):
+        requester: Requester,
+        id_server: str,
+        medium: str,
+        address: str,
+        room_id: str,
+        inviter_user_id: str,
+        room_alias: str,
+        room_avatar_url: str,
+        room_join_rules: str,
+        room_name: str,
+        inviter_display_name: str,
+        inviter_avatar_url: str,
+        id_access_token: Optional[str] = None,
+    ) -> Tuple[str, List[Dict[str, str]], Dict[str, str], str]:
         """
         Asks an identity server for a third party invite.
 
         Args:
-            requester (Requester)
-            id_server (str): hostname + optional port for the identity server.
-            medium (str): The literal string "email".
-            address (str): The third party address being invited.
-            room_id (str): The ID of the room to which the user is invited.
-            inviter_user_id (str): The user ID of the inviter.
-            room_alias (str): An alias for the room, for cosmetic notifications.
-            room_avatar_url (str): The URL of the room's avatar, for cosmetic
+            requester
+            id_server: hostname + optional port for the identity server.
+            medium: The literal string "email".
+            address: The third party address being invited.
+            room_id: The ID of the room to which the user is invited.
+            inviter_user_id: The user ID of the inviter.
+            room_alias: An alias for the room, for cosmetic notifications.
+            room_avatar_url: The URL of the room's avatar, for cosmetic
                 notifications.
-            room_join_rules (str): The join rules of the email (e.g. "public").
-            room_name (str): The m.room.name of the room.
-            inviter_display_name (str): The current display name of the
+            room_join_rules: The join rules of the email (e.g. "public").
+            room_name: The m.room.name of the room.
+            inviter_display_name: The current display name of the
                 inviter.
-            inviter_avatar_url (str): The URL of the inviter's avatar.
+            inviter_avatar_url: The URL of the inviter's avatar.
             id_access_token (str|None): The access token to authenticate to the identity
                 server with
 
         Returns:
-            A deferred tuple containing:
-                token (str): The token which must be signed to prove authenticity.
+            A tuple containing:
+                token: The token which must be signed to prove authenticity.
                 public_keys ([{"public_key": str, "key_validity_url": str}]):
                     public_key is a base64-encoded ed25519 public key.
                 fallback_public_key: One element from public_keys.
-                display_name (str): A user-friendly name to represent the invited
-                    user.
+                display_name: A user-friendly name to represent the invited user.
         """
         invite_config = {
             "medium": medium,
@@ -896,15 +915,15 @@ class IdentityHandler(BaseHandler):
         return token, public_keys, fallback_public_key, display_name
 
 
-def create_id_access_token_header(id_access_token):
+def create_id_access_token_header(id_access_token: str) -> List[str]:
     """Create an Authorization header for passing to SimpleHttpClient as the header value
     of an HTTP request.
 
     Args:
-        id_access_token (str): An identity server access token.
+        id_access_token: An identity server access token.
 
     Returns:
-        list[str]: The ascii-encoded bearer token encased in a list.
+        The ascii-encoded bearer token encased in a list.
     """
     # Prefix with Bearer
     bearer_token = "Bearer %s" % id_access_token
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c47764a4ce..e451d6dc86 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,12 +15,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Optional, Tuple
+from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from canonicaljson import encode_canonical_json, json
 
-from twisted.internet import defer
-from twisted.internet.defer import succeed
 from twisted.internet.interfaces import IDelayedCall
 
 from synapse import event_auth
@@ -41,13 +39,22 @@ from synapse.api.errors import (
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.api.urls import ConsentURIBuilder
 from synapse.events import EventBase
+from synapse.events.builder import EventBuilder
+from synapse.events.snapshot import EventContext
 from synapse.events.validator import EventValidator
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
 from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
-from synapse.types import Collection, RoomAlias, UserID, create_requester
+from synapse.types import (
+    Collection,
+    Requester,
+    RoomAlias,
+    StreamToken,
+    UserID,
+    create_requester,
+)
 from synapse.util.async_helpers import Linearizer
 from synapse.util.frozenutils import frozendict_json_encoder
 from synapse.util.metrics import measure_func
@@ -84,14 +91,22 @@ class MessageHandler(object):
                 "_schedule_next_expiry", self._schedule_next_expiry
             )
 
-    @defer.inlineCallbacks
-    def get_room_data(
-        self, user_id=None, room_id=None, event_type=None, state_key="", is_guest=False
-    ):
+    async def get_room_data(
+        self,
+        user_id: str = None,
+        room_id: str = None,
+        event_type: Optional[str] = None,
+        state_key: str = "",
+        is_guest: bool = False,
+    ) -> dict:
         """ Get data from a room.
 
         Args:
-            event : The room path event
+            user_id
+            room_id
+            event_type
+            state_key
+            is_guest
         Returns:
             The path data content.
         Raises:
@@ -100,30 +115,29 @@ class MessageHandler(object):
         (
             membership,
             membership_event_id,
-        ) = yield self.auth.check_user_in_room_or_world_readable(
+        ) = await self.auth.check_user_in_room_or_world_readable(
             room_id, user_id, allow_departed_users=True
         )
 
         if membership == Membership.JOIN:
-            data = yield self.state.get_current_state(room_id, event_type, state_key)
+            data = await self.state.get_current_state(room_id, event_type, state_key)
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
-            room_state = yield self.state_store.get_state_for_events(
+            room_state = await self.state_store.get_state_for_events(
                 [membership_event_id], StateFilter.from_types([key])
             )
             data = room_state[membership_event_id].get(key)
 
         return data
 
-    @defer.inlineCallbacks
-    def get_state_events(
+    async def get_state_events(
         self,
-        user_id,
-        room_id,
-        state_filter=StateFilter.all(),
-        at_token=None,
-        is_guest=False,
-    ):
+        user_id: str,
+        room_id: str,
+        state_filter: StateFilter = StateFilter.all(),
+        at_token: Optional[StreamToken] = None,
+        is_guest: bool = False,
+    ) -> List[dict]:
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
         left the room return the state events from when they left. If an explicit
@@ -131,15 +145,14 @@ class MessageHandler(object):
         visible.
 
         Args:
-            user_id(str): The user requesting state events.
-            room_id(str): The room ID to get all state events from.
-            state_filter (StateFilter): The state filter used to fetch state
-                from the database.
-            at_token(StreamToken|None): the stream token of the at which we are requesting
+            user_id: The user requesting state events.
+            room_id: The room ID to get all state events from.
+            state_filter: The state filter used to fetch state from the database.
+            at_token: the stream token of the at which we are requesting
                 the stats. If the user is not allowed to view the state as of that
                 stream token, we raise a 403 SynapseError. If None, returns the current
                 state based on the current_state_events table.
-            is_guest(bool): whether this user is a guest
+            is_guest: whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         Raises:
@@ -153,20 +166,20 @@ class MessageHandler(object):
             # get_recent_events_for_room operates by topo ordering. This therefore
             # does not reliably give you the state at the given stream position.
             # (https://github.com/matrix-org/synapse/issues/3305)
-            last_events, _ = yield self.store.get_recent_events_for_room(
+            last_events, _ = await self.store.get_recent_events_for_room(
                 room_id, end_token=at_token.room_key, limit=1
             )
 
             if not last_events:
                 raise NotFoundError("Can't find event for token %s" % (at_token,))
 
-            visible_events = yield filter_events_for_client(
+            visible_events = await filter_events_for_client(
                 self.storage, user_id, last_events, filter_send_to_client=False
             )
 
             event = last_events[0]
             if visible_events:
-                room_state = yield self.state_store.get_state_for_events(
+                room_state = await self.state_store.get_state_for_events(
                     [event.event_id], state_filter=state_filter
                 )
                 room_state = room_state[event.event_id]
@@ -180,23 +193,23 @@ class MessageHandler(object):
             (
                 membership,
                 membership_event_id,
-            ) = yield self.auth.check_user_in_room_or_world_readable(
+            ) = await self.auth.check_user_in_room_or_world_readable(
                 room_id, user_id, allow_departed_users=True
             )
 
             if membership == Membership.JOIN:
-                state_ids = yield self.store.get_filtered_current_state_ids(
+                state_ids = await self.store.get_filtered_current_state_ids(
                     room_id, state_filter=state_filter
                 )
-                room_state = yield self.store.get_events(state_ids.values())
+                room_state = await self.store.get_events(state_ids.values())
             elif membership == Membership.LEAVE:
-                room_state = yield self.state_store.get_state_for_events(
+                room_state = await self.state_store.get_state_for_events(
                     [membership_event_id], state_filter=state_filter
                 )
                 room_state = room_state[membership_event_id]
 
         now = self.clock.time_msec()
-        events = yield self._event_serializer.serialize_events(
+        events = await self._event_serializer.serialize_events(
             room_state.values(),
             now,
             # We don't bother bundling aggregations in when asked for state
@@ -205,15 +218,14 @@ class MessageHandler(object):
         )
         return events
 
-    @defer.inlineCallbacks
-    def get_joined_members(self, requester, room_id):
+    async def get_joined_members(self, requester: Requester, room_id: str) -> dict:
         """Get all the joined members in the room and their profile information.
 
         If the user has left the room return the state events from when they left.
 
         Args:
-            requester(Requester): The user requesting state events.
-            room_id(str): The room ID to get all state events from.
+            requester: The user requesting state events.
+            room_id: The room ID to get all state events from.
         Returns:
             A dict of user_id to profile info
         """
@@ -221,7 +233,7 @@ class MessageHandler(object):
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
-            membership, _ = yield self.auth.check_user_in_room_or_world_readable(
+            membership, _ = await self.auth.check_user_in_room_or_world_readable(
                 room_id, user_id, allow_departed_users=True
             )
             if membership != Membership.JOIN:
@@ -229,7 +241,7 @@ class MessageHandler(object):
                     "Getting joined members after leaving is not implemented"
                 )
 
-        users_with_profile = yield self.state.get_current_users_in_room(room_id)
+        users_with_profile = await self.state.get_current_users_in_room(room_id)
 
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
@@ -250,7 +262,7 @@ class MessageHandler(object):
             for user_id, profile in users_with_profile.items()
         }
 
-    def maybe_schedule_expiry(self, event):
+    def maybe_schedule_expiry(self, event: EventBase):
         """Schedule the expiry of an event if there's not already one scheduled,
         or if the one running is for an event that will expire after the provided
         timestamp.
@@ -259,7 +271,7 @@ class MessageHandler(object):
         the master process, and therefore needs to be run on there.
 
         Args:
-            event (EventBase): The event to schedule the expiry of.
+            event: The event to schedule the expiry of.
         """
 
         expiry_ts = event.content.get(EventContentFields.SELF_DESTRUCT_AFTER)
@@ -270,8 +282,7 @@ class MessageHandler(object):
         # a task scheduled for a timestamp that's sooner than the provided one.
         self._schedule_expiry_for_event(event.event_id, expiry_ts)
 
-    @defer.inlineCallbacks
-    def _schedule_next_expiry(self):
+    async def _schedule_next_expiry(self):
         """Retrieve the ID and the expiry timestamp of the next event to be expired,
         and schedule an expiry task for it.
 
@@ -279,18 +290,18 @@ class MessageHandler(object):
         future call to save_expiry_ts can schedule a new expiry task.
         """
         # Try to get the expiry timestamp of the next event to expire.
-        res = yield self.store.get_next_event_to_expire()
+        res = await self.store.get_next_event_to_expire()
         if res:
             event_id, expiry_ts = res
             self._schedule_expiry_for_event(event_id, expiry_ts)
 
-    def _schedule_expiry_for_event(self, event_id, expiry_ts):
+    def _schedule_expiry_for_event(self, event_id: str, expiry_ts: int):
         """Schedule an expiry task for the provided event if there's not already one
         scheduled at a timestamp that's sooner than the provided one.
 
         Args:
-            event_id (str): The ID of the event to expire.
-            expiry_ts (int): The timestamp at which to expire the event.
+            event_id: The ID of the event to expire.
+            expiry_ts: The timestamp at which to expire the event.
         """
         if self._scheduled_expiry:
             # If the provided timestamp refers to a time before the scheduled time of the
@@ -320,8 +331,7 @@ class MessageHandler(object):
             event_id,
         )
 
-    @defer.inlineCallbacks
-    def _expire_event(self, event_id):
+    async def _expire_event(self, event_id: str):
         """Retrieve and expire an event that needs to be expired from the database.
 
         If the event doesn't exist in the database, log it and delete the expiry date
@@ -336,12 +346,12 @@ class MessageHandler(object):
         try:
             # Expire the event if we know about it. This function also deletes the expiry
             # date from the database in the same database transaction.
-            yield self.store.expire_event(event_id)
+            await self.store.expire_event(event_id)
         except Exception as e:
             logger.error("Could not expire event %s: %r", event_id, e)
 
         # Schedule the expiry of the next event to expire.
-        yield self._schedule_next_expiry()
+        await self._schedule_next_expiry()
 
 
 # The duration (in ms) after which rooms should be removed
@@ -423,16 +433,15 @@ class EventCreationHandler(object):
 
         self._dummy_events_threshold = hs.config.dummy_events_threshold
 
-    @defer.inlineCallbacks
-    def create_event(
+    async def create_event(
         self,
-        requester,
-        event_dict,
-        token_id=None,
-        txn_id=None,
+        requester: Requester,
+        event_dict: dict,
+        token_id: Optional[str] = None,
+        txn_id: Optional[str] = None,
         prev_event_ids: Optional[Collection[str]] = None,
-        require_consent=True,
-    ):
+        require_consent: bool = True,
+    ) -> Tuple[EventBase, EventContext]:
         """
         Given a dict from a client, create a new event.
 
@@ -443,31 +452,29 @@ class EventCreationHandler(object):
 
         Args:
             requester
-            event_dict (dict): An entire event
-            token_id (str)
-            txn_id (str)
-
+            event_dict: An entire event
+            token_id
+            txn_id
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
 
                 If None, they will be requested from the database.
-
-            require_consent (bool): Whether to check if the requester has
-                consented to privacy policy.
+            require_consent: Whether to check if the requester has
+                consented to the privacy policy.
         Raises:
             ResourceLimitError if server is blocked to some resource being
             exceeded
         Returns:
-            Tuple of created event (FrozenEvent), Context
+            Tuple of created event, Context
         """
-        yield self.auth.check_auth_blocking(requester.user.to_string())
+        await self.auth.check_auth_blocking(requester.user.to_string())
 
         if event_dict["type"] == EventTypes.Create and event_dict["state_key"] == "":
             room_version = event_dict["content"]["room_version"]
         else:
             try:
-                room_version = yield self.store.get_room_version_id(
+                room_version = await self.store.get_room_version_id(
                     event_dict["room_id"]
                 )
             except NotFoundError:
@@ -488,15 +495,11 @@ class EventCreationHandler(object):
 
                 try:
                     if "displayname" not in content:
-                        displayname = yield defer.ensureDeferred(
-                            profile.get_displayname(target)
-                        )
+                        displayname = await profile.get_displayname(target)
                         if displayname is not None:
                             content["displayname"] = displayname
                     if "avatar_url" not in content:
-                        avatar_url = yield defer.ensureDeferred(
-                            profile.get_avatar_url(target)
-                        )
+                        avatar_url = await profile.get_avatar_url(target)
                         if avatar_url is not None:
                             content["avatar_url"] = avatar_url
                 except Exception as e:
@@ -504,9 +507,9 @@ class EventCreationHandler(object):
                         "Failed to get profile information for %r: %s", target, e
                     )
 
-        is_exempt = yield self._is_exempt_from_privacy_policy(builder, requester)
+        is_exempt = await self._is_exempt_from_privacy_policy(builder, requester)
         if require_consent and not is_exempt:
-            yield self.assert_accepted_privacy_policy(requester)
+            await self.assert_accepted_privacy_policy(requester)
 
         if token_id is not None:
             builder.internal_metadata.token_id = token_id
@@ -514,7 +517,7 @@ class EventCreationHandler(object):
         if txn_id is not None:
             builder.internal_metadata.txn_id = txn_id
 
-        event, context = yield self.create_new_client_event(
+        event, context = await self.create_new_client_event(
             builder=builder, requester=requester, prev_event_ids=prev_event_ids,
         )
 
@@ -530,10 +533,10 @@ class EventCreationHandler(object):
             # federation as well as those created locally. As of room v3, aliases events
             # can be created by users that are not in the room, therefore we have to
             # tolerate them in event_auth.check().
-            prev_state_ids = yield context.get_prev_state_ids()
+            prev_state_ids = await context.get_prev_state_ids()
             prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
             prev_event = (
-                yield self.store.get_event(prev_event_id, allow_none=True)
+                await self.store.get_event(prev_event_id, allow_none=True)
                 if prev_event_id
                 else None
             )
@@ -556,37 +559,36 @@ class EventCreationHandler(object):
 
         return (event, context)
 
-    def _is_exempt_from_privacy_policy(self, builder, requester):
+    async def _is_exempt_from_privacy_policy(
+        self, builder: EventBuilder, requester: Requester
+    ) -> bool:
         """"Determine if an event to be sent is exempt from having to consent
         to the privacy policy
 
         Args:
-            builder (synapse.events.builder.EventBuilder): event being created
-            requester (Requster): user requesting this event
+            builder: event being created
+            requester: user requesting this event
 
         Returns:
-            Deferred[bool]: true if the event can be sent without the user
-                consenting
+            true if the event can be sent without the user consenting
         """
         # the only thing the user can do is join the server notices room.
         if builder.type == EventTypes.Member:
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
-                return self._is_server_notices_room(builder.room_id)
+                return await self._is_server_notices_room(builder.room_id)
             elif membership == Membership.LEAVE:
                 # the user is always allowed to leave (but not kick people)
                 return builder.state_key == requester.user.to_string()
-        return succeed(False)
+        return False
 
-    @defer.inlineCallbacks
-    def _is_server_notices_room(self, room_id):
+    async def _is_server_notices_room(self, room_id: str) -> bool:
         if self.config.server_notices_mxid is None:
             return False
-        user_ids = yield self.store.get_users_in_room(room_id)
+        user_ids = await self.store.get_users_in_room(room_id)
         return self.config.server_notices_mxid in user_ids
 
-    @defer.inlineCallbacks
-    def assert_accepted_privacy_policy(self, requester):
+    async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
         """Check if a user has accepted the privacy policy
 
         Called when the given user is about to do something that requires
@@ -595,12 +597,10 @@ class EventCreationHandler(object):
         raised.
 
         Args:
-            requester (synapse.types.Requester):
-                The user making the request
+            requester: The user making the request
 
         Returns:
-            Deferred[None]: returns normally if the user has consented or is
-                exempt
+            Returns normally if the user has consented or is exempt
 
         Raises:
             ConsentNotGivenError: if the user has not given consent yet
@@ -621,7 +621,7 @@ class EventCreationHandler(object):
         ):
             return
 
-        u = yield self.store.get_user_by_id(user_id)
+        u = await self.store.get_user_by_id(user_id)
         assert u is not None
         if u["user_type"] in (UserTypes.SUPPORT, UserTypes.BOT):
             # support and bot users are not required to consent
@@ -639,16 +639,20 @@ class EventCreationHandler(object):
         raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
 
     async def send_nonmember_event(
-        self, requester, event, context, ratelimit=True
+        self,
+        requester: Requester,
+        event: EventBase,
+        context: EventContext,
+        ratelimit: bool = True,
     ) -> int:
         """
         Persists and notifies local clients and federation of an event.
 
         Args:
-            event (FrozenEvent) the event to send.
-            context (Context) the context of the event.
-            ratelimit (bool): Whether to rate limit this send.
-            is_guest (bool): Whether the sender is a guest.
+            requester
+            event the event to send.
+            context: the context of the event.
+            ratelimit: Whether to rate limit this send.
 
         Return:
             The stream_id of the persisted event.
@@ -676,19 +680,20 @@ class EventCreationHandler(object):
             requester=requester, event=event, context=context, ratelimit=ratelimit
         )
 
-    @defer.inlineCallbacks
-    def deduplicate_state_event(self, event, context):
+    async def deduplicate_state_event(
+        self, event: EventBase, context: EventContext
+    ) -> None:
         """
         Checks whether event is in the latest resolved state in context.
 
         If so, returns the version of the event in context.
         Otherwise, returns None.
         """
-        prev_state_ids = yield context.get_prev_state_ids()
+        prev_state_ids = await context.get_prev_state_ids()
         prev_event_id = prev_state_ids.get((event.type, event.state_key))
         if not prev_event_id:
             return
-        prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+        prev_event = await self.store.get_event(prev_event_id, allow_none=True)
         if not prev_event:
             return
 
@@ -700,7 +705,11 @@ class EventCreationHandler(object):
         return
 
     async def create_and_send_nonmember_event(
-        self, requester, event_dict, ratelimit=True, txn_id=None
+        self,
+        requester: Requester,
+        event_dict: EventBase,
+        ratelimit: bool = True,
+        txn_id: Optional[str] = None,
     ) -> Tuple[EventBase, int]:
         """
         Creates an event, then sends it.
@@ -730,17 +739,17 @@ class EventCreationHandler(object):
         return event, stream_id
 
     @measure_func("create_new_client_event")
-    @defer.inlineCallbacks
-    def create_new_client_event(
-        self, builder, requester=None, prev_event_ids: Optional[Collection[str]] = None
-    ):
+    async def create_new_client_event(
+        self,
+        builder: EventBuilder,
+        requester: Optional[Requester] = None,
+        prev_event_ids: Optional[Collection[str]] = None,
+    ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
 
         Args:
-            builder (EventBuilder):
-
-            requester (synapse.types.Requester|None):
-
+            builder:
+            requester:
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -748,7 +757,7 @@ class EventCreationHandler(object):
                 If None, they will be requested from the database.
 
         Returns:
-            Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
+            Tuple of created event, context
         """
 
         if prev_event_ids is not None:
@@ -757,10 +766,10 @@ class EventCreationHandler(object):
                 % (len(prev_event_ids),)
             )
         else:
-            prev_event_ids = yield self.store.get_prev_events_for_room(builder.room_id)
+            prev_event_ids = await self.store.get_prev_events_for_room(builder.room_id)
 
-        event = yield builder.build(prev_event_ids=prev_event_ids)
-        context = yield self.state.compute_event_context(event)
+        event = await builder.build(prev_event_ids=prev_event_ids)
+        context = await self.state.compute_event_context(event)
         if requester:
             context.app_service = requester.app_service
 
@@ -774,7 +783,7 @@ class EventCreationHandler(object):
             relates_to = relation["event_id"]
             aggregation_key = relation["key"]
 
-            already_exists = yield self.store.has_user_annotated_event(
+            already_exists = await self.store.has_user_annotated_event(
                 relates_to, event.type, aggregation_key, event.sender
             )
             if already_exists:
@@ -786,7 +795,12 @@ class EventCreationHandler(object):
 
     @measure_func("handle_new_client_event")
     async def handle_new_client_event(
-        self, requester, event, context, ratelimit=True, extra_users=[]
+        self,
+        requester: Requester,
+        event: EventBase,
+        context: EventContext,
+        ratelimit: bool = True,
+        extra_users: List[UserID] = [],
     ) -> int:
         """Processes a new event. This includes checking auth, persisting it,
         notifying users, sending to remote servers, etc.
@@ -795,11 +809,11 @@ class EventCreationHandler(object):
         processing.
 
         Args:
-            requester (Requester)
-            event (FrozenEvent)
-            context (EventContext)
-            ratelimit (bool)
-            extra_users (list(UserID)): Any extra users to notify about event
+            requester
+            event
+            context
+            ratelimit
+            extra_users: Any extra users to notify about event
 
         Return:
             The stream_id of the persisted event.
@@ -843,9 +857,6 @@ class EventCreationHandler(object):
 
         await self.action_generator.handle_push_actions_for_event(event, context)
 
-        # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
-        # hack around with a try/finally instead.
-        success = False
         try:
             # If we're a worker we need to hit out to the master.
             if not self._is_event_writer:
@@ -861,27 +872,24 @@ class EventCreationHandler(object):
                 )
                 stream_id = result["stream_id"]
                 event.internal_metadata.stream_ordering = stream_id
-                success = True
                 return stream_id
 
             stream_id = await self.persist_and_notify_client_event(
                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )
 
-            success = True
             return stream_id
-        finally:
-            if not success:
-                # Ensure that we actually remove the entries in the push actions
-                # staging area, if we calculated them.
-                run_in_background(
-                    self.store.remove_push_actions_from_staging, event.event_id
-                )
+        except Exception:
+            # Ensure that we actually remove the entries in the push actions
+            # staging area, if we calculated them.
+            run_in_background(
+                self.store.remove_push_actions_from_staging, event.event_id
+            )
+            raise
 
-    @defer.inlineCallbacks
-    def _validate_canonical_alias(
-        self, directory_handler, room_alias_str, expected_room_id
-    ):
+    async def _validate_canonical_alias(
+        self, directory_handler, room_alias_str: str, expected_room_id: str
+    ) -> None:
         """
         Ensure that the given room alias points to the expected room ID.
 
@@ -892,9 +900,7 @@ class EventCreationHandler(object):
         """
         room_alias = RoomAlias.from_string(room_alias_str)
         try:
-            mapping = yield defer.ensureDeferred(
-                directory_handler.get_association(room_alias)
-            )
+            mapping = await directory_handler.get_association(room_alias)
         except SynapseError as e:
             # Turn M_NOT_FOUND errors into M_BAD_ALIAS errors.
             if e.errcode == Codes.NOT_FOUND:
@@ -913,7 +919,12 @@ class EventCreationHandler(object):
             )
 
     async def persist_and_notify_client_event(
-        self, requester, event, context, ratelimit=True, extra_users=[]
+        self,
+        requester: Requester,
+        event: EventBase,
+        context: EventContext,
+        ratelimit: bool = True,
+        extra_users: List[UserID] = [],
     ) -> int:
         """Called when we have fully built the event, have already
         calculated the push actions for the event, and checked auth.
@@ -1106,7 +1117,7 @@ class EventCreationHandler(object):
 
         return event_stream_id
 
-    async def _bump_active_time(self, user):
+    async def _bump_active_time(self, user: UserID) -> None:
         try:
             presence = self.hs.get_presence_handler()
             await presence.bump_presence_active_time(user)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index d2f25ae12a..b3a3bb8c3f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -30,8 +30,6 @@ from typing import Dict, Iterable, List, Set, Tuple
 from prometheus_client import Counter
 from typing_extensions import ContextManager
 
-from twisted.internet import defer
-
 import synapse.metrics
 from synapse.api.constants import EventTypes, Membership, PresenceState
 from synapse.api.errors import SynapseError
@@ -39,6 +37,8 @@ from synapse.logging.context import run_in_background
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.state import StateHandler
+from synapse.storage.data_stores.main import DataStore
 from synapse.storage.presence import UserPresenceState
 from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -895,16 +895,9 @@ class PresenceHandler(BasePresenceHandler):
 
             await self._on_user_joined_room(room_id, state_key)
 
-    async def _on_user_joined_room(self, room_id, user_id):
+    async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
         """Called when we detect a user joining the room via the current state
         delta stream.
-
-        Args:
-            room_id (str)
-            user_id (str)
-
-        Returns:
-            Deferred
         """
 
         if self.is_mine_id(user_id):
@@ -935,8 +928,8 @@ class PresenceHandler(BasePresenceHandler):
             # TODO: Check that this is actually a new server joining the
             # room.
 
-            user_ids = await self.state.get_current_users_in_room(room_id)
-            user_ids = list(filter(self.is_mine_id, user_ids))
+            users = await self.state.get_current_users_in_room(room_id)
+            user_ids = list(filter(self.is_mine_id, users))
 
             states_d = await self.current_state_for_users(user_ids)
 
@@ -1296,22 +1289,24 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
     return new_state, persist_and_notify, federation_ping
 
 
-@defer.inlineCallbacks
-def get_interested_parties(store, states):
+async def get_interested_parties(
+    store: DataStore, states: List[UserPresenceState]
+) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
     """Given a list of states return which entities (rooms, users)
     are interested in the given states.
 
     Args:
-        states (list(UserPresenceState))
+        store
+        states
 
     Returns:
-        2-tuple: `(room_ids_to_states, users_to_states)`,
+        A 2-tuple of `(room_ids_to_states, users_to_states)`,
         with each item being a dict of `entity_name` -> `[UserPresenceState]`
     """
     room_ids_to_states = {}  # type: Dict[str, List[UserPresenceState]]
     users_to_states = {}  # type: Dict[str, List[UserPresenceState]]
     for state in states:
-        room_ids = yield store.get_rooms_for_user(state.user_id)
+        room_ids = await store.get_rooms_for_user(state.user_id)
         for room_id in room_ids:
             room_ids_to_states.setdefault(room_id, []).append(state)
 
@@ -1321,20 +1316,22 @@ def get_interested_parties(store, states):
     return room_ids_to_states, users_to_states
 
 
-@defer.inlineCallbacks
-def get_interested_remotes(store, states, state_handler):
+async def get_interested_remotes(
+    store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
+) -> List[Tuple[List[str], List[UserPresenceState]]]:
     """Given a list of presence states figure out which remote servers
     should be sent which.
 
     All the presence states should be for local users only.
 
     Args:
-        store (DataStore)
-        states (list(UserPresenceState))
+        store
+        states
+        state_handler
 
     Returns:
-        Deferred list of ([destinations], [UserPresenceState]), where for
-        each row the list of UserPresenceState should be sent to each
+        A list of 2-tuples of destinations and states, where for
+        each tuple the list of UserPresenceState should be sent to each
         destination
     """
     hosts_and_states = []
@@ -1342,10 +1339,10 @@ def get_interested_remotes(store, states, state_handler):
     # First we look up the rooms each user is in (as well as any explicit
     # subscriptions), then for each distinct room we look up the remote
     # hosts in those rooms.
-    room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
+    room_ids_to_states, users_to_states = await get_interested_parties(store, states)
 
     for room_id, states in room_ids_to_states.items():
-        hosts = yield state_handler.get_current_hosts_in_room(room_id)
+        hosts = await state_handler.get_current_hosts_in_room(room_id)
         hosts_and_states.append((hosts, states))
 
     for user_id, states in users_to_states.items():
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index fb37d371ad..0c5b99234d 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -119,7 +119,7 @@ class RoomCreationHandler(BaseHandler):
 
     async def upgrade_room(
         self, requester: Requester, old_room_id: str, new_version: RoomVersion
-    ):
+    ) -> str:
         """Replace a room with a new room with a different version
 
         Args:
@@ -128,7 +128,7 @@ class RoomCreationHandler(BaseHandler):
             new_version: the new room version to use
 
         Returns:
-            Deferred[unicode]: the new room id
+            the new room id
         """
         await self.ratelimit(requester)
 
@@ -239,7 +239,7 @@ class RoomCreationHandler(BaseHandler):
         old_room_id: str,
         new_room_id: str,
         old_room_state: StateMap[str],
-    ):
+    ) -> None:
         """Send updated power levels in both rooms after an upgrade
 
         Args:
@@ -247,9 +247,6 @@ class RoomCreationHandler(BaseHandler):
             old_room_id: the id of the room to be replaced
             new_room_id: the id of the replacement room
             old_room_state: the state map for the old room
-
-        Returns:
-            Deferred
         """
         old_room_pl_event_id = old_room_state.get((EventTypes.PowerLevels, ""))
 
@@ -322,7 +319,7 @@ class RoomCreationHandler(BaseHandler):
         new_room_id: str,
         new_room_version: RoomVersion,
         tombstone_event_id: str,
-    ):
+    ) -> None:
         """Populate a new room based on an old room
 
         Args:
@@ -332,8 +329,6 @@ class RoomCreationHandler(BaseHandler):
                 created with _gemerate_room_id())
             new_room_version: the new room version to use
             tombstone_event_id: the ID of the tombstone event in the old room.
-        Returns:
-            Deferred
         """
         user_id = requester.user.to_string()
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index a1a8fa1d3b..78586a0a1e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -469,26 +469,39 @@ class RoomMemberHandler(object):
                     user_id=target.to_string(), room_id=room_id
                 )  # type: Optional[RoomsForUser]
                 if not invite:
+                    logger.info(
+                        "%s sent a leave request to %s, but that is not an active room "
+                        "on this server, and there is no pending invite",
+                        target,
+                        room_id,
+                    )
+
                     raise SynapseError(404, "Not a known room")
 
                 logger.info(
                     "%s rejects invite to %s from %s", target, room_id, invite.sender
                 )
 
-                if self.hs.is_mine_id(invite.sender):
-                    # the inviter was on our server, but has now left. Carry on
-                    # with the normal rejection codepath.
-                    #
-                    # This is a bit of a hack, because the room might still be
-                    # active on other servers.
-                    pass
-                else:
+                if not self.hs.is_mine_id(invite.sender):
                     # send the rejection to the inviter's HS (with fallback to
                     # local event)
                     return await self.remote_reject_invite(
                         invite.event_id, txn_id, requester, content,
                     )
 
+                # the inviter was on our server, but has now left. Carry on
+                # with the normal rejection codepath, which will also send the
+                # rejection out to any other servers we believe are still in the room.
+
+                # thanks to overzealous cleaning up of event_forward_extremities in
+                # `delete_old_current_state_events`, it's possible to end up with no
+                # forward extremities here. If that happens, let's just hang the
+                # rejection off the invite event.
+                #
+                # see: https://github.com/matrix-org/synapse/issues/7139
+                if len(latest_event_ids) == 0:
+                    latest_event_ids = [invite.event_id]
+
         return await self._local_membership_update(
             requester=requester,
             target=target,
@@ -952,7 +965,11 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
-        if self.hs.config.limit_remote_rooms.enabled:
+        check_complexity = self.hs.config.limit_remote_rooms.enabled
+        if check_complexity and self.hs.config.limit_remote_rooms.admins_can_join:
+            check_complexity = not await self.hs.auth.is_server_admin(user)
+
+        if check_complexity:
             # Fetch the room complexity
             too_complex = await self._is_remote_room_too_complex(
                 room_id, remote_room_hosts
@@ -975,7 +992,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
 
         # Check the room we just joined wasn't too large, if we didn't fetch the
         # complexity of it before.
-        if self.hs.config.limit_remote_rooms.enabled:
+        if check_complexity:
             if too_complex is False:
                 # We checked, and we're under the limit.
                 return event_id, stream_id
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
index abecaa8313..2d506dc1f2 100644
--- a/synapse/handlers/saml_handler.py
+++ b/synapse/handlers/saml_handler.py
@@ -96,6 +96,9 @@ class SamlHandler:
             relay_state=client_redirect_url
         )
 
+        # Since SAML sessions timeout it is useful to log when they were created.
+        logger.info("Initiating a new SAML session: %s" % (reqid,))
+
         now = self._clock.time_msec()
         self._outstanding_requests_dict[reqid] = Saml2SessionData(
             creation_time=now, ui_auth_session_id=ui_auth_session_id,
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 4d40d3ac9c..9b312a1558 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -15,6 +15,7 @@
 
 import itertools
 import logging
+from typing import Iterable
 
 from unpaddedbase64 import decode_base64, encode_base64
 
@@ -37,7 +38,7 @@ class SearchHandler(BaseHandler):
         self.state_store = self.storage.state
         self.auth = hs.get_auth()
 
-    async def get_old_rooms_from_upgraded_room(self, room_id):
+    async def get_old_rooms_from_upgraded_room(self, room_id: str) -> Iterable[str]:
         """Retrieves room IDs of old rooms in the history of an upgraded room.
 
         We do so by checking the m.room.create event of the room for a
@@ -48,10 +49,10 @@ class SearchHandler(BaseHandler):
         The full list of all found rooms in then returned.
 
         Args:
-            room_id (str): id of the room to search through.
+            room_id: id of the room to search through.
 
         Returns:
-            Deferred[iterable[str]]: predecessor room ids
+            Predecessor room ids
         """
 
         historical_room_ids = []
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 365d7323e4..eaa4eeadf7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -103,6 +103,7 @@ class JoinedSyncResult:
     account_data = attr.ib(type=List[JsonDict])
     unread_notifications = attr.ib(type=JsonDict)
     summary = attr.ib(type=Optional[JsonDict])
+    unread_count = attr.ib(type=int)
 
     def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
@@ -421,10 +422,6 @@ class SyncHandler(object):
         potential_recents: Optional[List[EventBase]] = None,
         newly_joined_room: bool = False,
     ) -> TimelineBatch:
-        """
-        Returns:
-            a Deferred TimelineBatch
-        """
         with Measure(self.clock, "load_filtered_recents"):
             timeline_limit = sync_config.filter_collection.timeline_limit()
             block_all_timeline = (
@@ -1890,6 +1887,10 @@ class SyncHandler(object):
 
         if room_builder.rtype == "joined":
             unread_notifications = {}  # type: Dict[str, str]
+
+            unread_count = await self.store.get_unread_message_count_for_user(
+                room_id, sync_config.user.to_string(),
+            )
             room_sync = JoinedSyncResult(
                 room_id=room_id,
                 timeline=batch,
@@ -1898,6 +1899,7 @@ class SyncHandler(object):
                 account_data=account_data_events,
                 unread_notifications=unread_notifications,
                 summary=summary,
+                unread_count=unread_count,
             )
 
             if room_sync or always_include:
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index a140e9391e..a011e9fe29 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -14,10 +14,10 @@
 # limitations under the License.
 
 import logging
+from typing import Any
 
 from canonicaljson import json
 
-from twisted.internet import defer
 from twisted.web.client import PartialDownloadError
 
 from synapse.api.constants import LoginType
@@ -33,25 +33,25 @@ class UserInteractiveAuthChecker:
     def __init__(self, hs):
         pass
 
-    def is_enabled(self):
+    def is_enabled(self) -> bool:
         """Check if the configuration of the homeserver allows this checker to work
 
         Returns:
-            bool: True if this login type is enabled.
+            True if this login type is enabled.
         """
 
-    def check_auth(self, authdict, clientip):
+    async def check_auth(self, authdict: dict, clientip: str) -> Any:
         """Given the authentication dict from the client, attempt to check this step
 
         Args:
-            authdict (dict): authentication dictionary from the client
-            clientip (str): The IP address of the client.
+            authdict: authentication dictionary from the client
+            clientip: The IP address of the client.
 
         Raises:
             SynapseError if authentication failed
 
         Returns:
-            Deferred: the result of authentication (to pass back to the client?)
+            The result of authentication (to pass back to the client?)
         """
         raise NotImplementedError()
 
@@ -62,8 +62,8 @@ class DummyAuthChecker(UserInteractiveAuthChecker):
     def is_enabled(self):
         return True
 
-    def check_auth(self, authdict, clientip):
-        return defer.succeed(True)
+    async def check_auth(self, authdict, clientip):
+        return True
 
 
 class TermsAuthChecker(UserInteractiveAuthChecker):
@@ -72,8 +72,8 @@ class TermsAuthChecker(UserInteractiveAuthChecker):
     def is_enabled(self):
         return True
 
-    def check_auth(self, authdict, clientip):
-        return defer.succeed(True)
+    async def check_auth(self, authdict, clientip):
+        return True
 
 
 class RecaptchaAuthChecker(UserInteractiveAuthChecker):
@@ -89,8 +89,7 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
     def is_enabled(self):
         return self._enabled
 
-    @defer.inlineCallbacks
-    def check_auth(self, authdict, clientip):
+    async def check_auth(self, authdict, clientip):
         try:
             user_response = authdict["response"]
         except KeyError:
@@ -107,7 +106,7 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
         # TODO: get this from the homeserver rather than creating a new one for
         # each request
         try:
-            resp_body = yield self._http_client.post_urlencoded_get_json(
+            resp_body = await self._http_client.post_urlencoded_get_json(
                 self._url,
                 args={
                     "secret": self._secret,
@@ -219,8 +218,8 @@ class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChec
             ThreepidBehaviour.LOCAL,
         )
 
-    def check_auth(self, authdict, clientip):
-        return defer.ensureDeferred(self._check_threepid("email", authdict))
+    async def check_auth(self, authdict, clientip):
+        return await self._check_threepid("email", authdict)
 
 
 class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
@@ -233,8 +232,8 @@ class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
     def is_enabled(self):
         return bool(self.hs.config.account_threepid_delegate_msisdn)
 
-    def check_auth(self, authdict, clientip):
-        return defer.ensureDeferred(self._check_threepid("msisdn", authdict))
+    async def check_auth(self, authdict, clientip):
+        return await self._check_threepid("msisdn", authdict)
 
 
 INTERACTIVE_AUTH_CHECKERS = [