summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py123
-rw-r--r--synapse/api/auth_blocking.py13
-rw-r--r--synapse/api/errors.py4
-rw-r--r--synapse/api/filtering.py7
-rw-r--r--synapse/app/generic_worker.py6
-rw-r--r--synapse/app/homeserver.py5
-rw-r--r--synapse/crypto/context_factory.py8
-rw-r--r--synapse/events/builder.py2
-rw-r--r--synapse/handlers/appservice.py2
-rw-r--r--synapse/handlers/auth.py19
-rw-r--r--synapse/handlers/federation.py2
-rw-r--r--synapse/handlers/message.py2
-rw-r--r--synapse/handlers/sync.py6
-rw-r--r--synapse/http/server.py5
-rw-r--r--synapse/http/site.py20
-rw-r--r--synapse/metrics/background_process_metrics.py34
-rw-r--r--synapse/module_api/__init__.py8
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py2
-rw-r--r--synapse/push/push_tools.py17
-rw-r--r--synapse/replication/slave/storage/client_ips.py2
-rw-r--r--synapse/replication/tcp/commands.py5
-rw-r--r--synapse/rest/client/v1/directory.py2
-rw-r--r--synapse/rest/client/v2_alpha/account.py86
-rw-r--r--synapse/rest/client/v2_alpha/register.py112
-rw-r--r--synapse/rest/client/v2_alpha/sync.py1
-rw-r--r--synapse/rest/consent/consent_resource.py4
-rw-r--r--synapse/rest/health.py31
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py7
-rw-r--r--synapse/storage/databases/main/account_data.py7
-rw-r--r--synapse/storage/databases/main/cache.py1
-rw-r--r--synapse/storage/databases/main/client_ips.py59
-rw-r--r--synapse/storage/databases/main/deviceinbox.py9
-rw-r--r--synapse/storage/databases/main/devices.py23
-rw-r--r--synapse/storage/databases/main/directory.py51
-rw-r--r--synapse/storage/databases/main/e2e_room_keys.py41
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py78
-rw-r--r--synapse/storage/databases/main/event_push_actions.py5
-rw-r--r--synapse/storage/databases/main/events.py48
-rw-r--r--synapse/storage/databases/main/events_worker.py86
-rw-r--r--synapse/storage/databases/main/group_server.py17
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py31
-rw-r--r--synapse/storage/databases/main/push_rule.py9
-rw-r--r--synapse/storage/databases/main/receipts.py9
-rw-r--r--synapse/storage/databases/main/schema/delta/58/12unread_messages.sql18
-rw-r--r--synapse/storage/databases/main/search.py69
-rw-r--r--synapse/storage/databases/main/signatures.py7
-rw-r--r--synapse/storage/databases/main/user_directory.py124
-rw-r--r--synapse/util/__init__.py4
-rw-r--r--synapse/util/caches/descriptors.py2
-rw-r--r--synapse/util/frozenutils.py7
-rw-r--r--synapse/util/metrics.py39
-rw-r--r--synapse/util/retryutils.py16
52 files changed, 599 insertions, 696 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 2178e623da..d8190f92ab 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -13,12 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Optional
+from typing import List, Optional, Tuple
 
 import pymacaroons
 from netaddr import IPAddress
 
-from twisted.internet import defer
 from twisted.web.server import Request
 
 import synapse.types
@@ -80,13 +79,14 @@ class Auth(object):
         self._track_appservice_user_ips = hs.config.track_appservice_user_ips
         self._macaroon_secret_key = hs.config.macaroon_secret_key
 
-    @defer.inlineCallbacks
-    def check_from_context(self, room_version: str, event, context, do_sig_check=True):
-        prev_state_ids = yield defer.ensureDeferred(context.get_prev_state_ids())
-        auth_events_ids = yield self.compute_auth_events(
+    async def check_from_context(
+        self, room_version: str, event, context, do_sig_check=True
+    ):
+        prev_state_ids = await context.get_prev_state_ids()
+        auth_events_ids = self.compute_auth_events(
             event, prev_state_ids, for_verification=True
         )
-        auth_events = yield self.store.get_events(auth_events_ids)
+        auth_events = await self.store.get_events(auth_events_ids)
         auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
 
         room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
@@ -94,14 +94,13 @@ class Auth(object):
             room_version_obj, event, auth_events=auth_events, do_sig_check=do_sig_check
         )
 
-    @defer.inlineCallbacks
-    def check_user_in_room(
+    async def check_user_in_room(
         self,
         room_id: str,
         user_id: str,
         current_state: Optional[StateMap[EventBase]] = None,
         allow_departed_users: bool = False,
-    ):
+    ) -> EventBase:
         """Check if the user is in the room, or was at some point.
         Args:
             room_id: The room to check.
@@ -119,37 +118,35 @@ class Auth(object):
         Raises:
             AuthError if the user is/was not in the room.
         Returns:
-            Deferred[Optional[EventBase]]:
-                Membership event for the user if the user was in the
-                room. This will be the join event if they are currently joined to
-                the room. This will be the leave event if they have left the room.
+            Membership event for the user if the user was in the
+            room. This will be the join event if they are currently joined to
+            the room. This will be the leave event if they have left the room.
         """
         if current_state:
             member = current_state.get((EventTypes.Member, user_id), None)
         else:
-            member = yield defer.ensureDeferred(
-                self.state.get_current_state(
-                    room_id=room_id, event_type=EventTypes.Member, state_key=user_id
-                )
+            member = await self.state.get_current_state(
+                room_id=room_id, event_type=EventTypes.Member, state_key=user_id
             )
-        membership = member.membership if member else None
 
-        if membership == Membership.JOIN:
-            return member
+        if member:
+            membership = member.membership
 
-        # XXX this looks totally bogus. Why do we not allow users who have been banned,
-        # or those who were members previously and have been re-invited?
-        if allow_departed_users and membership == Membership.LEAVE:
-            forgot = yield self.store.did_forget(user_id, room_id)
-            if not forgot:
+            if membership == Membership.JOIN:
                 return member
 
+            # XXX this looks totally bogus. Why do we not allow users who have been banned,
+            # or those who were members previously and have been re-invited?
+            if allow_departed_users and membership == Membership.LEAVE:
+                forgot = await self.store.did_forget(user_id, room_id)
+                if not forgot:
+                    return member
+
         raise AuthError(403, "User %s not in room %s" % (user_id, room_id))
 
-    @defer.inlineCallbacks
-    def check_host_in_room(self, room_id, host):
+    async def check_host_in_room(self, room_id, host):
         with Measure(self.clock, "check_host_in_room"):
-            latest_event_ids = yield self.store.is_host_joined(room_id, host)
+            latest_event_ids = await self.store.is_host_joined(room_id, host)
             return latest_event_ids
 
     def can_federate(self, event, auth_events):
@@ -160,14 +157,13 @@ class Auth(object):
     def get_public_keys(self, invite_event):
         return event_auth.get_public_keys(invite_event)
 
-    @defer.inlineCallbacks
-    def get_user_by_req(
+    async def get_user_by_req(
         self,
         request: Request,
         allow_guest: bool = False,
         rights: str = "access",
         allow_expired: bool = False,
-    ):
+    ) -> synapse.types.Requester:
         """ Get a registered user's ID.
 
         Args:
@@ -180,7 +176,7 @@ class Auth(object):
                 /login will deliver access tokens regardless of expiration.
 
         Returns:
-            defer.Deferred: resolves to a `synapse.types.Requester` object
+            Resolves to the requester
         Raises:
             InvalidClientCredentialsError if no user by that token exists or the token
                 is invalid.
@@ -194,14 +190,14 @@ class Auth(object):
 
             access_token = self.get_access_token_from_request(request)
 
-            user_id, app_service = yield self._get_appservice_user_id(request)
+            user_id, app_service = await self._get_appservice_user_id(request)
             if user_id:
                 request.authenticated_entity = user_id
                 opentracing.set_tag("authenticated_entity", user_id)
                 opentracing.set_tag("appservice_id", app_service.id)
 
                 if ip_addr and self._track_appservice_user_ips:
-                    yield self.store.insert_client_ip(
+                    await self.store.insert_client_ip(
                         user_id=user_id,
                         access_token=access_token,
                         ip=ip_addr,
@@ -211,7 +207,7 @@ class Auth(object):
 
                 return synapse.types.create_requester(user_id, app_service=app_service)
 
-            user_info = yield self.get_user_by_access_token(
+            user_info = await self.get_user_by_access_token(
                 access_token, rights, allow_expired=allow_expired
             )
             user = user_info["user"]
@@ -221,7 +217,7 @@ class Auth(object):
             # Deny the request if the user account has expired.
             if self._account_validity.enabled and not allow_expired:
                 user_id = user.to_string()
-                expiration_ts = yield self.store.get_expiration_ts_for_user(user_id)
+                expiration_ts = await self.store.get_expiration_ts_for_user(user_id)
                 if (
                     expiration_ts is not None
                     and self.clock.time_msec() >= expiration_ts
@@ -235,7 +231,7 @@ class Auth(object):
             device_id = user_info.get("device_id")
 
             if user and access_token and ip_addr:
-                yield self.store.insert_client_ip(
+                await self.store.insert_client_ip(
                     user_id=user.to_string(),
                     access_token=access_token,
                     ip=ip_addr,
@@ -261,8 +257,7 @@ class Auth(object):
         except KeyError:
             raise MissingClientTokenError()
 
-    @defer.inlineCallbacks
-    def _get_appservice_user_id(self, request):
+    async def _get_appservice_user_id(self, request):
         app_service = self.store.get_app_service_by_token(
             self.get_access_token_from_request(request)
         )
@@ -283,14 +278,13 @@ class Auth(object):
 
         if not app_service.is_interested_in_user(user_id):
             raise AuthError(403, "Application service cannot masquerade as this user.")
-        if not (yield self.store.get_user_by_id(user_id)):
+        if not (await self.store.get_user_by_id(user_id)):
             raise AuthError(403, "Application service has not registered this user")
         return user_id, app_service
 
-    @defer.inlineCallbacks
-    def get_user_by_access_token(
+    async def get_user_by_access_token(
         self, token: str, rights: str = "access", allow_expired: bool = False,
-    ):
+    ) -> dict:
         """ Validate access token and get user_id from it
 
         Args:
@@ -300,7 +294,7 @@ class Auth(object):
             allow_expired: If False, raises an InvalidClientTokenError
                 if the token is expired
         Returns:
-            Deferred[dict]: dict that includes:
+            dict that includes:
                `user` (UserID)
                `is_guest` (bool)
                `token_id` (int|None): access token id. May be None if guest
@@ -314,7 +308,7 @@ class Auth(object):
 
         if rights == "access":
             # first look in the database
-            r = yield self._look_up_user_by_access_token(token)
+            r = await self._look_up_user_by_access_token(token)
             if r:
                 valid_until_ms = r["valid_until_ms"]
                 if (
@@ -352,7 +346,7 @@ class Auth(object):
                 # It would of course be much easier to store guest access
                 # tokens in the database as well, but that would break existing
                 # guest tokens.
-                stored_user = yield self.store.get_user_by_id(user_id)
+                stored_user = await self.store.get_user_by_id(user_id)
                 if not stored_user:
                     raise InvalidClientTokenError("Unknown user_id %s" % user_id)
                 if not stored_user["is_guest"]:
@@ -482,9 +476,8 @@ class Auth(object):
         now = self.hs.get_clock().time_msec()
         return now < expiry
 
-    @defer.inlineCallbacks
-    def _look_up_user_by_access_token(self, token):
-        ret = yield self.store.get_user_by_access_token(token)
+    async def _look_up_user_by_access_token(self, token):
+        ret = await self.store.get_user_by_access_token(token)
         if not ret:
             return None
 
@@ -507,7 +500,7 @@ class Auth(object):
             logger.warning("Unrecognised appservice access token.")
             raise InvalidClientTokenError()
         request.authenticated_entity = service.sender
-        return defer.succeed(service)
+        return service
 
     async def is_server_admin(self, user: UserID) -> bool:
         """ Check if the given user is a local server admin.
@@ -522,7 +515,7 @@ class Auth(object):
 
     def compute_auth_events(
         self, event, current_state_ids: StateMap[str], for_verification: bool = False,
-    ):
+    ) -> List[str]:
         """Given an event and current state return the list of event IDs used
         to auth an event.
 
@@ -530,11 +523,11 @@ class Auth(object):
         should be added to the event's `auth_events`.
 
         Returns:
-            defer.Deferred(list[str]): List of event IDs.
+            List of event IDs.
         """
 
         if event.type == EventTypes.Create:
-            return defer.succeed([])
+            return []
 
         # Currently we ignore the `for_verification` flag even though there are
         # some situations where we can drop particular auth events when adding
@@ -553,7 +546,7 @@ class Auth(object):
             if auth_ev_id:
                 auth_ids.append(auth_ev_id)
 
-        return defer.succeed(auth_ids)
+        return auth_ids
 
     async def check_can_change_room_list(self, room_id: str, user: UserID):
         """Determine whether the user is allowed to edit the room's entry in the
@@ -636,10 +629,9 @@ class Auth(object):
 
             return query_params[0].decode("ascii")
 
-    @defer.inlineCallbacks
-    def check_user_in_room_or_world_readable(
+    async def check_user_in_room_or_world_readable(
         self, room_id: str, user_id: str, allow_departed_users: bool = False
-    ):
+    ) -> Tuple[str, Optional[str]]:
         """Checks that the user is or was in the room or the room is world
         readable. If it isn't then an exception is raised.
 
@@ -650,10 +642,9 @@ class Auth(object):
                 members but have now departed
 
         Returns:
-            Deferred[tuple[str, str|None]]: Resolves to the current membership of
-                the user in the room and the membership event ID of the user. If
-                the user is not in the room and never has been, then
-                `(Membership.JOIN, None)` is returned.
+            Resolves to the current membership of the user in the room and the
+            membership event ID of the user. If the user is not in the room and
+            never has been, then `(Membership.JOIN, None)` is returned.
         """
 
         try:
@@ -662,15 +653,13 @@ class Auth(object):
             #  * The user is a non-guest user, and was ever in the room
             #  * The user is a guest user, and has joined the room
             # else it will throw.
-            member_event = yield self.check_user_in_room(
+            member_event = await self.check_user_in_room(
                 room_id, user_id, allow_departed_users=allow_departed_users
             )
             return member_event.membership, member_event.event_id
         except AuthError:
-            visibility = yield defer.ensureDeferred(
-                self.state.get_current_state(
-                    room_id, EventTypes.RoomHistoryVisibility, ""
-                )
+            visibility = await self.state.get_current_state(
+                room_id, EventTypes.RoomHistoryVisibility, ""
             )
             if (
                 visibility
diff --git a/synapse/api/auth_blocking.py b/synapse/api/auth_blocking.py
index 5c499b6b4e..49093bf181 100644
--- a/synapse/api/auth_blocking.py
+++ b/synapse/api/auth_blocking.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from twisted.internet import defer
-
 from synapse.api.constants import LimitBlockingTypes, UserTypes
 from synapse.api.errors import Codes, ResourceLimitError
 from synapse.config.server import is_threepid_reserved
@@ -36,8 +34,7 @@ class AuthBlocking(object):
         self._limit_usage_by_mau = hs.config.limit_usage_by_mau
         self._mau_limits_reserved_threepids = hs.config.mau_limits_reserved_threepids
 
-    @defer.inlineCallbacks
-    def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
+    async def check_auth_blocking(self, user_id=None, threepid=None, user_type=None):
         """Checks if the user should be rejected for some external reason,
         such as monthly active user limiting or global disable flag
 
@@ -60,7 +57,7 @@ class AuthBlocking(object):
         if user_id is not None:
             if user_id == self._server_notices_mxid:
                 return
-            if (yield self.store.is_support_user(user_id)):
+            if await self.store.is_support_user(user_id):
                 return
 
         if self._hs_disabled:
@@ -76,11 +73,11 @@ class AuthBlocking(object):
 
             # If the user is already part of the MAU cohort or a trial user
             if user_id:
-                timestamp = yield self.store.user_last_seen_monthly_active(user_id)
+                timestamp = await self.store.user_last_seen_monthly_active(user_id)
                 if timestamp:
                     return
 
-                is_trial = yield self.store.is_trial_user(user_id)
+                is_trial = await self.store.is_trial_user(user_id)
                 if is_trial:
                     return
             elif threepid:
@@ -93,7 +90,7 @@ class AuthBlocking(object):
                 # allow registration. Support users are excluded from MAU checks.
                 return
             # Else if there is no room in the MAU bucket, bail
-            current_mau = yield self.store.get_monthly_active_count()
+            current_mau = await self.store.get_monthly_active_count()
             if current_mau >= self._max_mau_value:
                 raise ResourceLimitError(
                     403,
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index b3bab1aa52..6e40630ab6 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -238,14 +238,16 @@ class InteractiveAuthIncompleteError(Exception):
     (This indicates we should return a 401 with 'result' as the body)
 
     Attributes:
+        session_id: The ID of the ongoing interactive auth session.
         result: the server response to the request, which should be
             passed back to the client
     """
 
-    def __init__(self, result: "JsonDict"):
+    def __init__(self, session_id: str, result: "JsonDict"):
         super(InteractiveAuthIncompleteError, self).__init__(
             "Interactive auth not yet complete"
         )
+        self.session_id = session_id
         self.result = result
 
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index f988f62a1e..7393d6cb74 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -21,8 +21,6 @@ import jsonschema
 from canonicaljson import json
 from jsonschema import FormatChecker
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventContentFields
 from synapse.api.errors import SynapseError
 from synapse.storage.presence import UserPresenceState
@@ -137,9 +135,8 @@ class Filtering(object):
         super(Filtering, self).__init__()
         self.store = hs.get_datastore()
 
-    @defer.inlineCallbacks
-    def get_user_filter(self, user_localpart, filter_id):
-        result = yield self.store.get_user_filter(user_localpart, filter_id)
+    async def get_user_filter(self, user_localpart, filter_id):
+        result = await self.store.get_user_filter(user_localpart, filter_id)
         return FilterCollection(result)
 
     def add_user_filter(self, user_localpart, user_filter):
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 1a16d0b9f8..7957586d69 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -123,6 +123,7 @@ from synapse.rest.client.v2_alpha.account_data import (
 from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
 from synapse.rest.client.v2_alpha.register import RegisterRestServlet
 from synapse.rest.client.versions import VersionsRestServlet
+from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.server import HomeServer
 from synapse.storage.databases.main.censor_events import CensorEventsStore
@@ -493,7 +494,10 @@ class GenericWorkerServer(HomeServer):
         site_tag = listener_config.http_options.tag
         if site_tag is None:
             site_tag = port
-        resources = {}
+
+        # We always include a health resource.
+        resources = {"/health": HealthResource()}
+
         for res in listener_config.http_options.resources:
             for name in res.names:
                 if name == "metrics":
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index d87a77718e..98d0d14a12 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -68,6 +68,7 @@ from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest import ClientRestResource
 from synapse.rest.admin import AdminRestResource
+from synapse.rest.health import HealthResource
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.well_known import WellKnownResource
 from synapse.server import HomeServer
@@ -98,7 +99,9 @@ class SynapseHomeServer(HomeServer):
         if site_tag is None:
             site_tag = port
 
-        resources = {}
+        # We always include a health resource.
+        resources = {"/health": HealthResource()}
+
         for res in listener_config.http_options.resources:
             for name in res.names:
                 if name == "openid" and "federation" in res.names:
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index a5a2a7815d..777c0f00b1 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -48,6 +48,14 @@ class ServerContextFactory(ContextFactory):
     connections."""
 
     def __init__(self, config):
+        # TODO: once pyOpenSSL exposes TLS_METHOD and SSL_CTX_set_min_proto_version,
+        # switch to those (see https://github.com/pyca/cryptography/issues/5379).
+        #
+        # note that, despite the confusing name, SSLv23_METHOD does *not* enforce SSLv2
+        # or v3, but is a synonym for TLS_METHOD, which allows the client and server
+        # to negotiate an appropriate version of TLS constrained by the version options
+        # set with context.set_options.
+        #
         self._context = SSL.Context(SSL.SSLv23_METHOD)
         self.configure_context(self._context, config)
 
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 69b53ca2bc..4e179d49b3 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -106,7 +106,7 @@ class EventBuilder(object):
         state_ids = await self._state.get_current_state_ids(
             self.room_id, prev_event_ids
         )
-        auth_ids = await self._auth.compute_auth_events(self, state_ids)
+        auth_ids = self._auth.compute_auth_events(self, state_ids)
 
         format_version = self.room_version.event_format
         if format_version == EventFormatVersions.V1:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index fbc56c351b..c9044a5019 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -101,7 +101,7 @@ class ApplicationServicesHandler(object):
 
                             async def start_scheduler():
                                 try:
-                                    return self.scheduler.start()
+                                    return await self.scheduler.start()
                                 except Exception:
                                     logger.error("Application Services Failure")
 
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index c7d921c21a..c24e7bafe0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -162,7 +162,7 @@ class AuthHandler(BaseHandler):
         request_body: Dict[str, Any],
         clientip: str,
         description: str,
-    ) -> dict:
+    ) -> Tuple[dict, str]:
         """
         Checks that the user is who they claim to be, via a UI auth.
 
@@ -183,9 +183,14 @@ class AuthHandler(BaseHandler):
                          describes the operation happening on their account.
 
         Returns:
-            The parameters for this request (which may
+            A tuple of (params, session_id).
+
+                'params' contains the parameters for this request (which may
                 have been given only in a previous call).
 
+                'session_id' is the ID of this session, either passed in by the
+                client or assigned by this call
+
         Raises:
             InteractiveAuthIncompleteError if the client has not yet completed
                 any of the permitted login flows
@@ -207,7 +212,7 @@ class AuthHandler(BaseHandler):
         flows = [[login_type] for login_type in self._supported_ui_auth_types]
 
         try:
-            result, params, _ = await self.check_auth(
+            result, params, session_id = await self.check_ui_auth(
                 flows, request, request_body, clientip, description
             )
         except LoginError:
@@ -230,7 +235,7 @@ class AuthHandler(BaseHandler):
         if user_id != requester.user.to_string():
             raise AuthError(403, "Invalid auth")
 
-        return params
+        return params, session_id
 
     def get_enabled_auth_types(self):
         """Return the enabled user-interactive authentication types
@@ -240,7 +245,7 @@ class AuthHandler(BaseHandler):
         """
         return self.checkers.keys()
 
-    async def check_auth(
+    async def check_ui_auth(
         self,
         flows: List[List[str]],
         request: SynapseRequest,
@@ -363,7 +368,7 @@ class AuthHandler(BaseHandler):
 
         if not authdict:
             raise InteractiveAuthIncompleteError(
-                self._auth_dict_for_flows(flows, session.session_id)
+                session.session_id, self._auth_dict_for_flows(flows, session.session_id)
             )
 
         # check auth type currently being presented
@@ -410,7 +415,7 @@ class AuthHandler(BaseHandler):
         ret = self._auth_dict_for_flows(flows, session.session_id)
         ret["completed"] = list(creds)
         ret.update(errordict)
-        raise InteractiveAuthIncompleteError(ret)
+        raise InteractiveAuthIncompleteError(session.session_id, ret)
 
     async def add_oob_auth(
         self, stagetype: str, authdict: Dict[str, Any], clientip: str
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b3764dedae..593932adb7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2064,7 +2064,7 @@ class FederationHandler(BaseHandler):
 
         if not auth_events:
             prev_state_ids = await context.get_prev_state_ids()
-            auth_events_ids = await self.auth.compute_auth_events(
+            auth_events_ids = self.auth.compute_auth_events(
                 event, prev_state_ids, for_verification=True
             )
             auth_events_x = await self.store.get_events(auth_events_ids)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 43901d0934..708533d4d1 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1061,7 +1061,7 @@ class EventCreationHandler(object):
                     raise SynapseError(400, "Cannot redact event from a different room")
 
             prev_state_ids = await context.get_prev_state_ids()
-            auth_events_ids = await self.auth.compute_auth_events(
+            auth_events_ids = self.auth.compute_auth_events(
                 event, prev_state_ids, for_verification=True
             )
             auth_events = await self.store.get_events(auth_events_ids)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5a19bac929..c42dac18f5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -103,7 +103,6 @@ class JoinedSyncResult:
     account_data = attr.ib(type=List[JsonDict])
     unread_notifications = attr.ib(type=JsonDict)
     summary = attr.ib(type=Optional[JsonDict])
-    unread_count = attr.ib(type=int)
 
     def __nonzero__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
@@ -1887,10 +1886,6 @@ class SyncHandler(object):
 
         if room_builder.rtype == "joined":
             unread_notifications = {}  # type: Dict[str, str]
-
-            unread_count = await self.store.get_unread_message_count_for_user(
-                room_id, sync_config.user.to_string(),
-            )
             room_sync = JoinedSyncResult(
                 room_id=room_id,
                 timeline=batch,
@@ -1899,7 +1894,6 @@ class SyncHandler(object):
                 account_data=account_data_events,
                 unread_notifications=unread_notifications,
                 summary=summary,
-                unread_count=unread_count,
             )
 
             if room_sync or always_include:
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 94ab29974a..ffe6cfa09e 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -25,7 +25,7 @@ from io import BytesIO
 from typing import Any, Callable, Dict, Tuple, Union
 
 import jinja2
-from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json
+from canonicaljson import encode_canonical_json, encode_pretty_printed_json
 
 from twisted.internet import defer
 from twisted.python import failure
@@ -46,6 +46,7 @@ from synapse.api.errors import (
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import preserve_fn
 from synapse.logging.opentracing import trace_servlet
+from synapse.util import json_encoder
 from synapse.util.caches import intern_dict
 
 logger = logging.getLogger(__name__)
@@ -538,7 +539,7 @@ def respond_with_json(
             # canonicaljson already encodes to bytes
             json_bytes = encode_canonical_json(json_object)
         else:
-            json_bytes = json.dumps(json_object).encode("utf-8")
+            json_bytes = json_encoder.encode(json_object).encode("utf-8")
 
     return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors)
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 6f3b2258cc..6e79b47828 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -146,10 +146,9 @@ class SynapseRequest(Request):
 
         Returns a context manager; the correct way to use this is:
 
-        @defer.inlineCallbacks
-        def handle_request(request):
+        async def handle_request(request):
             with request.processing("FooServlet"):
-                yield really_handle_the_request()
+                await really_handle_the_request()
 
         Once the context manager is closed, the completion of the request will be logged,
         and the various metrics will be updated.
@@ -287,7 +286,9 @@ class SynapseRequest(Request):
             # the connection dropped)
             code += "!"
 
-        self.site.access_logger.info(
+        log_level = logging.INFO if self._should_log_request() else logging.DEBUG
+        self.site.access_logger.log(
+            log_level,
             "%s - %s - {%s}"
             " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
             ' %sB %s "%s %s %s" "%s" [%d dbevts]',
@@ -315,6 +316,17 @@ class SynapseRequest(Request):
         except Exception as e:
             logger.warning("Failed to stop metrics: %r", e)
 
+    def _should_log_request(self) -> bool:
+        """Whether we should log at INFO that we processed the request.
+        """
+        if self.path == b"/health":
+            return False
+
+        if self.method == b"OPTIONS":
+            return False
+
+        return True
+
 
 class XForwardedForRequest(SynapseRequest):
     def __init__(self, *args, **kw):
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index a9269196b3..f766d16db6 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,16 +13,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import inspect
 import logging
 import threading
-from asyncio import iscoroutine
 from functools import wraps
 from typing import TYPE_CHECKING, Dict, Optional, Set
 
 from prometheus_client.core import REGISTRY, Counter, Gauge
 
 from twisted.internet import defer
-from twisted.python.failure import Failure
 
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
 
@@ -167,7 +166,7 @@ class _BackgroundProcess(object):
         )
 
 
-def run_as_background_process(desc, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, **kwargs):
     """Run the given function in its own logcontext, with resource metrics
 
     This should be used to wrap processes which are fired off to run in the
@@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
     normal synapse inlineCallbacks function).
 
     Args:
-        desc (str): a description for this background process type
+        desc: a description for this background process type
         func: a function, which may return a Deferred or a coroutine
         args: positional args for func
         kwargs: keyword args for func
@@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
         follow the synapse logcontext rules.
     """
 
-    @defer.inlineCallbacks
-    def run():
+    async def run():
         with _bg_metrics_lock:
             count = _background_process_counts.get(desc, 0)
             _background_process_counts[desc] = count + 1
@@ -203,29 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs):
             try:
                 result = func(*args, **kwargs)
 
-                # We probably don't have an ensureDeferred in our call stack to handle
-                # coroutine results, so we need to ensureDeferred here.
-                #
-                # But we need this check because ensureDeferred doesn't like being
-                # called on immediate values (as opposed to Deferreds or coroutines).
-                if iscoroutine(result):
-                    result = defer.ensureDeferred(result)
+                if inspect.isawaitable(result):
+                    result = await result
 
-                return (yield result)
+                return result
             except Exception:
-                # failure.Failure() fishes the original Failure out of our stack, and
-                # thus gives us a sensible stack trace.
-                f = Failure()
-                logger.error(
-                    "Background process '%s' threw an exception",
-                    desc,
-                    exc_info=(f.type, f.value, f.getTracebackObject()),
+                logger.exception(
+                    "Background process '%s' threw an exception", desc,
                 )
             finally:
                 _background_process_in_flight_count.labels(desc).dec()
 
     with PreserveLoggingContext():
-        return run()
+        # Note that we return a Deferred here so that it can be used in a
+        # looping_call and other places that expect a Deferred.
+        return defer.ensureDeferred(run())
 
 
 def wrap_as_background_process(desc):
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 8201849951..c2fb757d9a 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -194,12 +194,16 @@ class ModuleApi(object):
             synapse.api.errors.AuthError: the access token is invalid
         """
         # see if the access token corresponds to a device
-        user_info = yield self._auth.get_user_by_access_token(access_token)
+        user_info = yield defer.ensureDeferred(
+            self._auth.get_user_by_access_token(access_token)
+        )
         device_id = user_info.get("device_id")
         user_id = user_info["user"].to_string()
         if device_id:
             # delete the device, which will also delete its access tokens
-            yield self._hs.get_device_handler().delete_device(user_id, device_id)
+            yield defer.ensureDeferred(
+                self._hs.get_device_handler().delete_device(user_id, device_id)
+            )
         else:
             # no associated device. Just delete the access token.
             yield defer.ensureDeferred(
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 04b9d8ac82..e7fcee0e87 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -120,7 +120,7 @@ class BulkPushRuleEvaluator(object):
             pl_event = await self.store.get_event(pl_event_id)
             auth_events = {POWER_KEY: pl_event}
         else:
-            auth_events_ids = await self.auth.compute_auth_events(
+            auth_events_ids = self.auth.compute_auth_events(
                 event, prev_state_ids, for_verification=False
             )
             auth_events = await self.store.get_events(auth_events_ids)
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index bc8f71916b..d0145666bf 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -21,13 +21,22 @@ async def get_badge_count(store, user_id):
     invites = await store.get_invited_rooms_for_local_user(user_id)
     joins = await store.get_rooms_for_user(user_id)
 
+    my_receipts_by_room = await store.get_receipts_for_user(user_id, "m.read")
+
     badge = len(invites)
 
     for room_id in joins:
-        unread_count = await store.get_unread_message_count_for_user(room_id, user_id)
-        # return one badge count per conversation, as count per
-        # message is so noisy as to be almost useless
-        badge += 1 if unread_count else 0
+        if room_id in my_receipts_by_room:
+            last_unread_event_id = my_receipts_by_room[room_id]
+
+            notifs = await (
+                store.get_unread_event_push_actions_by_room_for_user(
+                    room_id, user_id, last_unread_event_id
+                )
+            )
+            # return one badge count per conversation, as count per
+            # message is so noisy as to be almost useless
+            badge += 1 if notifs["notify_count"] else 0
     return badge
 
 
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 60dd3f6701..a6fdedde63 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -28,7 +28,7 @@ class SlavedClientIpStore(BaseSlavedStore):
             name="client_ip_last_seen", keylen=4, max_entries=50000
         )
 
-    def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
+    async def insert_client_ip(self, user_id, access_token, ip, user_agent, device_id):
         now = int(self._clock.time_msec())
         key = (user_id, access_token, ip)
 
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index f33801f883..d853e4447e 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -18,11 +18,12 @@ The VALID_SERVER_COMMANDS and VALID_CLIENT_COMMANDS define which commands are
 allowed to be sent by which side.
 """
 import abc
-import json
 import logging
 from typing import Tuple, Type
 
-_json_encoder = json.JSONEncoder()
+from canonicaljson import json
+
+from synapse.util import json_encoder as _json_encoder
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 5934b1fe8b..b210015173 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -89,7 +89,7 @@ class ClientDirectoryServer(RestServlet):
         dir_handler = self.handlers.directory_handler
 
         try:
-            service = await self.auth.get_appservice_by_req(request)
+            service = self.auth.get_appservice_by_req(request)
             room_alias = RoomAlias.from_string(room_alias)
             await dir_handler.delete_appservice_association(service, room_alias)
             logger.info(
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 3767a809a4..fead85074b 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -18,7 +18,12 @@ import logging
 from http import HTTPStatus
 
 from synapse.api.constants import LoginType
-from synapse.api.errors import Codes, SynapseError, ThreepidValidationError
+from synapse.api.errors import (
+    Codes,
+    InteractiveAuthIncompleteError,
+    SynapseError,
+    ThreepidValidationError,
+)
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.http.server import finish_request, respond_with_html
 from synapse.http.servlet import (
@@ -239,18 +244,12 @@ class PasswordRestServlet(RestServlet):
 
         # we do basic sanity checks here because the auth layer will store these
         # in sessions. Pull out the new password provided to us.
-        if "new_password" in body:
-            new_password = body.pop("new_password")
+        new_password = body.pop("new_password", None)
+        if new_password is not None:
             if not isinstance(new_password, str) or len(new_password) > 512:
                 raise SynapseError(400, "Invalid password")
             self.password_policy_handler.validate_password(new_password)
 
-            # If the password is valid, hash it and store it back on the body.
-            # This ensures that only the hashed password is handled everywhere.
-            if "new_password_hash" in body:
-                raise SynapseError(400, "Unexpected property: new_password_hash")
-            body["new_password_hash"] = await self.auth_handler.hash(new_password)
-
         # there are two possibilities here. Either the user does not have an
         # access token, and needs to do a password reset; or they have one and
         # need to validate their identity.
@@ -263,23 +262,49 @@ class PasswordRestServlet(RestServlet):
 
         if self.auth.has_access_token(request):
             requester = await self.auth.get_user_by_req(request)
-            params = await self.auth_handler.validate_user_via_ui_auth(
-                requester,
-                request,
-                body,
-                self.hs.get_ip_from_request(request),
-                "modify your account password",
-            )
+            try:
+                params, session_id = await self.auth_handler.validate_user_via_ui_auth(
+                    requester,
+                    request,
+                    body,
+                    self.hs.get_ip_from_request(request),
+                    "modify your account password",
+                )
+            except InteractiveAuthIncompleteError as e:
+                # The user needs to provide more steps to complete auth, but
+                # they're not required to provide the password again.
+                #
+                # If a password is available now, hash the provided password and
+                # store it for later.
+                if new_password:
+                    password_hash = await self.auth_handler.hash(new_password)
+                    await self.auth_handler.set_session_data(
+                        e.session_id, "password_hash", password_hash
+                    )
+                raise
             user_id = requester.user.to_string()
         else:
             requester = None
-            result, params, _ = await self.auth_handler.check_auth(
-                [[LoginType.EMAIL_IDENTITY]],
-                request,
-                body,
-                self.hs.get_ip_from_request(request),
-                "modify your account password",
-            )
+            try:
+                result, params, session_id = await self.auth_handler.check_ui_auth(
+                    [[LoginType.EMAIL_IDENTITY]],
+                    request,
+                    body,
+                    self.hs.get_ip_from_request(request),
+                    "modify your account password",
+                )
+            except InteractiveAuthIncompleteError as e:
+                # The user needs to provide more steps to complete auth, but
+                # they're not required to provide the password again.
+                #
+                # If a password is available now, hash the provided password and
+                # store it for later.
+                if new_password:
+                    password_hash = await self.auth_handler.hash(new_password)
+                    await self.auth_handler.set_session_data(
+                        e.session_id, "password_hash", password_hash
+                    )
+                raise
 
             if LoginType.EMAIL_IDENTITY in result:
                 threepid = result[LoginType.EMAIL_IDENTITY]
@@ -304,12 +329,21 @@ class PasswordRestServlet(RestServlet):
                 logger.error("Auth succeeded but no known type! %r", result.keys())
                 raise SynapseError(500, "", Codes.UNKNOWN)
 
-        assert_params_in_dict(params, ["new_password_hash"])
-        new_password_hash = params["new_password_hash"]
+        # If we have a password in this request, prefer it. Otherwise, there
+        # must be a password hash from an earlier request.
+        if new_password:
+            password_hash = await self.auth_handler.hash(new_password)
+        else:
+            password_hash = await self.auth_handler.get_session_data(
+                session_id, "password_hash", None
+            )
+        if not password_hash:
+            raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM)
+
         logout_devices = params.get("logout_devices", True)
 
         await self._set_password_handler.set_password(
-            user_id, new_password_hash, logout_devices, requester
+            user_id, password_hash, logout_devices, requester
         )
 
         return 200, {}
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 370742ce59..f808175698 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -24,6 +24,7 @@ import synapse.types
 from synapse.api.constants import LoginType
 from synapse.api.errors import (
     Codes,
+    InteractiveAuthIncompleteError,
     SynapseError,
     ThreepidValidationError,
     UnrecognizedRequestError,
@@ -387,6 +388,7 @@ class RegisterRestServlet(RestServlet):
         self.ratelimiter = hs.get_registration_ratelimiter()
         self.password_policy_handler = hs.get_password_policy_handler()
         self.clock = hs.get_clock()
+        self._registration_enabled = self.hs.config.enable_registration
 
         self._registration_flows = _calculate_registration_flows(
             hs.config, self.auth_handler
@@ -412,20 +414,8 @@ class RegisterRestServlet(RestServlet):
                 "Do not understand membership kind: %s" % (kind.decode("utf8"),)
             )
 
-        # we do basic sanity checks here because the auth layer will store these
-        # in sessions. Pull out the username/password provided to us.
-        if "password" in body:
-            password = body.pop("password")
-            if not isinstance(password, str) or len(password) > 512:
-                raise SynapseError(400, "Invalid password")
-            self.password_policy_handler.validate_password(password)
-
-            # If the password is valid, hash it and store it back on the body.
-            # This ensures that only the hashed password is handled everywhere.
-            if "password_hash" in body:
-                raise SynapseError(400, "Unexpected property: password_hash")
-            body["password_hash"] = await self.auth_handler.hash(password)
-
+        # Pull out the provided username and do basic sanity checks early since
+        # the auth layer will store these in sessions.
         desired_username = None
         if "username" in body:
             if not isinstance(body["username"], str) or len(body["username"]) > 512:
@@ -434,7 +424,7 @@ class RegisterRestServlet(RestServlet):
 
         appservice = None
         if self.auth.has_access_token(request):
-            appservice = await self.auth.get_appservice_by_req(request)
+            appservice = self.auth.get_appservice_by_req(request)
 
         # fork off as soon as possible for ASes which have completely
         # different registration flows to normal users
@@ -459,22 +449,35 @@ class RegisterRestServlet(RestServlet):
                 )
             return 200, result  # we throw for non 200 responses
 
-        # for regular registration, downcase the provided username before
-        # attempting to register it. This should mean
-        # that people who try to register with upper-case in their usernames
-        # don't get a nasty surprise. (Note that we treat username
-        # case-insenstively in login, so they are free to carry on imagining
-        # that their username is CrAzYh4cKeR if that keeps them happy)
-        if desired_username is not None:
-            desired_username = desired_username.lower()
-
         # == Normal User Registration == (everyone else)
-        if not self.hs.config.enable_registration:
+        if not self._registration_enabled:
             raise SynapseError(403, "Registration has been disabled")
 
+        # For regular registration, convert the provided username to lowercase
+        # before attempting to register it. This should mean that people who try
+        # to register with upper-case in their usernames don't get a nasty surprise.
+        #
+        # Note that we treat usernames case-insensitively in login, so they are
+        # free to carry on imagining that their username is CrAzYh4cKeR if that
+        # keeps them happy.
+        if desired_username is not None:
+            desired_username = desired_username.lower()
+
+        # Check if this account is upgrading from a guest account.
         guest_access_token = body.get("guest_access_token", None)
 
-        if "initial_device_display_name" in body and "password_hash" not in body:
+        # Pull out the provided password and do basic sanity checks early.
+        #
+        # Note that we remove the password from the body since the auth layer
+        # will store the body in the session and we don't want a plaintext
+        # password store there.
+        password = body.pop("password", None)
+        if password is not None:
+            if not isinstance(password, str) or len(password) > 512:
+                raise SynapseError(400, "Invalid password")
+            self.password_policy_handler.validate_password(password)
+
+        if "initial_device_display_name" in body and password is None:
             # ignore 'initial_device_display_name' if sent without
             # a password to work around a client bug where it sent
             # the 'initial_device_display_name' param alone, wiping out
@@ -484,6 +487,7 @@ class RegisterRestServlet(RestServlet):
 
         session_id = self.auth_handler.get_session_id(body)
         registered_user_id = None
+        password_hash = None
         if session_id:
             # if we get a registered user id out of here, it means we previously
             # registered a user for this session, so we could just return the
@@ -492,7 +496,12 @@ class RegisterRestServlet(RestServlet):
             registered_user_id = await self.auth_handler.get_session_data(
                 session_id, "registered_user_id", None
             )
+            # Extract the previously-hashed password from the session.
+            password_hash = await self.auth_handler.get_session_data(
+                session_id, "password_hash", None
+            )
 
+        # Ensure that the username is valid.
         if desired_username is not None:
             await self.registration_handler.check_username(
                 desired_username,
@@ -500,20 +509,38 @@ class RegisterRestServlet(RestServlet):
                 assigned_user_id=registered_user_id,
             )
 
-        auth_result, params, session_id = await self.auth_handler.check_auth(
-            self._registration_flows,
-            request,
-            body,
-            self.hs.get_ip_from_request(request),
-            "register a new account",
-        )
+        # Check if the user-interactive authentication flows are complete, if
+        # not this will raise a user-interactive auth error.
+        try:
+            auth_result, params, session_id = await self.auth_handler.check_ui_auth(
+                self._registration_flows,
+                request,
+                body,
+                self.hs.get_ip_from_request(request),
+                "register a new account",
+            )
+        except InteractiveAuthIncompleteError as e:
+            # The user needs to provide more steps to complete auth.
+            #
+            # Hash the password and store it with the session since the client
+            # is not required to provide the password again.
+            #
+            # If a password hash was previously stored we will not attempt to
+            # re-hash and store it for efficiency. This assumes the password
+            # does not change throughout the authentication flow, but this
+            # should be fine since the data is meant to be consistent.
+            if not password_hash and password:
+                password_hash = await self.auth_handler.hash(password)
+                await self.auth_handler.set_session_data(
+                    e.session_id, "password_hash", password_hash
+                )
+            raise
 
         # Check that we're not trying to register a denied 3pid.
         #
         # the user-facing checks will probably already have happened in
         # /register/email/requestToken when we requested a 3pid, but that's not
         # guaranteed.
-
         if auth_result:
             for login_type in [LoginType.EMAIL_IDENTITY, LoginType.MSISDN]:
                 if login_type in auth_result:
@@ -535,12 +562,15 @@ class RegisterRestServlet(RestServlet):
             # don't re-register the threepids
             registered = False
         else:
-            # NB: This may be from the auth handler and NOT from the POST
-            assert_params_in_dict(params, ["password_hash"])
+            # If we have a password in this request, prefer it. Otherwise, there
+            # might be a password hash from an earlier request.
+            if password:
+                password_hash = await self.auth_handler.hash(password)
+            if not password_hash:
+                raise SynapseError(400, "Missing params: password", Codes.MISSING_PARAM)
 
             desired_username = params.get("username", None)
             guest_access_token = params.get("guest_access_token", None)
-            new_password_hash = params.get("password_hash", None)
 
             if desired_username is not None:
                 desired_username = desired_username.lower()
@@ -582,7 +612,7 @@ class RegisterRestServlet(RestServlet):
 
             registered_user_id = await self.registration_handler.register_user(
                 localpart=desired_username,
-                password_hash=new_password_hash,
+                password_hash=password_hash,
                 guest_access_token=guest_access_token,
                 threepid=threepid,
                 address=client_addr,
@@ -595,8 +625,8 @@ class RegisterRestServlet(RestServlet):
                 ):
                     await self.store.upsert_monthly_active_user(registered_user_id)
 
-            # remember that we've now registered that user account, and with
-            #  what user ID (since the user may not have specified)
+            # Remember that the user account has been registered (and the user
+            # ID it was registered with, since it might not have been specified).
             await self.auth_handler.set_session_data(
                 session_id, "registered_user_id", registered_user_id
             )
@@ -635,7 +665,7 @@ class RegisterRestServlet(RestServlet):
             (object) params: registration parameters, from which we pull
                 device_id, initial_device_name and inhibit_login
         Returns:
-            defer.Deferred: (object) dictionary for response from /register
+            (object) dictionary for response from /register
         """
         result = {"user_id": user_id, "home_server": self.hs.hostname}
         if not params.get("inhibit_login", False):
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 3f5bf75e59..a5c24fbd63 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -426,7 +426,6 @@ class SyncRestServlet(RestServlet):
             result["ephemeral"] = {"events": ephemeral_events}
             result["unread_notifications"] = room.unread_notifications
             result["summary"] = room.summary
-            result["org.matrix.msc2654.unread_count"] = room.unread_count
 
         return result
 
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index 4386eb4e72..b3e4d5612e 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -22,8 +22,6 @@ from os import path
 import jinja2
 from jinja2 import TemplateNotFound
 
-from twisted.internet import defer
-
 from synapse.api.errors import NotFoundError, StoreError, SynapseError
 from synapse.config import ConfigError
 from synapse.http.server import DirectServeHtmlResource, respond_with_html
@@ -135,7 +133,7 @@ class ConsentResource(DirectServeHtmlResource):
             else:
                 qualified_user_id = UserID(username, self.hs.hostname).to_string()
 
-            u = await defer.maybeDeferred(self.store.get_user_by_id, qualified_user_id)
+            u = await self.store.get_user_by_id(qualified_user_id)
             if u is None:
                 raise NotFoundError("Unknown user")
 
diff --git a/synapse/rest/health.py b/synapse/rest/health.py
new file mode 100644
index 0000000000..0170950bf3
--- /dev/null
+++ b/synapse/rest/health.py
@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.web.resource import Resource
+
+
+class HealthResource(Resource):
+    """A resource that does nothing except return a 200 with a body of `OK`,
+    which can be used as a health check.
+
+    Note: `SynapseRequest._should_log_request` ensures that requests to
+    `/health` do not get logged at INFO.
+    """
+
+    isLeaf = 1
+
+    def render_GET(self, request):
+        request.setHeader(b"Content-Type", b"text/plain")
+        return b"OK"
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index f4768a9e8b..cd8c246594 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -27,9 +27,7 @@ from typing import Dict, Optional
 from urllib import parse as urlparse
 
 import attr
-from canonicaljson import json
 
-from twisted.internet import defer
 from twisted.internet.error import DNSLookupError
 
 from synapse.api.errors import Codes, SynapseError
@@ -43,6 +41,7 @@ from synapse.http.servlet import parse_integer, parse_string
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.rest.media.v1._base import get_filename_from_headers
+from synapse.util import json_encoder
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.stringutils import random_string
@@ -228,7 +227,7 @@ class PreviewUrlResource(DirectServeJsonResource):
         else:
             logger.info("Returning cached response")
 
-        og = await make_deferred_yieldable(defer.maybeDeferred(observable.observe))
+        og = await make_deferred_yieldable(observable.observe())
         respond_with_json_bytes(request, 200, og, send_cors=True)
 
     async def _do_preview(self, url: str, user: str, ts: int) -> bytes:
@@ -355,7 +354,7 @@ class PreviewUrlResource(DirectServeJsonResource):
 
         logger.debug("Calculated OG for %s as %s", url, og)
 
-        jsonog = json.dumps(og)
+        jsonog = json_encoder.encode(og)
 
         # store OG in history-aware DB cache
         await self.store.store_url_cache(
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 2193d8fdc5..cf039e7f7d 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -18,13 +18,12 @@ import abc
 import logging
 from typing import List, Tuple
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
 from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
@@ -327,7 +326,7 @@ class AccountDataStore(AccountDataWorkerStore):
         Returns:
             A deferred that completes once the account_data has been added.
         """
-        content_json = json.dumps(content)
+        content_json = json_encoder.encode(content)
 
         with self._account_data_id_gen.get_next() as next_id:
             # no need to lock here as room_account_data has a unique constraint
@@ -373,7 +372,7 @@ class AccountDataStore(AccountDataWorkerStore):
         Returns:
             A deferred that completes once the account_data has been added.
         """
-        content_json = json.dumps(content)
+        content_json = json_encoder.encode(content)
 
         with self._account_data_id_gen.get_next() as next_id:
             # no need to lock here as account_data has a unique constraint on
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 683afde52b..10de446065 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -172,7 +172,6 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
         self.get_latest_event_ids_in_room.invalidate((room_id,))
 
-        self.get_unread_message_count_for_user.invalidate_many((room_id,))
         self.get_unread_event_push_actions_by_room_for_user.invalidate_many((room_id,))
 
         if not backfilled:
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index 712c8d0264..216a5925fc 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -14,8 +14,7 @@
 # limitations under the License.
 
 import logging
-
-from twisted.internet import defer
+from typing import Dict, Optional, Tuple
 
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore
@@ -82,21 +81,19 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
             "devices_last_seen", self._devices_last_seen_update
         )
 
-    @defer.inlineCallbacks
-    def _remove_user_ip_nonunique(self, progress, batch_size):
+    async def _remove_user_ip_nonunique(self, progress, batch_size):
         def f(conn):
             txn = conn.cursor()
             txn.execute("DROP INDEX IF EXISTS user_ips_user_ip")
             txn.close()
 
-        yield self.db_pool.runWithConnection(f)
-        yield self.db_pool.updates._end_background_update(
+        await self.db_pool.runWithConnection(f)
+        await self.db_pool.updates._end_background_update(
             "user_ips_drop_nonunique_index"
         )
         return 1
 
-    @defer.inlineCallbacks
-    def _analyze_user_ip(self, progress, batch_size):
+    async def _analyze_user_ip(self, progress, batch_size):
         # Background update to analyze user_ips table before we run the
         # deduplication background update. The table may not have been analyzed
         # for ages due to the table locks.
@@ -106,14 +103,13 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
         def user_ips_analyze(txn):
             txn.execute("ANALYZE user_ips")
 
-        yield self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)
+        await self.db_pool.runInteraction("user_ips_analyze", user_ips_analyze)
 
-        yield self.db_pool.updates._end_background_update("user_ips_analyze")
+        await self.db_pool.updates._end_background_update("user_ips_analyze")
 
         return 1
 
-    @defer.inlineCallbacks
-    def _remove_user_ip_dupes(self, progress, batch_size):
+    async def _remove_user_ip_dupes(self, progress, batch_size):
         # This works function works by scanning the user_ips table in batches
         # based on `last_seen`. For each row in a batch it searches the rest of
         # the table to see if there are any duplicates, if there are then they
@@ -140,7 +136,7 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
                 return None
 
         # Get a last seen that has roughly `batch_size` since `begin_last_seen`
-        end_last_seen = yield self.db_pool.runInteraction(
+        end_last_seen = await self.db_pool.runInteraction(
             "user_ips_dups_get_last_seen", get_last_seen
         )
 
@@ -275,15 +271,14 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
                 txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
             )
 
-        yield self.db_pool.runInteraction("user_ips_dups_remove", remove)
+        await self.db_pool.runInteraction("user_ips_dups_remove", remove)
 
         if last:
-            yield self.db_pool.updates._end_background_update("user_ips_remove_dupes")
+            await self.db_pool.updates._end_background_update("user_ips_remove_dupes")
 
         return batch_size
 
-    @defer.inlineCallbacks
-    def _devices_last_seen_update(self, progress, batch_size):
+    async def _devices_last_seen_update(self, progress, batch_size):
         """Background update to insert last seen info into devices table
         """
 
@@ -346,12 +341,12 @@ class ClientIpBackgroundUpdateStore(SQLBaseStore):
 
             return len(rows)
 
-        updated = yield self.db_pool.runInteraction(
+        updated = await self.db_pool.runInteraction(
             "_devices_last_seen_update", _devices_last_seen_update_txn
         )
 
         if not updated:
-            yield self.db_pool.updates._end_background_update("devices_last_seen")
+            await self.db_pool.updates._end_background_update("devices_last_seen")
 
         return updated
 
@@ -380,8 +375,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
         if self.user_ips_max_age:
             self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)
 
-    @defer.inlineCallbacks
-    def insert_client_ip(
+    async def insert_client_ip(
         self, user_id, access_token, ip, user_agent, device_id, now=None
     ):
         if not now:
@@ -392,7 +386,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
             last_seen = self.client_ip_last_seen.get(key)
         except KeyError:
             last_seen = None
-        yield self.populate_monthly_active_users(user_id)
+        await self.populate_monthly_active_users(user_id)
         # Rate-limited inserts
         if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
             return
@@ -461,25 +455,25 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
                 # Failed to upsert, log and continue
                 logger.error("Failed to insert client IP %r: %r", entry, e)
 
-    @defer.inlineCallbacks
-    def get_last_client_ip_by_device(self, user_id, device_id):
+    async def get_last_client_ip_by_device(
+        self, user_id: str, device_id: Optional[str]
+    ) -> Dict[Tuple[str, str], dict]:
         """For each device_id listed, give the user_ip it was last seen on
 
         Args:
-            user_id (str)
-            device_id (str): If None fetches all devices for the user
+            user_id: The user to fetch devices for.
+            device_id: If None fetches all devices for the user
 
         Returns:
-            defer.Deferred: resolves to a dict, where the keys
-            are (user_id, device_id) tuples. The values are also dicts, with
-            keys giving the column names
+            A dictionary mapping a tuple of (user_id, device_id) to dicts, with
+            keys giving the column names from the devices table.
         """
 
         keyvalues = {"user_id": user_id}
         if device_id is not None:
             keyvalues["device_id"] = device_id
 
-        res = yield self.db_pool.simple_select_list(
+        res = await self.db_pool.simple_select_list(
             table="devices",
             keyvalues=keyvalues,
             retcols=("user_id", "ip", "user_agent", "device_id", "last_seen"),
@@ -501,8 +495,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
                     }
         return ret
 
-    @defer.inlineCallbacks
-    def get_user_ip_and_agents(self, user):
+    async def get_user_ip_and_agents(self, user):
         user_id = user.to_string()
         results = {}
 
@@ -512,7 +505,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
                 user_agent, _, last_seen = self._batch_row_update[key]
                 results[(access_token, ip)] = (user_agent, last_seen)
 
-        rows = yield self.db_pool.simple_select_list(
+        rows = await self.db_pool.simple_select_list(
             table="user_ips",
             keyvalues={"user_id": user_id},
             retcols=["access_token", "ip", "user_agent", "last_seen"],
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 874ecdf8d2..76ec954f44 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -16,13 +16,12 @@
 import logging
 from typing import List, Tuple
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
+from synapse.util import json_encoder
 from synapse.util.caches.expiringcache import ExpiringCache
 
 logger = logging.getLogger(__name__)
@@ -354,7 +353,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
             )
             rows = []
             for destination, edu in remote_messages_by_destination.items():
-                edu_json = json.dumps(edu)
+                edu_json = json_encoder.encode(edu)
                 rows.append((destination, stream_id, now_ms, edu_json))
             txn.executemany(sql, rows)
 
@@ -432,7 +431,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
                 # Handle wildcard device_ids.
                 sql = "SELECT device_id FROM devices WHERE user_id = ?"
                 txn.execute(sql, (user_id,))
-                message_json = json.dumps(messages_by_device["*"])
+                message_json = json_encoder.encode(messages_by_device["*"])
                 for row in txn:
                     # Add the message for all devices for this user on this
                     # server.
@@ -454,7 +453,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
                     # Only insert into the local inbox if the device exists on
                     # this server
                     device = row[0]
-                    message_json = json.dumps(messages_by_device[device])
+                    message_json = json_encoder.encode(messages_by_device[device])
                     messages_json_for_user[device] = message_json
 
             if messages_json_for_user:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 88a7aadfc6..7a5f0bab05 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -17,8 +17,6 @@
 import logging
 from typing import List, Optional, Set, Tuple
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.api.errors import Codes, StoreError
@@ -36,6 +34,7 @@ from synapse.storage.database import (
     make_tuple_comparison_clause,
 )
 from synapse.types import Collection, get_verify_key_from_cross_signing_key
+from synapse.util import json_encoder
 from synapse.util.caches.descriptors import (
     Cache,
     cached,
@@ -137,7 +136,9 @@ class DeviceWorkerStore(SQLBaseStore):
         master_key_by_user = {}
         self_signing_key_by_user = {}
         for user in users:
-            cross_signing_key = yield self.get_e2e_cross_signing_key(user, "master")
+            cross_signing_key = yield defer.ensureDeferred(
+                self.get_e2e_cross_signing_key(user, "master")
+            )
             if cross_signing_key:
                 key_id, verify_key = get_verify_key_from_cross_signing_key(
                     cross_signing_key
@@ -150,8 +151,8 @@ class DeviceWorkerStore(SQLBaseStore):
                     "device_id": verify_key.version,
                 }
 
-            cross_signing_key = yield self.get_e2e_cross_signing_key(
-                user, "self_signing"
+            cross_signing_key = yield defer.ensureDeferred(
+                self.get_e2e_cross_signing_key(user, "self_signing")
             )
             if cross_signing_key:
                 key_id, verify_key = get_verify_key_from_cross_signing_key(
@@ -247,7 +248,7 @@ class DeviceWorkerStore(SQLBaseStore):
             destination (str): The host the device updates are intended for
             from_stream_id (int): The minimum stream_id to filter updates by, exclusive
             query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
-                user_id/device_id to update stream_id and the relevent json-encoded
+                user_id/device_id to update stream_id and the relevant json-encoded
                 opentracing context
 
         Returns:
@@ -397,7 +398,7 @@ class DeviceWorkerStore(SQLBaseStore):
             values={
                 "stream_id": stream_id,
                 "from_user_id": from_user_id,
-                "user_ids": json.dumps(user_ids),
+                "user_ids": json_encoder.encode(user_ids),
             },
         )
 
@@ -600,7 +601,7 @@ class DeviceWorkerStore(SQLBaseStore):
             between the requested tokens due to the limit.
 
             The token returned can be used in a subsequent call to this
-            function to get further updatees.
+            function to get further updates.
 
             The updates are a list of 2-tuples of stream ID and the row data
         """
@@ -1032,7 +1033,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 txn,
                 table="device_lists_remote_cache",
                 keyvalues={"user_id": user_id, "device_id": device_id},
-                values={"content": json.dumps(content)},
+                values={"content": json_encoder.encode(content)},
                 # we don't need to lock, because we assume we are the only thread
                 # updating this user's devices.
                 lock=False,
@@ -1088,7 +1089,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                 {
                     "user_id": user_id,
                     "device_id": content["device_id"],
-                    "content": json.dumps(content),
+                    "content": json_encoder.encode(content),
                 }
                 for content in devices
             ],
@@ -1209,7 +1210,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
                     "device_id": device_id,
                     "sent": False,
                     "ts": now,
-                    "opentracing_context": json.dumps(context)
+                    "opentracing_context": json_encoder.encode(context)
                     if whitelisted_homeserver(destination)
                     else "{}",
                 }
diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py
index 7819bfcbb3..037e02603c 100644
--- a/synapse/storage/databases/main/directory.py
+++ b/synapse/storage/databases/main/directory.py
@@ -14,30 +14,29 @@
 # limitations under the License.
 
 from collections import namedtuple
-from typing import Optional
-
-from twisted.internet import defer
+from typing import Iterable, Optional
 
 from synapse.api.errors import SynapseError
 from synapse.storage._base import SQLBaseStore
+from synapse.types import RoomAlias
 from synapse.util.caches.descriptors import cached
 
 RoomAliasMapping = namedtuple("RoomAliasMapping", ("room_id", "room_alias", "servers"))
 
 
 class DirectoryWorkerStore(SQLBaseStore):
-    @defer.inlineCallbacks
-    def get_association_from_room_alias(self, room_alias):
-        """ Get's the room_id and server list for a given room_alias
+    async def get_association_from_room_alias(
+        self, room_alias: RoomAlias
+    ) -> Optional[RoomAliasMapping]:
+        """Gets the room_id and server list for a given room_alias
 
         Args:
-            room_alias (RoomAlias)
+            room_alias: The alias to translate to an ID.
 
         Returns:
-            Deferred: results in namedtuple with keys "room_id" and
-            "servers" or None if no association can be found
+            The room alias mapping or None if no association can be found.
         """
-        room_id = yield self.db_pool.simple_select_one_onecol(
+        room_id = await self.db_pool.simple_select_one_onecol(
             "room_aliases",
             {"room_alias": room_alias.to_string()},
             "room_id",
@@ -48,7 +47,7 @@ class DirectoryWorkerStore(SQLBaseStore):
         if not room_id:
             return None
 
-        servers = yield self.db_pool.simple_select_onecol(
+        servers = await self.db_pool.simple_select_onecol(
             "room_alias_servers",
             {"room_alias": room_alias.to_string()},
             "server",
@@ -79,18 +78,20 @@ class DirectoryWorkerStore(SQLBaseStore):
 
 
 class DirectoryStore(DirectoryWorkerStore):
-    @defer.inlineCallbacks
-    def create_room_alias_association(self, room_alias, room_id, servers, creator=None):
+    async def create_room_alias_association(
+        self,
+        room_alias: RoomAlias,
+        room_id: str,
+        servers: Iterable[str],
+        creator: Optional[str] = None,
+    ) -> None:
         """ Creates an association between a room alias and room_id/servers
 
         Args:
-            room_alias (RoomAlias)
-            room_id (str)
-            servers (list)
-            creator (str): Optional user_id of creator.
-
-        Returns:
-            Deferred
+            room_alias: The alias to create.
+            room_id: The target of the alias.
+            servers: A list of servers through which it may be possible to join the room
+            creator: Optional user_id of creator.
         """
 
         def alias_txn(txn):
@@ -118,24 +119,22 @@ class DirectoryStore(DirectoryWorkerStore):
             )
 
         try:
-            ret = yield self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                 "create_room_alias_association", alias_txn
             )
         except self.database_engine.module.IntegrityError:
             raise SynapseError(
                 409, "Room alias %s already exists" % room_alias.to_string()
             )
-        return ret
 
-    @defer.inlineCallbacks
-    def delete_room_alias(self, room_alias):
-        room_id = yield self.db_pool.runInteraction(
+    async def delete_room_alias(self, room_alias: RoomAlias) -> str:
+        room_id = await self.db_pool.runInteraction(
             "delete_room_alias", self._delete_room_alias_txn, room_alias
         )
 
         return room_id
 
-    def _delete_room_alias_txn(self, txn, room_alias):
+    def _delete_room_alias_txn(self, txn, room_alias: RoomAlias) -> str:
         txn.execute(
             "SELECT room_id FROM room_aliases WHERE room_alias = ?",
             (room_alias.to_string(),),
diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py
index 90152edc3c..2eeb9f97dc 100644
--- a/synapse/storage/databases/main/e2e_room_keys.py
+++ b/synapse/storage/databases/main/e2e_room_keys.py
@@ -14,18 +14,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from canonicaljson import json
-
-from twisted.internet import defer
-
 from synapse.api.errors import StoreError
 from synapse.logging.opentracing import log_kv, trace
 from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.util import json_encoder
 
 
 class EndToEndRoomKeyStore(SQLBaseStore):
-    @defer.inlineCallbacks
-    def update_e2e_room_key(self, user_id, version, room_id, session_id, room_key):
+    async def update_e2e_room_key(
+        self, user_id, version, room_id, session_id, room_key
+    ):
         """Replaces the encrypted E2E room key for a given session in a given backup
 
         Args:
@@ -38,7 +36,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             StoreError
         """
 
-        yield self.db_pool.simple_update_one(
+        await self.db_pool.simple_update_one(
             table="e2e_room_keys",
             keyvalues={
                 "user_id": user_id,
@@ -50,13 +48,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 "first_message_index": room_key["first_message_index"],
                 "forwarded_count": room_key["forwarded_count"],
                 "is_verified": room_key["is_verified"],
-                "session_data": json.dumps(room_key["session_data"]),
+                "session_data": json_encoder.encode(room_key["session_data"]),
             },
             desc="update_e2e_room_key",
         )
 
-    @defer.inlineCallbacks
-    def add_e2e_room_keys(self, user_id, version, room_keys):
+    async def add_e2e_room_keys(self, user_id, version, room_keys):
         """Bulk add room keys to a given backup.
 
         Args:
@@ -77,7 +74,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                     "first_message_index": room_key["first_message_index"],
                     "forwarded_count": room_key["forwarded_count"],
                     "is_verified": room_key["is_verified"],
-                    "session_data": json.dumps(room_key["session_data"]),
+                    "session_data": json_encoder.encode(room_key["session_data"]),
                 }
             )
             log_kv(
@@ -89,13 +86,12 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 }
             )
 
-        yield self.db_pool.simple_insert_many(
+        await self.db_pool.simple_insert_many(
             table="e2e_room_keys", values=values, desc="add_e2e_room_keys"
         )
 
     @trace
-    @defer.inlineCallbacks
-    def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
+    async def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
         """Bulk get the E2E room keys for a given backup, optionally filtered to a given
         room, or a given session.
 
@@ -110,7 +106,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 the backup (or for the specified room)
 
         Returns:
-            A deferred list of dicts giving the session_data and message metadata for
+            A list of dicts giving the session_data and message metadata for
             these room keys.
         """
 
@@ -125,7 +121,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             if session_id:
                 keyvalues["session_id"] = session_id
 
-        rows = yield self.db_pool.simple_select_list(
+        rows = await self.db_pool.simple_select_list(
             table="e2e_room_keys",
             keyvalues=keyvalues,
             retcols=(
@@ -243,8 +239,9 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         )
 
     @trace
-    @defer.inlineCallbacks
-    def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
+    async def delete_e2e_room_keys(
+        self, user_id, version, room_id=None, session_id=None
+    ):
         """Bulk delete the E2E room keys for a given backup, optionally filtered to a given
         room or a given session.
 
@@ -259,7 +256,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                 the backup (or for the specified room)
 
         Returns:
-            A deferred of the deletion transaction
+            The deletion transaction
         """
 
         keyvalues = {"user_id": user_id, "version": int(version)}
@@ -268,7 +265,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
             if session_id:
                 keyvalues["session_id"] = session_id
 
-        yield self.db_pool.simple_delete(
+        await self.db_pool.simple_delete(
             table="e2e_room_keys", keyvalues=keyvalues, desc="delete_e2e_room_keys"
         )
 
@@ -360,7 +357,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
                     "user_id": user_id,
                     "version": new_version,
                     "algorithm": info["algorithm"],
-                    "auth_data": json.dumps(info["auth_data"]),
+                    "auth_data": json_encoder.encode(info["auth_data"]),
                 },
             )
 
@@ -387,7 +384,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
         updatevalues = {}
 
         if info is not None and "auth_data" in info:
-            updatevalues["auth_data"] = json.dumps(info["auth_data"])
+            updatevalues["auth_data"] = json_encoder.encode(info["auth_data"])
         if version_etag is not None:
             updatevalues["etag"] = version_etag
 
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 40354b8304..f93e0d320d 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -14,24 +14,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Dict, List, Tuple
+from typing import Dict, Iterable, List, Optional, Tuple
 
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
 
 from twisted.enterprise.adbapi import Connection
-from twisted.internet import defer
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import make_in_list_sql_clause
+from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.iterutils import batch_iter
 
 
 class EndToEndKeyWorkerStore(SQLBaseStore):
     @trace
-    @defer.inlineCallbacks
-    def get_e2e_device_keys(
+    async def get_e2e_device_keys(
         self, query_list, include_all_devices=False, include_deleted_devices=False
     ):
         """Fetch a list of device keys.
@@ -51,7 +50,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         if not query_list:
             return {}
 
-        results = yield self.db_pool.runInteraction(
+        results = await self.db_pool.runInteraction(
             "get_e2e_device_keys",
             self._get_e2e_device_keys_txn,
             query_list,
@@ -174,8 +173,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         log_kv(result)
         return result
 
-    @defer.inlineCallbacks
-    def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
+    async def get_e2e_one_time_keys(
+        self, user_id: str, device_id: str, key_ids: List[str]
+    ) -> Dict[Tuple[str, str], str]:
         """Retrieve a number of one-time keys for a user
 
         Args:
@@ -185,11 +185,10 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 retrieve
 
         Returns:
-            deferred resolving to Dict[(str, str), str]: map from (algorithm,
-            key_id) to json string for key
+            A map from (algorithm, key_id) to json string for key
         """
 
-        rows = yield self.db_pool.simple_select_many_batch(
+        rows = await self.db_pool.simple_select_many_batch(
             table="e2e_one_time_keys_json",
             column="key_id",
             iterable=key_ids,
@@ -201,17 +200,21 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
         return result
 
-    @defer.inlineCallbacks
-    def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
+    async def add_e2e_one_time_keys(
+        self,
+        user_id: str,
+        device_id: str,
+        time_now: int,
+        new_keys: Iterable[Tuple[str, str, str]],
+    ) -> None:
         """Insert some new one time keys for a device. Errors if any of the
         keys already exist.
 
         Args:
-            user_id(str): id of user to get keys for
-            device_id(str): id of device to get keys for
-            time_now(long): insertion time to record (ms since epoch)
-            new_keys(iterable[(str, str, str)]: keys to add - each a tuple of
-                (algorithm, key_id, key json)
+            user_id: id of user to get keys for
+            device_id: id of device to get keys for
+            time_now: insertion time to record (ms since epoch)
+            new_keys: keys to add - each a tuple of (algorithm, key_id, key json)
         """
 
         def _add_e2e_one_time_keys(txn):
@@ -241,7 +244,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
                 txn, self.count_e2e_one_time_keys, (user_id, device_id)
             )
 
-        yield self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
         )
 
@@ -268,22 +271,23 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
             "count_e2e_one_time_keys", _count_e2e_one_time_keys
         )
 
-    @defer.inlineCallbacks
-    def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None):
+    async def get_e2e_cross_signing_key(
+        self, user_id: str, key_type: str, from_user_id: Optional[str] = None
+    ) -> Optional[dict]:
         """Returns a user's cross-signing key.
 
         Args:
-            user_id (str): the user whose key is being requested
-            key_type (str): the type of key that is being requested: either 'master'
+            user_id: the user whose key is being requested
+            key_type: the type of key that is being requested: either 'master'
                 for a master key, 'self_signing' for a self-signing key, or
                 'user_signing' for a user-signing key
-            from_user_id (str): if specified, signatures made by this user on
+            from_user_id: if specified, signatures made by this user on
                 the self-signing key will be included in the result
 
         Returns:
             dict of the key data or None if not found
         """
-        res = yield self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id)
+        res = await self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id)
         user_keys = res.get(user_id)
         if not user_keys:
             return None
@@ -449,28 +453,26 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
 
         return keys
 
-    @defer.inlineCallbacks
-    def get_e2e_cross_signing_keys_bulk(
-        self, user_ids: List[str], from_user_id: str = None
-    ) -> defer.Deferred:
+    async def get_e2e_cross_signing_keys_bulk(
+        self, user_ids: List[str], from_user_id: Optional[str] = None
+    ) -> Dict[str, Dict[str, dict]]:
         """Returns the cross-signing keys for a set of users.
 
         Args:
-            user_ids (list[str]): the users whose keys are being requested
-            from_user_id (str): if specified, signatures made by this user on
+            user_ids: the users whose keys are being requested
+            from_user_id: if specified, signatures made by this user on
                 the self-signing keys will be included in the result
 
         Returns:
-            Deferred[dict[str, dict[str, dict]]]: map of user ID to key type to
-                key data.  If a user's cross-signing keys were not found, either
-                their user ID will not be in the dict, or their user ID will map
-                to None.
+            A map of user ID to key type to key data.  If a user's cross-signing
+            keys were not found, either their user ID will not be in the dict,
+            or their user ID will map to None.
         """
 
-        result = yield self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
+        result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
 
         if from_user_id:
-            result = yield self.db_pool.runInteraction(
+            result = await self.db_pool.runInteraction(
                 "get_e2e_cross_signing_signatures",
                 self._get_e2e_cross_signing_signatures_txn,
                 result,
@@ -700,7 +702,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
                 values={
                     "user_id": user_id,
                     "keytype": key_type,
-                    "keydata": json.dumps(key),
+                    "keydata": json_encoder.encode(key),
                     "stream_id": stream_id,
                 },
             )
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index b8cefb4d5e..7c246d3e4c 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -17,11 +17,10 @@
 import logging
 from typing import List
 
-from canonicaljson import json
-
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import LoggingTransaction, SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
+from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 
 logger = logging.getLogger(__name__)
@@ -50,7 +49,7 @@ def _serialize_action(actions, is_highlight):
     else:
         if actions == DEFAULT_NOTIF_ACTION:
             return ""
-    return json.dumps(actions)
+    return json_encoder.encode(actions)
 
 
 def _deserialize_action(actions, is_highlight):
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 4d8a24ce4b..1a68bf32cb 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -53,47 +53,6 @@ event_counter = Counter(
     ["type", "origin_type", "origin_entity"],
 )
 
-STATE_EVENT_TYPES_TO_MARK_UNREAD = {
-    EventTypes.Topic,
-    EventTypes.Name,
-    EventTypes.RoomAvatar,
-    EventTypes.Tombstone,
-}
-
-
-def should_count_as_unread(event: EventBase, context: EventContext) -> bool:
-    # Exclude rejected and soft-failed events.
-    if context.rejected or event.internal_metadata.is_soft_failed():
-        return False
-
-    # Exclude notices.
-    if (
-        not event.is_state()
-        and event.type == EventTypes.Message
-        and event.content.get("msgtype") == "m.notice"
-    ):
-        return False
-
-    # Exclude edits.
-    relates_to = event.content.get("m.relates_to", {})
-    if relates_to.get("rel_type") == RelationTypes.REPLACE:
-        return False
-
-    # Mark events that have a non-empty string body as unread.
-    body = event.content.get("body")
-    if isinstance(body, str) and body:
-        return True
-
-    # Mark some state events as unread.
-    if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD:
-        return True
-
-    # Mark encrypted events as unread.
-    if not event.is_state() and event.type == EventTypes.Encrypted:
-        return True
-
-    return False
-
 
 def encode_json(json_object):
     """
@@ -239,10 +198,6 @@ class PersistEventsStore:
 
                 event_counter.labels(event.type, origin_type, origin_entity).inc()
 
-                self.store.get_unread_message_count_for_user.invalidate_many(
-                    (event.room_id,),
-                )
-
             for room_id, new_state in current_state_for_room.items():
                 self.store.get_current_state_ids.prefill((room_id,), new_state)
 
@@ -864,9 +819,8 @@ class PersistEventsStore:
                     "contains_url": (
                         "url" in event.content and isinstance(event.content["url"], str)
                     ),
-                    "count_as_unread": should_count_as_unread(event, context),
                 }
-                for event, context in events_and_contexts
+                for event, _ in events_and_contexts
             ],
         )
 
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index a7b7393f6e..755b7a2a85 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -41,15 +41,9 @@ from synapse.replication.tcp.streams import BackfillStream
 from synapse.replication.tcp.streams.events import EventsStream
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
-from synapse.storage.types import Cursor
 from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import get_domain_from_id
-from synapse.util.caches.descriptors import (
-    Cache,
-    _CacheContext,
-    cached,
-    cachedInlineCallbacks,
-)
+from synapse.util.caches.descriptors import Cache, cached, cachedInlineCallbacks
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
@@ -1364,84 +1358,6 @@ class EventsWorkerStore(SQLBaseStore):
             desc="get_next_event_to_expire", func=get_next_event_to_expire_txn
         )
 
-    @cached(tree=True, cache_context=True)
-    async def get_unread_message_count_for_user(
-        self, room_id: str, user_id: str, cache_context: _CacheContext,
-    ) -> int:
-        """Retrieve the count of unread messages for the given room and user.
-
-        Args:
-            room_id: The ID of the room to count unread messages in.
-            user_id: The ID of the user to count unread messages for.
-
-        Returns:
-            The number of unread messages for the given user in the given room.
-        """
-        with Measure(self._clock, "get_unread_message_count_for_user"):
-            last_read_event_id = await self.get_last_receipt_event_id_for_user(
-                user_id=user_id,
-                room_id=room_id,
-                receipt_type="m.read",
-                on_invalidate=cache_context.invalidate,
-            )
-
-            return await self.db_pool.runInteraction(
-                "get_unread_message_count_for_user",
-                self._get_unread_message_count_for_user_txn,
-                user_id,
-                room_id,
-                last_read_event_id,
-            )
-
-    def _get_unread_message_count_for_user_txn(
-        self,
-        txn: Cursor,
-        user_id: str,
-        room_id: str,
-        last_read_event_id: Optional[str],
-    ) -> int:
-        if last_read_event_id:
-            # Get the stream ordering for the last read event.
-            stream_ordering = self.db_pool.simple_select_one_onecol_txn(
-                txn=txn,
-                table="events",
-                keyvalues={"room_id": room_id, "event_id": last_read_event_id},
-                retcol="stream_ordering",
-            )
-        else:
-            # If there's no read receipt for that room, it probably means the user hasn't
-            # opened it yet, in which case use the stream ID of their join event.
-            # We can't just set it to 0 otherwise messages from other local users from
-            # before this user joined will be counted as well.
-            txn.execute(
-                """
-                SELECT stream_ordering FROM local_current_membership
-                LEFT JOIN events USING (event_id, room_id)
-                WHERE membership = 'join'
-                    AND user_id = ?
-                    AND room_id = ?
-                """,
-                (user_id, room_id),
-            )
-            row = txn.fetchone()
-
-            if row is None:
-                return 0
-
-            stream_ordering = row[0]
-
-        # Count the messages that qualify as unread after the stream ordering we've just
-        # retrieved.
-        sql = """
-            SELECT COUNT(*) FROM events
-            WHERE sender != ? AND room_id = ? AND stream_ordering > ? AND count_as_unread
-        """
-
-        txn.execute(sql, (user_id, room_id, stream_ordering))
-        row = txn.fetchone()
-
-        return row[0] if row else 0
-
 
 AllNewEventsResult = namedtuple(
     "AllNewEventsResult",
diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
index a98181f445..75ea6d4b2f 100644
--- a/synapse/storage/databases/main/group_server.py
+++ b/synapse/storage/databases/main/group_server.py
@@ -16,12 +16,11 @@
 
 from typing import List, Tuple
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
 from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.util import json_encoder
 
 # The category ID for the "default" category. We don't store as null in the
 # database to avoid the fun of null != null
@@ -752,7 +751,7 @@ class GroupServerStore(GroupServerWorkerStore):
         if profile is None:
             insertion_values["profile"] = "{}"
         else:
-            update_values["profile"] = json.dumps(profile)
+            update_values["profile"] = json_encoder.encode(profile)
 
         if is_public is None:
             insertion_values["is_public"] = True
@@ -783,7 +782,7 @@ class GroupServerStore(GroupServerWorkerStore):
         if profile is None:
             insertion_values["profile"] = "{}"
         else:
-            update_values["profile"] = json.dumps(profile)
+            update_values["profile"] = json_encoder.encode(profile)
 
         if is_public is None:
             insertion_values["is_public"] = True
@@ -1007,7 +1006,7 @@ class GroupServerStore(GroupServerWorkerStore):
                         "group_id": group_id,
                         "user_id": user_id,
                         "valid_until_ms": remote_attestation["valid_until_ms"],
-                        "attestation_json": json.dumps(remote_attestation),
+                        "attestation_json": json_encoder.encode(remote_attestation),
                     },
                 )
 
@@ -1131,7 +1130,7 @@ class GroupServerStore(GroupServerWorkerStore):
                     "is_admin": is_admin,
                     "membership": membership,
                     "is_publicised": is_publicised,
-                    "content": json.dumps(content),
+                    "content": json_encoder.encode(content),
                 },
             )
 
@@ -1143,7 +1142,7 @@ class GroupServerStore(GroupServerWorkerStore):
                     "group_id": group_id,
                     "user_id": user_id,
                     "type": "membership",
-                    "content": json.dumps(
+                    "content": json_encoder.encode(
                         {"membership": membership, "content": content}
                     ),
                 },
@@ -1171,7 +1170,7 @@ class GroupServerStore(GroupServerWorkerStore):
                             "group_id": group_id,
                             "user_id": user_id,
                             "valid_until_ms": remote_attestation["valid_until_ms"],
-                            "attestation_json": json.dumps(remote_attestation),
+                            "attestation_json": json_encoder.encode(remote_attestation),
                         },
                     )
             else:
@@ -1240,7 +1239,7 @@ class GroupServerStore(GroupServerWorkerStore):
             keyvalues={"group_id": group_id, "user_id": user_id},
             updatevalues={
                 "valid_until_ms": attestation["valid_until_ms"],
-                "attestation_json": json.dumps(attestation),
+                "attestation_json": json_encoder.encode(attestation),
             },
             desc="update_remote_attestion",
         )
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 02b01d9619..e71cdd2cb4 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -15,8 +15,6 @@
 import logging
 from typing import List
 
-from twisted.internet import defer
-
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool, make_in_list_sql_clause
 from synapse.util.caches.descriptors import cached
@@ -252,16 +250,12 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
             "reap_monthly_active_users", _reap_users, reserved_users
         )
 
-    @defer.inlineCallbacks
-    def upsert_monthly_active_user(self, user_id):
+    async def upsert_monthly_active_user(self, user_id: str) -> None:
         """Updates or inserts the user into the monthly active user table, which
         is used to track the current MAU usage of the server
 
         Args:
-            user_id (str): user to add/update
-
-        Returns:
-            Deferred
+            user_id: user to add/update
         """
         # Support user never to be included in MAU stats. Note I can't easily call this
         # from upsert_monthly_active_user_txn because then I need a _txn form of
@@ -271,11 +265,11 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
         # _initialise_reserved_users reasoning that it would be very strange to
         #  include a support user in this context.
 
-        is_support = yield self.is_support_user(user_id)
+        is_support = await self.is_support_user(user_id)
         if is_support:
             return
 
-        yield self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id
         )
 
@@ -322,8 +316,7 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
 
         return is_insert
 
-    @defer.inlineCallbacks
-    def populate_monthly_active_users(self, user_id):
+    async def populate_monthly_active_users(self, user_id):
         """Checks on the state of monthly active user limits and optionally
         add the user to the monthly active tables
 
@@ -332,14 +325,14 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
         """
         if self._limit_usage_by_mau or self._mau_stats_only:
             # Trial users and guests should not be included as part of MAU group
-            is_guest = yield self.is_guest(user_id)
+            is_guest = await self.is_guest(user_id)
             if is_guest:
                 return
-            is_trial = yield self.is_trial_user(user_id)
+            is_trial = await self.is_trial_user(user_id)
             if is_trial:
                 return
 
-            last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
+            last_seen_timestamp = await self.user_last_seen_monthly_active(user_id)
             now = self.hs.get_clock().time_msec()
 
             # We want to reduce to the total number of db writes, and are happy
@@ -352,10 +345,10 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
                 # False, there is no point in checking get_monthly_active_count - it
                 # adds no value and will break the logic if max_mau_value is exceeded.
                 if not self._limit_usage_by_mau:
-                    yield self.upsert_monthly_active_user(user_id)
+                    await self.upsert_monthly_active_user(user_id)
                 else:
-                    count = yield self.get_monthly_active_count()
+                    count = await self.get_monthly_active_count()
                     if count < self._max_mau_value:
-                        yield self.upsert_monthly_active_user(user_id)
+                        await self.upsert_monthly_active_user(user_id)
             elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
-                yield self.upsert_monthly_active_user(user_id)
+                await self.upsert_monthly_active_user(user_id)
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 5fd899326a..19a0211a03 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -18,8 +18,6 @@ import abc
 import logging
 from typing import List, Tuple, Union
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.push.baserules import list_with_base_rules
@@ -33,6 +31,7 @@ from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
 from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
 from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException
 from synapse.storage.util.id_generators import ChainedIdGenerator
+from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
@@ -419,8 +418,8 @@ class PushRuleStore(PushRulesWorkerStore):
         before=None,
         after=None,
     ):
-        conditions_json = json.dumps(conditions)
-        actions_json = json.dumps(actions)
+        conditions_json = json_encoder.encode(conditions)
+        actions_json = json_encoder.encode(actions)
         with self._push_rules_stream_id_gen.get_next() as ids:
             stream_id, event_stream_ordering = ids
             if before or after:
@@ -689,7 +688,7 @@ class PushRuleStore(PushRulesWorkerStore):
 
     @defer.inlineCallbacks
     def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule):
-        actions_json = json.dumps(actions)
+        actions_json = json_encoder.encode(actions)
 
         def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering):
             if is_default_rule:
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 6255977c92..1920a8a152 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -18,13 +18,12 @@ import abc
 import logging
 from typing import List, Tuple
 
-from canonicaljson import json
-
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import DatabasePool
 from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.util import json_encoder
 from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -459,7 +458,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
             values={
                 "stream_id": stream_id,
                 "event_id": event_id,
-                "data": json.dumps(data),
+                "data": json_encoder.encode(data),
             },
             # receipts_linearized has a unique constraint on
             # (user_id, room_id, receipt_type), so no need to lock
@@ -585,7 +584,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
                 "room_id": room_id,
                 "receipt_type": receipt_type,
                 "user_id": user_id,
-                "event_ids": json.dumps(event_ids),
-                "data": json.dumps(data),
+                "event_ids": json_encoder.encode(event_ids),
+                "data": json_encoder.encode(data),
             },
         )
diff --git a/synapse/storage/databases/main/schema/delta/58/12unread_messages.sql b/synapse/storage/databases/main/schema/delta/58/12unread_messages.sql
deleted file mode 100644
index 531b532c73..0000000000
--- a/synapse/storage/databases/main/schema/delta/58/12unread_messages.sql
+++ /dev/null
@@ -1,18 +0,0 @@
-/* Copyright 2020 The Matrix.org Foundation C.I.C
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- Store a boolean value in the events table for whether the event should be counted in
--- the unread_count property of sync responses.
-ALTER TABLE events ADD COLUMN count_as_unread BOOLEAN;
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index 2162d0712d..7f8d1880e5 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -16,8 +16,7 @@
 import logging
 import re
 from collections import namedtuple
-
-from twisted.internet import defer
+from typing import List, Optional
 
 from synapse.api.errors import SynapseError
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -114,8 +113,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
             self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
         )
 
-    @defer.inlineCallbacks
-    def _background_reindex_search(self, progress, batch_size):
+    async def _background_reindex_search(self, progress, batch_size):
         # we work through the events table from highest stream id to lowest
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -206,19 +204,18 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
             return len(event_search_rows)
 
-        result = yield self.db_pool.runInteraction(
+        result = await self.db_pool.runInteraction(
             self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
         )
 
         if not result:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 self.EVENT_SEARCH_UPDATE_NAME
             )
 
         return result
 
-    @defer.inlineCallbacks
-    def _background_reindex_gin_search(self, progress, batch_size):
+    async def _background_reindex_gin_search(self, progress, batch_size):
         """This handles old synapses which used GIST indexes, if any;
         converting them back to be GIN as per the actual schema.
         """
@@ -255,15 +252,14 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 conn.set_session(autocommit=False)
 
         if isinstance(self.database_engine, PostgresEngine):
-            yield self.db_pool.runWithConnection(create_index)
+            await self.db_pool.runWithConnection(create_index)
 
-        yield self.db_pool.updates._end_background_update(
+        await self.db_pool.updates._end_background_update(
             self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME
         )
         return 1
 
-    @defer.inlineCallbacks
-    def _background_reindex_search_order(self, progress, batch_size):
+    async def _background_reindex_search_order(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
         rows_inserted = progress.get("rows_inserted", 0)
@@ -288,12 +284,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                 )
                 conn.set_session(autocommit=False)
 
-            yield self.db_pool.runWithConnection(create_index)
+            await self.db_pool.runWithConnection(create_index)
 
             pg = dict(progress)
             pg["have_added_indexes"] = True
 
-            yield self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                 self.EVENT_SEARCH_ORDER_UPDATE_NAME,
                 self.db_pool.updates._background_update_progress_txn,
                 self.EVENT_SEARCH_ORDER_UPDATE_NAME,
@@ -331,12 +327,12 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
 
             return len(rows), True
 
-        num_rows, finished = yield self.db_pool.runInteraction(
+        num_rows, finished = await self.db_pool.runInteraction(
             self.EVENT_SEARCH_ORDER_UPDATE_NAME, reindex_search_txn
         )
 
         if not finished:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 self.EVENT_SEARCH_ORDER_UPDATE_NAME
             )
 
@@ -347,8 +343,7 @@ class SearchStore(SearchBackgroundUpdateStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super(SearchStore, self).__init__(database, db_conn, hs)
 
-    @defer.inlineCallbacks
-    def search_msgs(self, room_ids, search_term, keys):
+    async def search_msgs(self, room_ids, search_term, keys):
         """Performs a full text search over events with given keys.
 
         Args:
@@ -425,7 +420,7 @@ class SearchStore(SearchBackgroundUpdateStore):
         # entire table from the database.
         sql += " ORDER BY rank DESC LIMIT 500"
 
-        results = yield self.db_pool.execute(
+        results = await self.db_pool.execute(
             "search_msgs", self.db_pool.cursor_to_dict, sql, *args
         )
 
@@ -433,7 +428,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         # We set redact_behaviour to BLOCK here to prevent redacted events being returned in
         # search results (which is a data leak)
-        events = yield self.get_events_as_list(
+        events = await self.get_events_as_list(
             [r["event_id"] for r in results],
             redact_behaviour=EventRedactBehaviour.BLOCK,
         )
@@ -442,11 +437,11 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         highlights = None
         if isinstance(self.database_engine, PostgresEngine):
-            highlights = yield self._find_highlights_in_postgres(search_query, events)
+            highlights = await self._find_highlights_in_postgres(search_query, events)
 
         count_sql += " GROUP BY room_id"
 
-        count_results = yield self.db_pool.execute(
+        count_results = await self.db_pool.execute(
             "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
         )
 
@@ -462,19 +457,25 @@ class SearchStore(SearchBackgroundUpdateStore):
             "count": count,
         }
 
-    @defer.inlineCallbacks
-    def search_rooms(self, room_ids, search_term, keys, limit, pagination_token=None):
+    async def search_rooms(
+        self,
+        room_ids: List[str],
+        search_term: str,
+        keys: List[str],
+        limit,
+        pagination_token: Optional[str] = None,
+    ) -> List[dict]:
         """Performs a full text search over events with given keys.
 
         Args:
-            room_id (list): The room_ids to search in
-            search_term (str): Search term to search for
-            keys (list): List of keys to search in, currently supports
-                "content.body", "content.name", "content.topic"
-            pagination_token (str): A pagination token previously returned
+            room_ids: The room_ids to search in
+            search_term: Search term to search for
+            keys: List of keys to search in, currently supports "content.body",
+                "content.name", "content.topic"
+            pagination_token: A pagination token previously returned
 
         Returns:
-            list of dicts
+            Each match as a dictionary.
         """
         clauses = []
 
@@ -577,7 +578,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         args.append(limit)
 
-        results = yield self.db_pool.execute(
+        results = await self.db_pool.execute(
             "search_rooms", self.db_pool.cursor_to_dict, sql, *args
         )
 
@@ -585,7 +586,7 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         # We set redact_behaviour to BLOCK here to prevent redacted events being returned in
         # search results (which is a data leak)
-        events = yield self.get_events_as_list(
+        events = await self.get_events_as_list(
             [r["event_id"] for r in results],
             redact_behaviour=EventRedactBehaviour.BLOCK,
         )
@@ -594,11 +595,11 @@ class SearchStore(SearchBackgroundUpdateStore):
 
         highlights = None
         if isinstance(self.database_engine, PostgresEngine):
-            highlights = yield self._find_highlights_in_postgres(search_query, events)
+            highlights = await self._find_highlights_in_postgres(search_query, events)
 
         count_sql += " GROUP BY room_id"
 
-        count_results = yield self.db_pool.execute(
+        count_results = await self.db_pool.execute(
             "search_rooms_count", self.db_pool.cursor_to_dict, count_sql, *count_args
         )
 
diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py
index dae8e8bd29..be191dd870 100644
--- a/synapse/storage/databases/main/signatures.py
+++ b/synapse/storage/databases/main/signatures.py
@@ -15,8 +15,6 @@
 
 from unpaddedbase64 import encode_base64
 
-from twisted.internet import defer
-
 from synapse.storage._base import SQLBaseStore
 from synapse.util.caches.descriptors import cached, cachedList
 
@@ -40,9 +38,8 @@ class SignatureWorkerStore(SQLBaseStore):
 
         return self.db_pool.runInteraction("get_event_reference_hashes", f)
 
-    @defer.inlineCallbacks
-    def add_event_hashes(self, event_ids):
-        hashes = yield self.get_event_reference_hashes(event_ids)
+    async def add_event_hashes(self, event_ids):
+        hashes = await self.get_event_reference_hashes(event_ids)
         hashes = {
             e_id: {k: encode_base64(v) for k, v in h.items() if k == "sha256"}
             for e_id, h in hashes.items()
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index d73a8e8ab9..af21fe457a 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -16,8 +16,6 @@
 import logging
 import re
 
-from twisted.internet import defer
-
 from synapse.api.constants import EventTypes, JoinRules
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.state import StateFilter
@@ -59,8 +57,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             "populate_user_directory_cleanup", self._populate_user_directory_cleanup
         )
 
-    @defer.inlineCallbacks
-    def _populate_user_directory_createtables(self, progress, batch_size):
+    async def _populate_user_directory_createtables(self, progress, batch_size):
 
         # Get all the rooms that we want to process.
         def _make_staging_area(txn):
@@ -102,45 +99,43 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
                 self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users)
 
-        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
-        yield self.db_pool.runInteraction(
+        new_pos = await self.get_max_stream_id_in_current_state_deltas()
+        await self.db_pool.runInteraction(
             "populate_user_directory_temp_build", _make_staging_area
         )
-        yield self.db_pool.simple_insert(
+        await self.db_pool.simple_insert(
             TEMP_TABLE + "_position", {"position": new_pos}
         )
 
-        yield self.db_pool.updates._end_background_update(
+        await self.db_pool.updates._end_background_update(
             "populate_user_directory_createtables"
         )
         return 1
 
-    @defer.inlineCallbacks
-    def _populate_user_directory_cleanup(self, progress, batch_size):
+    async def _populate_user_directory_cleanup(self, progress, batch_size):
         """
         Update the user directory stream position, then clean up the old tables.
         """
-        position = yield self.db_pool.simple_select_one_onecol(
+        position = await self.db_pool.simple_select_one_onecol(
             TEMP_TABLE + "_position", None, "position"
         )
-        yield self.update_user_directory_stream_pos(position)
+        await self.update_user_directory_stream_pos(position)
 
         def _delete_staging_area(txn):
             txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
             txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_users")
             txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
 
-        yield self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "populate_user_directory_cleanup", _delete_staging_area
         )
 
-        yield self.db_pool.updates._end_background_update(
+        await self.db_pool.updates._end_background_update(
             "populate_user_directory_cleanup"
         )
         return 1
 
-    @defer.inlineCallbacks
-    def _populate_user_directory_process_rooms(self, progress, batch_size):
+    async def _populate_user_directory_process_rooms(self, progress, batch_size):
         """
         Args:
             progress (dict)
@@ -151,7 +146,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
         # If we don't have progress filed, delete everything.
         if not progress:
-            yield self.delete_all_from_user_dir()
+            await self.delete_all_from_user_dir()
 
         def _get_next_batch(txn):
             # Only fetch 250 rooms, so we don't fetch too many at once, even
@@ -176,13 +171,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
             return rooms_to_work_on
 
-        rooms_to_work_on = yield self.db_pool.runInteraction(
+        rooms_to_work_on = await self.db_pool.runInteraction(
             "populate_user_directory_temp_read", _get_next_batch
         )
 
         # No more rooms -- complete the transaction.
         if not rooms_to_work_on:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 "populate_user_directory_process_rooms"
             )
             return 1
@@ -195,21 +190,19 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         processed_event_count = 0
 
         for room_id, event_count in rooms_to_work_on:
-            is_in_room = yield self.is_host_joined(room_id, self.server_name)
+            is_in_room = await self.is_host_joined(room_id, self.server_name)
 
             if is_in_room:
-                is_public = yield self.is_room_world_readable_or_publicly_joinable(
+                is_public = await self.is_room_world_readable_or_publicly_joinable(
                     room_id
                 )
 
-                users_with_profile = yield defer.ensureDeferred(
-                    state.get_current_users_in_room(room_id)
-                )
+                users_with_profile = await state.get_current_users_in_room(room_id)
                 user_ids = set(users_with_profile)
 
                 # Update each user in the user directory.
                 for user_id, profile in users_with_profile.items():
-                    yield self.update_profile_in_user_dir(
+                    await self.update_profile_in_user_dir(
                         user_id, profile.display_name, profile.avatar_url
                     )
 
@@ -223,7 +216,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
                         to_insert.add(user_id)
 
                     if to_insert:
-                        yield self.add_users_in_public_rooms(room_id, to_insert)
+                        await self.add_users_in_public_rooms(room_id, to_insert)
                         to_insert.clear()
                 else:
                     for user_id in user_ids:
@@ -243,22 +236,22 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
                             # If it gets too big, stop and write to the database
                             # to prevent storing too much in RAM.
                             if len(to_insert) >= self.SHARE_PRIVATE_WORKING_SET:
-                                yield self.add_users_who_share_private_room(
+                                await self.add_users_who_share_private_room(
                                     room_id, to_insert
                                 )
                                 to_insert.clear()
 
                     if to_insert:
-                        yield self.add_users_who_share_private_room(room_id, to_insert)
+                        await self.add_users_who_share_private_room(room_id, to_insert)
                         to_insert.clear()
 
             # We've finished a room. Delete it from the table.
-            yield self.db_pool.simple_delete_one(
+            await self.db_pool.simple_delete_one(
                 TEMP_TABLE + "_rooms", {"room_id": room_id}
             )
             # Update the remaining counter.
             progress["remaining"] -= 1
-            yield self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                 "populate_user_directory",
                 self.db_pool.updates._background_update_progress_txn,
                 "populate_user_directory_process_rooms",
@@ -273,13 +266,12 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
         return processed_event_count
 
-    @defer.inlineCallbacks
-    def _populate_user_directory_process_users(self, progress, batch_size):
+    async def _populate_user_directory_process_users(self, progress, batch_size):
         """
         If search_all_users is enabled, add all of the users to the user directory.
         """
         if not self.hs.config.user_directory_search_all_users:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 "populate_user_directory_process_users"
             )
             return 1
@@ -305,13 +297,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
             return users_to_work_on
 
-        users_to_work_on = yield self.db_pool.runInteraction(
+        users_to_work_on = await self.db_pool.runInteraction(
             "populate_user_directory_temp_read", _get_next_batch
         )
 
         # No more users -- complete the transaction.
         if not users_to_work_on:
-            yield self.db_pool.updates._end_background_update(
+            await self.db_pool.updates._end_background_update(
                 "populate_user_directory_process_users"
             )
             return 1
@@ -322,18 +314,18 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
         )
 
         for user_id in users_to_work_on:
-            profile = yield self.get_profileinfo(get_localpart_from_id(user_id))
-            yield self.update_profile_in_user_dir(
+            profile = await self.get_profileinfo(get_localpart_from_id(user_id))
+            await self.update_profile_in_user_dir(
                 user_id, profile.display_name, profile.avatar_url
             )
 
             # We've finished processing a user. Delete it from the table.
-            yield self.db_pool.simple_delete_one(
+            await self.db_pool.simple_delete_one(
                 TEMP_TABLE + "_users", {"user_id": user_id}
             )
             # Update the remaining counter.
             progress["remaining"] -= 1
-            yield self.db_pool.runInteraction(
+            await self.db_pool.runInteraction(
                 "populate_user_directory",
                 self.db_pool.updates._background_update_progress_txn,
                 "populate_user_directory_process_users",
@@ -342,8 +334,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
         return len(users_to_work_on)
 
-    @defer.inlineCallbacks
-    def is_room_world_readable_or_publicly_joinable(self, room_id):
+    async def is_room_world_readable_or_publicly_joinable(self, room_id):
         """Check if the room is either world_readable or publically joinable
         """
 
@@ -353,20 +344,20 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
             (EventTypes.RoomHistoryVisibility, ""),
         )
 
-        current_state_ids = yield self.get_filtered_current_state_ids(
+        current_state_ids = await self.get_filtered_current_state_ids(
             room_id, StateFilter.from_types(types_to_filter)
         )
 
         join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
         if join_rules_id:
-            join_rule_ev = yield self.get_event(join_rules_id, allow_none=True)
+            join_rule_ev = await self.get_event(join_rules_id, allow_none=True)
             if join_rule_ev:
                 if join_rule_ev.content.get("join_rule") == JoinRules.PUBLIC:
                     return True
 
         hist_vis_id = current_state_ids.get((EventTypes.RoomHistoryVisibility, ""))
         if hist_vis_id:
-            hist_vis_ev = yield self.get_event(hist_vis_id, allow_none=True)
+            hist_vis_ev = await self.get_event(hist_vis_id, allow_none=True)
             if hist_vis_ev:
                 if hist_vis_ev.content.get("history_visibility") == "world_readable":
                     return True
@@ -590,19 +581,18 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
             "remove_from_user_dir", _remove_from_user_dir_txn
         )
 
-    @defer.inlineCallbacks
-    def get_users_in_dir_due_to_room(self, room_id):
+    async def get_users_in_dir_due_to_room(self, room_id):
         """Get all user_ids that are in the room directory because they're
         in the given room_id
         """
-        user_ids_share_pub = yield self.db_pool.simple_select_onecol(
+        user_ids_share_pub = await self.db_pool.simple_select_onecol(
             table="users_in_public_rooms",
             keyvalues={"room_id": room_id},
             retcol="user_id",
             desc="get_users_in_dir_due_to_room",
         )
 
-        user_ids_share_priv = yield self.db_pool.simple_select_onecol(
+        user_ids_share_priv = await self.db_pool.simple_select_onecol(
             table="users_who_share_private_rooms",
             keyvalues={"room_id": room_id},
             retcol="other_user_id",
@@ -645,8 +635,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
             "remove_user_who_share_room", _remove_user_who_share_room_txn
         )
 
-    @defer.inlineCallbacks
-    def get_user_dir_rooms_user_is_in(self, user_id):
+    async def get_user_dir_rooms_user_is_in(self, user_id):
         """
         Returns the rooms that a user is in.
 
@@ -656,14 +645,14 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
         Returns:
             list: user_id
         """
-        rows = yield self.db_pool.simple_select_onecol(
+        rows = await self.db_pool.simple_select_onecol(
             table="users_who_share_private_rooms",
             keyvalues={"user_id": user_id},
             retcol="room_id",
             desc="get_rooms_user_is_in",
         )
 
-        pub_rows = yield self.db_pool.simple_select_onecol(
+        pub_rows = await self.db_pool.simple_select_onecol(
             table="users_in_public_rooms",
             keyvalues={"user_id": user_id},
             retcol="room_id",
@@ -674,32 +663,6 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
         users.update(rows)
         return list(users)
 
-    @defer.inlineCallbacks
-    def get_rooms_in_common_for_users(self, user_id, other_user_id):
-        """Given two user_ids find out the list of rooms they share.
-        """
-        sql = """
-            SELECT room_id FROM (
-                SELECT c.room_id FROM current_state_events AS c
-                INNER JOIN room_memberships AS m USING (event_id)
-                WHERE type = 'm.room.member'
-                    AND m.membership = 'join'
-                    AND state_key = ?
-            ) AS f1 INNER JOIN (
-                SELECT c.room_id FROM current_state_events AS c
-                INNER JOIN room_memberships AS m USING (event_id)
-                WHERE type = 'm.room.member'
-                    AND m.membership = 'join'
-                    AND state_key = ?
-            ) f2 USING (room_id)
-        """
-
-        rows = yield self.db_pool.execute(
-            "get_rooms_in_common_for_users", None, sql, user_id, other_user_id
-        )
-
-        return [room_id for room_id, in rows]
-
     def get_user_directory_stream_pos(self):
         return self.db_pool.simple_select_one_onecol(
             table="user_directory_stream_pos",
@@ -708,8 +671,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
             desc="get_user_directory_stream_pos",
         )
 
-    @defer.inlineCallbacks
-    def search_user_dir(self, user_id, search_term, limit):
+    async def search_user_dir(self, user_id, search_term, limit):
         """Searches for users in directory
 
         Returns:
@@ -806,7 +768,7 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
             # This should be unreachable.
             raise Exception("Unrecognized database engine")
 
-        results = yield self.db_pool.execute(
+        results = await self.db_pool.execute(
             "search_user_dir", self.db_pool.cursor_to_dict, sql, *args
         )
 
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index c63256d3bd..b3f76428b6 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -17,6 +17,7 @@ import logging
 import re
 
 import attr
+from canonicaljson import json
 
 from twisted.internet import defer, task
 
@@ -24,6 +25,9 @@ from synapse.logging import context
 
 logger = logging.getLogger(__name__)
 
+# Create a custom encoder to reduce the whitespace produced by JSON encoding.
+json_encoder = json.JSONEncoder(separators=(",", ":"))
+
 
 def unwrapFirstError(failure):
     # defer.gatherResults and DeferredLists wrap failures.
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 9b09c08b89..c2d72a82cf 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -192,7 +192,7 @@ class Cache(object):
         callbacks = [callback] if callback else []
         self.check_thread()
         observable = ObservableDeferred(value, consumeErrors=True)
-        observer = defer.maybeDeferred(observable.observe)
+        observer = observable.observe()
         entry = CacheEntry(deferred=observable, callbacks=callbacks)
 
         existing_entry = self._pending_deferred_cache.pop(key, None)
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index eab78dd256..0e445e01d7 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -63,5 +63,8 @@ def _handle_frozendict(obj):
     )
 
 
-# A JSONEncoder which is capable of encoding frozendicts without barfing
-frozendict_json_encoder = json.JSONEncoder(default=_handle_frozendict)
+# A JSONEncoder which is capable of encoding frozendicts without barfing.
+# Additionally reduce the whitespace produced by JSON encoding.
+frozendict_json_encoder = json.JSONEncoder(
+    default=_handle_frozendict, separators=(",", ":"),
+)
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index ec61e14423..a805f51df1 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -13,14 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import inspect
 import logging
 from functools import wraps
 
 from prometheus_client import Counter
 
-from twisted.internet import defer
-
 from synapse.logging.context import LoggingContext, current_context
 from synapse.metrics import InFlightGauge
 
@@ -62,25 +59,31 @@ in_flight = InFlightGauge(
 
 
 def measure_func(name=None):
-    def wrapper(func):
-        block_name = func.__name__ if name is None else name
+    """
+    Used to decorate an async function with a `Measure` context manager.
+
+    Usage:
 
-        if inspect.iscoroutinefunction(func):
+    @measure_func()
+    async def foo(...):
+        ...
 
-            @wraps(func)
-            async def measured_func(self, *args, **kwargs):
-                with Measure(self.clock, block_name):
-                    r = await func(self, *args, **kwargs)
-                return r
+    Which is analogous to:
 
-        else:
+    async def foo(...):
+        with Measure(...):
+            ...
+
+    """
+
+    def wrapper(func):
+        block_name = func.__name__ if name is None else name
 
-            @wraps(func)
-            @defer.inlineCallbacks
-            def measured_func(self, *args, **kwargs):
-                with Measure(self.clock, block_name):
-                    r = yield func(self, *args, **kwargs)
-                return r
+        @wraps(func)
+        async def measured_func(self, *args, **kwargs):
+            with Measure(self.clock, block_name):
+                r = await func(self, *args, **kwargs)
+            return r
 
         return measured_func
 
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 8794317caa..919988d3bc 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -15,8 +15,6 @@
 import logging
 import random
 
-from twisted.internet import defer
-
 import synapse.logging.context
 from synapse.api.errors import CodeMessageException
 
@@ -54,8 +52,7 @@ class NotRetryingDestination(Exception):
         self.destination = destination
 
 
-@defer.inlineCallbacks
-def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
+async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
     """For a given destination check if we have previously failed to
     send a request there and are waiting before retrying the destination.
     If we are not ready to retry the destination, this will raise a
@@ -73,9 +70,9 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
     Example usage:
 
         try:
-            limiter = yield get_retry_limiter(destination, clock, store)
+            limiter = await get_retry_limiter(destination, clock, store)
             with limiter:
-                response = yield do_request()
+                response = await do_request()
         except NotRetryingDestination:
             # We aren't ready to retry that destination.
             raise
@@ -83,7 +80,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
     failure_ts = None
     retry_last_ts, retry_interval = (0, 0)
 
-    retry_timings = yield store.get_destination_retry_timings(destination)
+    retry_timings = await store.get_destination_retry_timings(destination)
 
     if retry_timings:
         failure_ts = retry_timings["failure_ts"]
@@ -222,10 +219,9 @@ class RetryDestinationLimiter(object):
             if self.failure_ts is None:
                 self.failure_ts = retry_last_ts
 
-        @defer.inlineCallbacks
-        def store_retry_timings():
+        async def store_retry_timings():
             try:
-                yield self.store.set_destination_retry_timings(
+                await self.store.set_destination_retry_timings(
                     self.destination,
                     self.failure_ts,
                     retry_last_ts,