Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py | 5
-rwxr-xr-x  synapse/_scripts/synapse_port_db.py | 2
-rw-r--r--  synapse/api/auth.py | 12
-rw-r--r--  synapse/api/filtering.py | 8
-rw-r--r--  synapse/api/room_versions.py | 45
-rw-r--r--  synapse/app/phone_stats_home.py | 6
-rw-r--r--  synapse/config/key.py | 13
-rw-r--r--  synapse/event_auth.py | 4
-rw-r--r--  synapse/events/__init__.py | 12
-rw-r--r--  synapse/events/builder.py | 4
-rw-r--r--  synapse/events/validator.py | 2
-rw-r--r--  synapse/federation/federation_base.py | 2
-rw-r--r--  synapse/federation/federation_client.py | 2
-rw-r--r--  synapse/federation/sender/__init__.py | 4
-rw-r--r--  synapse/handlers/admin.py | 1
-rw-r--r--  synapse/handlers/device.py | 3
-rw-r--r--  synapse/handlers/e2e_keys.py | 40
-rw-r--r--  synapse/handlers/pagination.py | 37
-rw-r--r--  synapse/handlers/room_summary.py | 1
-rw-r--r--  synapse/handlers/sync.py | 20
-rw-r--r--  synapse/http/servlet.py | 19
-rw-r--r--  synapse/logging/opentracing.py | 19
-rw-r--r--  synapse/metrics/_legacy_exposition.py | 16
-rw-r--r--  synapse/push/push_tools.py | 13
-rw-r--r--  synapse/rest/admin/__init__.py | 4
-rw-r--r--  synapse/rest/admin/rooms.py | 104
-rw-r--r--  synapse/rest/client/account.py | 54
-rw-r--r--  synapse/rest/client/keys.py | 6
-rw-r--r--  synapse/rest/client/models.py | 24
-rw-r--r--  synapse/server.py | 12
-rw-r--r--  synapse/storage/controllers/state.py | 4
-rw-r--r--  synapse/storage/database.py | 47
-rw-r--r--  synapse/storage/databases/main/devices.py | 4
-rw-r--r--  synapse/storage/databases/main/end_to_end_keys.py | 5
-rw-r--r--  synapse/storage/databases/main/event_federation.py | 4
-rw-r--r--  synapse/storage/databases/main/events_worker.py | 10
-rw-r--r--  synapse/storage/databases/main/lock.py | 121
-rw-r--r--  synapse/storage/databases/main/receipts.py | 86
-rw-r--r--  synapse/storage/databases/main/registration.py | 6
-rw-r--r--  synapse/storage/databases/main/roommember.py | 226
-rw-r--r--  synapse/storage/databases/main/state.py | 4
-rw-r--r--  synapse/storage/databases/main/stats.py | 86
-rw-r--r--  synapse/storage/databases/main/stream.py | 2
-rw-r--r--  synapse/storage/databases/main/transactions.py | 30
-rw-r--r--  synapse/storage/databases/state/store.py | 3
-rw-r--r--  synapse/storage/engines/_base.py | 8
-rw-r--r--  synapse/storage/engines/postgres.py | 7
-rw-r--r--  synapse/storage/engines/sqlite.py | 13
-rw-r--r--  synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql | 19
-rw-r--r--  synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql | 16
-rw-r--r--  synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py | 52
-rw-r--r--  synapse/storage/schema/state/delta/30/state_stream.sql | 4
-rw-r--r--  synapse/storage/util/partial_state_events_tracker.py | 3
-rw-r--r--  synapse/types.py | 5
-rw-r--r--  synapse/util/caches/__init__.py | 60
-rw-r--r--  synapse/util/metrics.py | 34
-rw-r--r--  synapse/util/rust.py | 84
57 files changed, 909 insertions, 528 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index b1369aca8f..1bed6393bd 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -20,6 +20,8 @@ import json
 import os
 import sys
 
+from synapse.util.rust import check_rust_lib_up_to_date
+
 # Check that we're not running on an unsupported Python version.
 if sys.version_info < (3, 7):
     print("Synapse requires Python 3.7 or above.")
@@ -78,3 +80,6 @@ if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     from synapse.util.patch_inline_callbacks import do_patch
 
     do_patch()
+
+
+check_rust_lib_up_to_date()
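
The new startup hook guards against running a source checkout with a stale compiled Rust extension. As a rough, hypothetical sketch of such a staleness check (the paths and names below are assumptions, not Synapse's actual implementation), one could compare file modification times:

from pathlib import Path


def check_extension_up_to_date(ext_path: Path, rust_src_dir: Path) -> None:
    """Raise if any Rust source file is newer than the built extension."""
    if not rust_src_dir.exists():
        # Installed from a wheel rather than a source checkout: nothing to do.
        return
    built_at = ext_path.stat().st_mtime
    newest_src = max(
        (p.stat().st_mtime for p in rust_src_dir.rglob("*.rs")), default=0.0
    )
    if newest_src > built_at:
        raise RuntimeError(
            "The compiled Rust extension is older than the Rust source; "
            "rebuild it (e.g. with `maturin develop`) or reinstall."
        )
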
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 543bba27c2..30983c47fb 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -67,6 +67,7 @@ from synapse.storage.databases.main.media_repository import (
 )
 from synapse.storage.databases.main.presence import PresenceBackgroundUpdateStore
 from synapse.storage.databases.main.pusher import PusherWorkerStore
+from synapse.storage.databases.main.receipts import ReceiptsBackgroundUpdateStore
 from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
     find_max_generated_user_id_localpart,
@@ -203,6 +204,7 @@ class Store(
     PushRuleStore,
     PusherWorkerStore,
     PresenceBackgroundUpdateStore,
+    ReceiptsBackgroundUpdateStore,
 ):
     def execute(self, f: Callable[..., R], *args: Any, **kwargs: Any) -> Awaitable[R]:
         return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 9a1aea083f..4a75eb6b21 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -32,12 +32,14 @@ from synapse.appservice import ApplicationService
 from synapse.http import get_request_user_agent
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import (
+    SynapseTags,
     active_span,
     force_tracing,
     start_active_span,
     trace,
 )
 from synapse.types import Requester, create_requester
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -118,6 +120,7 @@ class Auth:
             errcode=Codes.NOT_JOINED,
         )
 
+    @cancellable
     async def get_user_by_req(
         self,
         request: SynapseRequest,
@@ -159,6 +162,12 @@ class Auth:
                 parent_span.set_tag(
                     "authenticated_entity", requester.authenticated_entity
                 )
+                # We tag the Synapse instance name so that it's an easy jumping
+                # off point into the logs. Can also be used to filter for an
+                # instance that is under load.
+                parent_span.set_tag(
+                    SynapseTags.INSTANCE_NAME, self.hs.get_instance_name()
+                )
                 parent_span.set_tag("user_id", requester.user.to_string())
                 if requester.device_id is not None:
                     parent_span.set_tag("device_id", requester.device_id)
@@ -166,6 +175,7 @@ class Auth:
                     parent_span.set_tag("appservice_id", requester.app_service.id)
             return requester
 
+    @cancellable
     async def _wrapped_get_user_by_req(
         self,
         request: SynapseRequest,
@@ -281,6 +291,7 @@ class Auth:
                 403, "Application service has not registered this user (%s)" % user_id
             )
 
+    @cancellable
     async def _get_appservice_user(self, request: Request) -> Optional[Requester]:
         """
         Given a request, reads the request parameters to determine:
@@ -523,6 +534,7 @@ class Auth:
         return bool(query_params) or bool(auth_headers)
 
     @staticmethod
+    @cancellable
     def get_access_token_from_request(request: Request) -> str:
         """Extracts the access_token from the request.
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 102889ac49..f7f46f8d80 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -140,13 +140,13 @@ USER_FILTER_SCHEMA = {
 
 
 @FormatChecker.cls_checks("matrix_room_id")
-def matrix_room_id_validator(room_id_str: str) -> bool:
-    return RoomID.is_valid(room_id_str)
+def matrix_room_id_validator(room_id: object) -> bool:
+    return isinstance(room_id, str) and RoomID.is_valid(room_id)
 
 
 @FormatChecker.cls_checks("matrix_user_id")
-def matrix_user_id_validator(user_id_str: str) -> bool:
-    return UserID.is_valid(user_id_str)
+def matrix_user_id_validator(user_id: object) -> bool:
+    return isinstance(user_id, str) and UserID.is_valid(user_id)
 
 
 class Filtering:
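
jsonschema invokes format checkers on values of any JSON type, so a checker that assumes a str can raise a TypeError when handed, say, an integer; the isinstance guard makes non-strings simply fail validation instead. A simplified illustration (the room-ID test here is a stand-in for RoomID.is_valid):

from jsonschema import FormatChecker

checker = FormatChecker()


@checker.checks("matrix_room_id")
def matrix_room_id_validator(value: object) -> bool:
    # A non-string is "not a valid room ID" rather than a crash.
    return isinstance(value, str) and value.startswith("!") and ":" in value


print(checker.conforms("!room:example.com", "matrix_room_id"))  # True
print(checker.conforms(123, "matrix_room_id"))  # False, no TypeError
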
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index a0e4ab6db6..e37acb0f1e 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -19,18 +19,23 @@ import attr
 
 class EventFormatVersions:
     """This is an internal enum for tracking the version of the event format,
-    independently from the room version.
+    independently of the room version.
+
+    To reduce confusion, the event format versions are named after the room
+    versions in which they were used or introduced.
+    The concept of an 'event format version' is specific to Synapse (the
+    specification does not mention this term).
     """
 
-    V1 = 1  # $id:server event id format
-    V2 = 2  # MSC1659-style $hash event id format: introduced for room v3
-    V3 = 3  # MSC1884-style $hash format: introduced for room v4
+    ROOM_V1_V2 = 1  # $id:server event id format: used for room v1 and v2
+    ROOM_V3 = 2  # MSC1659-style $hash event id format: used for room v3
+    ROOM_V4_PLUS = 3  # MSC1884-style $hash format: introduced for room v4
 
 
 KNOWN_EVENT_FORMAT_VERSIONS = {
-    EventFormatVersions.V1,
-    EventFormatVersions.V2,
-    EventFormatVersions.V3,
+    EventFormatVersions.ROOM_V1_V2,
+    EventFormatVersions.ROOM_V3,
+    EventFormatVersions.ROOM_V4_PLUS,
 }
 
 
@@ -92,7 +97,7 @@ class RoomVersions:
     V1 = RoomVersion(
         "1",
         RoomDisposition.STABLE,
-        EventFormatVersions.V1,
+        EventFormatVersions.ROOM_V1_V2,
         StateResolutionVersions.V1,
         enforce_key_validity=False,
         special_case_aliases_auth=True,
@@ -110,7 +115,7 @@ class RoomVersions:
     V2 = RoomVersion(
         "2",
         RoomDisposition.STABLE,
-        EventFormatVersions.V1,
+        EventFormatVersions.ROOM_V1_V2,
         StateResolutionVersions.V2,
         enforce_key_validity=False,
         special_case_aliases_auth=True,
@@ -128,7 +133,7 @@ class RoomVersions:
     V3 = RoomVersion(
         "3",
         RoomDisposition.STABLE,
-        EventFormatVersions.V2,
+        EventFormatVersions.ROOM_V3,
         StateResolutionVersions.V2,
         enforce_key_validity=False,
         special_case_aliases_auth=True,
@@ -146,7 +151,7 @@ class RoomVersions:
     V4 = RoomVersion(
         "4",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=False,
         special_case_aliases_auth=True,
@@ -164,7 +169,7 @@ class RoomVersions:
     V5 = RoomVersion(
         "5",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=True,
@@ -182,7 +187,7 @@ class RoomVersions:
     V6 = RoomVersion(
         "6",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -200,7 +205,7 @@ class RoomVersions:
     MSC2176 = RoomVersion(
         "org.matrix.msc2176",
         RoomDisposition.UNSTABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -218,7 +223,7 @@ class RoomVersions:
     V7 = RoomVersion(
         "7",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -236,7 +241,7 @@ class RoomVersions:
     V8 = RoomVersion(
         "8",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -254,7 +259,7 @@ class RoomVersions:
     V9 = RoomVersion(
         "9",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -272,7 +277,7 @@ class RoomVersions:
     MSC3787 = RoomVersion(
         "org.matrix.msc3787",
         RoomDisposition.UNSTABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -290,7 +295,7 @@ class RoomVersions:
     V10 = RoomVersion(
         "10",
         RoomDisposition.STABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
@@ -308,7 +313,7 @@ class RoomVersions:
     MSC2716v4 = RoomVersion(
         "org.matrix.msc2716v4",
         RoomDisposition.UNSTABLE,
-        EventFormatVersions.V3,
+        EventFormatVersions.ROOM_V4_PLUS,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
         special_case_aliases_auth=False,
diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py
index 51c8d15711..53db1e85b3 100644
--- a/synapse/app/phone_stats_home.py
+++ b/synapse/app/phone_stats_home.py
@@ -32,15 +32,15 @@ logger = logging.getLogger("synapse.app.homeserver")
 _stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
 
 # Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+current_mau_gauge = Gauge("synapse_admin_mau_current", "Current MAU")
 current_mau_by_service_gauge = Gauge(
     "synapse_admin_mau_current_mau_by_service",
     "Current MAU by service",
     ["app_service"],
 )
-max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
+max_mau_gauge = Gauge("synapse_admin_mau_max", "MAU Limit")
 registered_reserved_users_mau_gauge = Gauge(
-    "synapse_admin_mau:registered_reserved_users",
+    "synapse_admin_mau_registered_reserved_users",
     "Registered users with reserved threepids",
 )
 
diff --git a/synapse/config/key.py b/synapse/config/key.py
index cc75efdf8f..f3dc4df695 100644
--- a/synapse/config/key.py
+++ b/synapse/config/key.py
@@ -217,7 +217,18 @@ class KeyConfig(Config):
 
         signing_keys = self.read_file(signing_key_path, name)
         try:
-            return read_signing_keys(signing_keys.splitlines(True))
+            loaded_signing_keys = read_signing_keys(
+                [
+                    signing_key_line
+                    for signing_key_line in signing_keys.splitlines(keepends=False)
+                    if signing_key_line.strip()
+                ]
+            )
+
+            if not loaded_signing_keys:
+                raise ConfigError(f"No signing keys in file {signing_key_path}")
+
+            return loaded_signing_keys
         except Exception as e:
             raise ConfigError("Error reading %s: %s" % (name, str(e)))
 
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 389b0c5d53..c7d5ef92fc 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -109,7 +109,7 @@ def validate_event_for_room_version(event: "EventBase") -> None:
         if not is_invite_via_3pid:
             raise AuthError(403, "Event not signed by sender's server")
 
-    if event.format_version in (EventFormatVersions.V1,):
+    if event.format_version in (EventFormatVersions.ROOM_V1_V2,):
         # Only older room versions have event IDs to check.
         event_id_domain = get_domain_from_id(event.event_id)
 
@@ -716,7 +716,7 @@ def check_redaction(
     if user_level >= redact_level:
         return False
 
-    if room_version_obj.event_format == EventFormatVersions.V1:
+    if room_version_obj.event_format == EventFormatVersions.ROOM_V1_V2:
         redacter_domain = get_domain_from_id(event.event_id)
         if not isinstance(event.redacts, str):
             return False
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 39ad2793d9..b2c9119fd0 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -442,7 +442,7 @@ class EventBase(metaclass=abc.ABCMeta):
 
 
 class FrozenEvent(EventBase):
-    format_version = EventFormatVersions.V1  # All events of this type are V1
+    format_version = EventFormatVersions.ROOM_V1_V2  # All events of this type are V1
 
     def __init__(
         self,
@@ -490,7 +490,7 @@ class FrozenEvent(EventBase):
 
 
 class FrozenEventV2(EventBase):
-    format_version = EventFormatVersions.V2  # All events of this type are V2
+    format_version = EventFormatVersions.ROOM_V3  # All events of this type are V2
 
     def __init__(
         self,
@@ -567,7 +567,7 @@ class FrozenEventV2(EventBase):
 class FrozenEventV3(FrozenEventV2):
     """FrozenEventV3, which differs from FrozenEventV2 only in the event_id format"""
 
-    format_version = EventFormatVersions.V3  # All events of this type are V3
+    format_version = EventFormatVersions.ROOM_V4_PLUS  # All events of this type are V3
 
     @property
     def event_id(self) -> str:
@@ -597,11 +597,11 @@ def _event_type_from_format_version(
         `FrozenEvent`
     """
 
-    if format_version == EventFormatVersions.V1:
+    if format_version == EventFormatVersions.ROOM_V1_V2:
         return FrozenEvent
-    elif format_version == EventFormatVersions.V2:
+    elif format_version == EventFormatVersions.ROOM_V3:
         return FrozenEventV2
-    elif format_version == EventFormatVersions.V3:
+    elif format_version == EventFormatVersions.ROOM_V4_PLUS:
         return FrozenEventV3
     else:
         raise Exception("No event format %r" % (format_version,))
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 17f624b68f..746bd3978d 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -137,7 +137,7 @@ class EventBuilder:
         # The types of auth/prev events changes between event versions.
         prev_events: Union[List[str], List[Tuple[str, Dict[str, str]]]]
         auth_events: Union[List[str], List[Tuple[str, Dict[str, str]]]]
-        if format_version == EventFormatVersions.V1:
+        if format_version == EventFormatVersions.ROOM_V1_V2:
             auth_events = await self._store.add_event_hashes(auth_event_ids)
             prev_events = await self._store.add_event_hashes(prev_event_ids)
         else:
@@ -253,7 +253,7 @@ def create_local_event_from_event_dict(
 
     time_now = int(clock.time_msec())
 
-    if format_version == EventFormatVersions.V1:
+    if format_version == EventFormatVersions.ROOM_V1_V2:
         event_dict["event_id"] = _create_event_id(clock, hostname)
 
     event_dict["origin"] = hostname
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 27c8beba25..a6f0104396 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -45,7 +45,7 @@ class EventValidator:
         """
         self.validate_builder(event)
 
-        if event.format_version == EventFormatVersions.V1:
+        if event.format_version == EventFormatVersions.ROOM_V1_V2:
             EventID.from_string(event.event_id)
 
         required = [
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 4269a98db2..abe2c1971a 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -194,7 +194,7 @@ async def _check_sigs_on_pdu(
     # event id's domain (normally only the case for joins/leaves), and add additional
     # checks. Only do this if the room version has a concept of event ID domain
     # (ie, the room version uses old-style non-hash event IDs).
-    if room_version.event_format == EventFormatVersions.V1:
+    if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
         event_domain = get_domain_from_id(pdu.event_id)
         if event_domain != sender_domain:
             try:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7ee2974bb1..4a4289ee7c 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1190,7 +1190,7 @@ class FederationClient(FederationBase):
             # Otherwise, consider it a legitimate error and raise.
             err = e.to_synapse_error()
             if self._is_unknown_endpoint(e, err):
-                if room_version.event_format != EventFormatVersions.V1:
+                if room_version.event_format != EventFormatVersions.ROOM_V1_V2:
                     raise SynapseError(
                         400,
                         "User's homeserver does not support this room version",
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8bc60e3e3e..a6cb3ba58f 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -62,12 +62,12 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 sent_pdus_destination_dist_count = Counter(
-    "synapse_federation_client_sent_pdu_destinations:count",
+    "synapse_federation_client_sent_pdu_destinations_count",
     "Number of PDUs queued for sending to one or more destinations",
 )
 
 sent_pdus_destination_dist_total = Counter(
-    "synapse_federation_client_sent_pdu_destinations:total",
+    "synapse_federation_client_sent_pdu_destinations",
     "Total number of PDUs queued for sending across all destinations",
 )
 
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index d4fe7df533..cf9f19608a 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -70,6 +70,7 @@ class AdminHandler:
             "appservice_id",
             "consent_server_notice_sent",
             "consent_version",
+            "consent_ts",
             "user_type",
             "is_guest",
         }
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9c2c3a0e68..c5ac169644 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -52,6 +52,7 @@ from synapse.types import (
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.cancellation import cancellable
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -124,6 +125,7 @@ class DeviceWorkerHandler:
 
         return device
 
+    @cancellable
     async def get_device_changes_in_shared_rooms(
         self, user_id: str, room_ids: Collection[str], from_token: StreamToken
     ) -> Collection[str]:
@@ -163,6 +165,7 @@ class DeviceWorkerHandler:
 
     @trace
     @measure_func("device.get_user_ids_changed")
+    @cancellable
     async def get_user_ids_changed(
         self, user_id: str, from_token: StreamToken
     ) -> JsonDict:
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index c938339ddd..ec81639c78 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -37,7 +37,8 @@ from synapse.types import (
     get_verify_key_from_cross_signing_key,
 )
 from synapse.util import json_decoder, unwrapFirstError
-from synapse.util.async_helpers import Linearizer
+from synapse.util.async_helpers import Linearizer, delay_cancellation
+from synapse.util.cancellation import cancellable
 from synapse.util.retryutils import NotRetryingDestination
 
 if TYPE_CHECKING:
@@ -91,6 +92,7 @@ class E2eKeysHandler:
         )
 
     @trace
+    @cancellable
     async def query_devices(
         self,
         query_body: JsonDict,
@@ -208,22 +210,26 @@ class E2eKeysHandler:
                     r[user_id] = remote_queries[user_id]
 
             # Now fetch any devices that we don't have in our cache
+            # TODO It might make sense to propagate cancellations into the
+            #      deferreds which are querying remote homeservers.
             await make_deferred_yieldable(
-                defer.gatherResults(
-                    [
-                        run_in_background(
-                            self._query_devices_for_destination,
-                            results,
-                            cross_signing_keys,
-                            failures,
-                            destination,
-                            queries,
-                            timeout,
-                        )
-                        for destination, queries in remote_queries_not_in_cache.items()
-                    ],
-                    consumeErrors=True,
-                ).addErrback(unwrapFirstError)
+                delay_cancellation(
+                    defer.gatherResults(
+                        [
+                            run_in_background(
+                                self._query_devices_for_destination,
+                                results,
+                                cross_signing_keys,
+                                failures,
+                                destination,
+                                queries,
+                                timeout,
+                            )
+                            for destination, queries in remote_queries_not_in_cache.items()
+                        ],
+                        consumeErrors=True,
+                    ).addErrback(unwrapFirstError)
+                )
             )
 
             ret = {"device_keys": results, "failures": failures}
@@ -347,6 +353,7 @@ class E2eKeysHandler:
 
         return
 
+    @cancellable
     async def get_cross_signing_keys_from_cache(
         self, query: Iterable[str], from_user_id: Optional[str]
     ) -> Dict[str, Dict[str, dict]]:
@@ -393,6 +400,7 @@ class E2eKeysHandler:
         }
 
     @trace
+    @cancellable
     async def query_local_devices(
         self, query: Mapping[str, Optional[List[str]]]
     ) -> Dict[str, Dict[str, dict]]:
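
delay_cancellation lets the caller be cancelled without aborting the wrapped work, so in-flight federation queries (whose results other waiters may share) run to completion. A rough asyncio analogue of the idea, not the Twisted helper itself:

import asyncio


async def fetch_remote_keys(destination: str) -> str:
    await asyncio.sleep(0.1)  # stand-in for a federation request
    return f"keys from {destination}"


async def query_devices() -> list:
    # shield() detaches the gathered work from our own cancellation, much as
    # delay_cancellation stops a cancelled request from killing shared queries.
    return await asyncio.shield(
        asyncio.gather(*(fetch_remote_keys(d) for d in ("a.example", "b.example")))
    )


print(asyncio.run(query_devices()))
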
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index a0c39778ab..1f83bab836 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -26,6 +26,7 @@ from synapse.events.utils import SerializeEventConfig
 from synapse.handlers.room import ShutdownRoomResponse
 from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.rest.admin._base import assert_user_is_admin
 from synapse.storage.state import StateFilter
 from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, Requester, StreamKeyType
@@ -423,6 +424,7 @@ class PaginationHandler:
         pagin_config: PaginationConfig,
         as_client_event: bool = True,
         event_filter: Optional[Filter] = None,
+        use_admin_priviledge: bool = False,
     ) -> JsonDict:
         """Get messages in a room.
 
@@ -432,10 +434,16 @@ class PaginationHandler:
             pagin_config: The pagination config rules to apply, if any.
             as_client_event: True to get events in client-server format.
             event_filter: Filter to apply to results or None
+            use_admin_priviledge: if `True`, return all events, regardless
+                of whether `user` has access to them. To be used **ONLY**
+                from the admin API.
 
         Returns:
             Pagination API results
         """
+        if use_admin_priviledge:
+            await assert_user_is_admin(self.auth, requester)
+
         user_id = requester.user.to_string()
 
         if pagin_config.from_token:
@@ -458,12 +466,14 @@ class PaginationHandler:
         room_token = from_token.room_key
 
         async with self.pagination_lock.read(room_id):
-            (
-                membership,
-                member_event_id,
-            ) = await self.auth.check_user_in_room_or_world_readable(
-                room_id, requester, allow_departed_users=True
-            )
+            (membership, member_event_id) = (None, None)
+            if not use_admin_priviledge:
+                (
+                    membership,
+                    member_event_id,
+                ) = await self.auth.check_user_in_room_or_world_readable(
+                    room_id, requester, allow_departed_users=True
+                )
 
             if pagin_config.direction == "b":
                 # if we're going backwards, we might need to backfill. This
@@ -475,7 +485,7 @@ class PaginationHandler:
                         room_id, room_token.stream
                     )
 
-                if membership == Membership.LEAVE:
+                if not use_admin_priviledge and membership == Membership.LEAVE:
                     # If they have left the room then clamp the token to be before
                     # they left the room, to save the effort of loading from the
                     # database.
@@ -528,12 +538,13 @@ class PaginationHandler:
         if event_filter:
             events = await event_filter.filter(events)
 
-        events = await filter_events_for_client(
-            self._storage_controllers,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
+        if not use_admin_priviledge:
+            events = await filter_events_for_client(
+                self._storage_controllers,
+                user_id,
+                events,
+                is_peeking=(member_event_id is None),
+            )
 
         # if after the filter applied there are no more events
         # return immediately - but there might be more in next_token batch
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 732b0310bc..ebd445adca 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -453,7 +453,6 @@ class RoomSummaryHandler:
                 "type": e.type,
                 "state_key": e.state_key,
                 "content": e.content,
-                "room_id": e.room_id,
                 "sender": e.sender,
                 "origin_server_ts": e.origin_server_ts,
             }
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2d95b1fa24..5293fa4d0e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,6 +15,7 @@ import itertools
 import logging
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     Collection,
     Dict,
@@ -1413,10 +1414,10 @@ class SyncHandler:
     async def _generate_sync_entry_for_device_list(
         self,
         sync_result_builder: "SyncResultBuilder",
-        newly_joined_rooms: Set[str],
-        newly_joined_or_invited_or_knocked_users: Set[str],
-        newly_left_rooms: Set[str],
-        newly_left_users: Set[str],
+        newly_joined_rooms: AbstractSet[str],
+        newly_joined_or_invited_or_knocked_users: AbstractSet[str],
+        newly_left_rooms: AbstractSet[str],
+        newly_left_users: AbstractSet[str],
     ) -> DeviceListUpdates:
         """Generate the DeviceListUpdates section of sync
 
@@ -1434,8 +1435,7 @@ class SyncHandler:
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
-        # We're going to mutate these fields, so lets copy them rather than
-        # assume they won't get used later.
+        # Take a copy since these fields will be mutated later.
         newly_joined_or_invited_or_knocked_users = set(
             newly_joined_or_invited_or_knocked_users
         )
@@ -1635,8 +1635,8 @@ class SyncHandler:
     async def _generate_sync_entry_for_presence(
         self,
         sync_result_builder: "SyncResultBuilder",
-        newly_joined_rooms: Set[str],
-        newly_joined_or_invited_users: Set[str],
+        newly_joined_rooms: AbstractSet[str],
+        newly_joined_or_invited_users: AbstractSet[str],
     ) -> None:
         """Generates the presence portion of the sync response. Populates the
         `sync_result_builder` with the result.
@@ -1694,7 +1694,7 @@ class SyncHandler:
         self,
         sync_result_builder: "SyncResultBuilder",
         account_data_by_room: Dict[str, Dict[str, JsonDict]],
-    ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
+    ) -> Tuple[AbstractSet[str], AbstractSet[str], AbstractSet[str], AbstractSet[str]]:
         """Generates the rooms portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
@@ -2534,7 +2534,7 @@ class SyncResultBuilder:
     archived: List[ArchivedSyncResult] = attr.Factory(list)
     to_device: List[JsonDict] = attr.Factory(list)
 
-    def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]:
+    def calculate_user_changes(self) -> Tuple[AbstractSet[str], AbstractSet[str]]:
         """Work out which other users have joined or left rooms we are joined to.
 
         This data is only useful for an incremental sync.
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 26aaabfb34..80acbdcf3c 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -28,7 +28,8 @@ from typing import (
     overload,
 )
 
-from pydantic import BaseModel, ValidationError
+from pydantic import BaseModel, MissingError, PydanticValueError, ValidationError
+from pydantic.error_wrappers import ErrorWrapper
 from typing_extensions import Literal
 
 from twisted.web.server import Request
@@ -714,7 +715,21 @@ def parse_and_validate_json_object_from_request(
     try:
         instance = model_type.parse_obj(content)
     except ValidationError as e:
-        raise SynapseError(HTTPStatus.BAD_REQUEST, str(e), errcode=Codes.BAD_JSON)
+        # Choose a matrix error code. The catch-all is BAD_JSON, but we try to find a
+        # more specific error if possible (which occasionally helps us to be spec-
+        # compliant). This is a bit awkward because the spec's error codes aren't very
+        # clear-cut: BAD_JSON arguably overlaps with MISSING_PARAM and INVALID_PARAM.
+        errcode = Codes.BAD_JSON
+
+        raw_errors = e.raw_errors
+        if len(raw_errors) == 1 and isinstance(raw_errors[0], ErrorWrapper):
+            raw_error = raw_errors[0].exc
+            if isinstance(raw_error, MissingError):
+                errcode = Codes.MISSING_PARAM
+            elif isinstance(raw_error, PydanticValueError):
+                errcode = Codes.INVALID_PARAM
+
+        raise SynapseError(HTTPStatus.BAD_REQUEST, str(e), errcode=errcode)
 
     return instance
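
Under pydantic v1 (which this code targets), a ValidationError for a single bad field carries one ErrorWrapper whose .exc distinguishes a missing field from a malformed one; that is what the errcode mapping above relies on. A small demonstration with a hypothetical model:

from pydantic import BaseModel, MissingError, StrictStr, ValidationError
from pydantic.error_wrappers import ErrorWrapper


class Body(BaseModel):
    user_id: StrictStr


try:
    Body.parse_obj({})  # "user_id" absent entirely
except ValidationError as e:
    (wrapper,) = e.raw_errors
    assert isinstance(wrapper, ErrorWrapper)
    # MissingError -> M_MISSING_PARAM; other PydanticValueErrors -> M_INVALID_PARAM
    assert isinstance(wrapper.exc, MissingError)
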
 
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 482316a1ff..ca2735dd6d 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -203,6 +203,9 @@ if TYPE_CHECKING:
 
 # Helper class
 
+# Matches the number suffix in an instance name like "matrix.org client_reader-8"
+STRIP_INSTANCE_NUMBER_SUFFIX_REGEX = re.compile(r"[_-]?\d+$")
+
 
 class _DummyTagNames:
     """wrapper of opentracings tags. We need to have them if we
@@ -295,6 +298,8 @@ class SynapseTags:
     # Whether the sync response has new data to be returned to the client.
     SYNC_RESULT = "sync.new_data"
 
+    INSTANCE_NAME = "instance_name"
+
     # incoming HTTP request ID  (as written in the logs)
     REQUEST_ID = "request_id"
 
@@ -441,9 +446,17 @@ def init_tracer(hs: "HomeServer") -> None:
 
     from jaeger_client.metrics.prometheus import PrometheusMetricsFactory
 
+    # Instance names are opaque strings, but by stripping off the number
+    # suffix we get something that looks like a "worker type", e.g.
+    # "client_reader-1" -> "client_reader", so that we don't spread the
+    # traces across so many services.
+    instance_name_by_type = re.sub(
+        STRIP_INSTANCE_NUMBER_SUFFIX_REGEX, "", hs.get_instance_name()
+    )
+
     config = JaegerConfig(
         config=hs.config.tracing.jaeger_config,
-        service_name=f"{hs.config.server.server_name} {hs.get_instance_name()}",
+        service_name=f"{hs.config.server.server_name} {instance_name_by_type}",
         scope_manager=LogContextScopeManager(),
         metrics_factory=PrometheusMetricsFactory(),
     )
@@ -1032,11 +1045,11 @@ def trace_servlet(
             # with JsonResource).
             scope.span.set_operation_name(request.request_metrics.name)
 
-            # set the tags *after* the servlet completes, in case it decided to
-            # prioritise the span (tags will get dropped on unprioritised spans)
             request_tags[
                 SynapseTags.REQUEST_TAG
             ] = request.request_metrics.start_context.tag
 
+            # set the tags *after* the servlet completes, in case it decided to
+            # prioritise the span (tags will get dropped on unprioritised spans)
             for k, v in request_tags.items():
                 scope.span.set_tag(k, v)
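
The suffix stripping is a plain re.sub: it collapses per-worker instance names onto a worker type while leaving names without a numeric suffix untouched.

import re

STRIP_INSTANCE_NUMBER_SUFFIX_REGEX = re.compile(r"[_-]?\d+$")

for name in ("client_reader-8", "federation_sender_2", "master"):
    print(STRIP_INSTANCE_NUMBER_SUFFIX_REGEX.sub("", name))
# -> client_reader, federation_sender, master
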
diff --git a/synapse/metrics/_legacy_exposition.py b/synapse/metrics/_legacy_exposition.py
index ff640a49af..563d8cc2c6 100644
--- a/synapse/metrics/_legacy_exposition.py
+++ b/synapse/metrics/_legacy_exposition.py
@@ -34,8 +34,6 @@ from prometheus_client.core import Sample
 from twisted.web.resource import Resource
 from twisted.web.server import Request
 
-from synapse.util import caches
-
 CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
 
 
@@ -88,11 +86,16 @@ LEGACY_METRIC_NAMES = {
     "synapse_util_caches_cache_hits": "synapse_util_caches_cache:hits",
     "synapse_util_caches_cache_size": "synapse_util_caches_cache:size",
     "synapse_util_caches_cache_evicted_size": "synapse_util_caches_cache:evicted_size",
-    "synapse_util_caches_cache_total": "synapse_util_caches_cache:total",
+    "synapse_util_caches_cache": "synapse_util_caches_cache:total",
     "synapse_util_caches_response_cache_size": "synapse_util_caches_response_cache:size",
     "synapse_util_caches_response_cache_hits": "synapse_util_caches_response_cache:hits",
     "synapse_util_caches_response_cache_evicted_size": "synapse_util_caches_response_cache:evicted_size",
-    "synapse_util_caches_response_cache_total": "synapse_util_caches_response_cache:total",
+    "synapse_util_caches_response_cache": "synapse_util_caches_response_cache:total",
+    "synapse_federation_client_sent_pdu_destinations": "synapse_federation_client_sent_pdu_destinations:total",
+    "synapse_federation_client_sent_pdu_destinations_count": "synapse_federation_client_sent_pdu_destinations:count",
+    "synapse_admin_mau_current": "synapse_admin_mau:current",
+    "synapse_admin_mau_max": "synapse_admin_mau:max",
+    "synapse_admin_mau_registered_reserved_users": "synapse_admin_mau:registered_reserved_users",
 }
 
 
@@ -102,11 +105,6 @@ def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> byt
     by prometheus-client.
     """
 
-    # Trigger the cache metrics to be rescraped, which updates the common
-    # metrics but do not produce metrics themselves
-    for collector in caches.collectors_by_name.values():
-        collector.collect()
-
     output = []
 
     for metric in registry.collect():
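
Prometheus reserves colons in metric names for recording rules, hence the renames; the legacy exposition endpoint maps the new names back so existing dashboards keep working. At scrape time the rewrite amounts to a lookup, sketched here with two of the entries above:

LEGACY_METRIC_NAMES = {
    "synapse_admin_mau_current": "synapse_admin_mau:current",
    "synapse_admin_mau_max": "synapse_admin_mau:max",
}


def legacy_name(name: str) -> str:
    # Fall back to the modern name when no legacy alias exists.
    return LEGACY_METRIC_NAMES.get(name, name)


print(legacy_name("synapse_admin_mau_current"))  # synapse_admin_mau:current
print(legacy_name("some_other_metric"))          # unchanged
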
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 6661887d9f..658bf373b7 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -17,6 +17,7 @@ from synapse.events import EventBase
 from synapse.push.presentable_names import calculate_room_name, name_from_member_event
 from synapse.storage.controllers import StorageControllers
 from synapse.storage.databases.main import DataStore
+from synapse.util.async_helpers import concurrently_execute
 
 
 async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
@@ -25,13 +26,19 @@ async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -
 
     badge = len(invites)
 
-    for room_id in joins:
-        notifs = await (
-            store.get_unread_event_push_actions_by_room_for_user(
+    room_notifs = []
+
+    async def get_room_unread_count(room_id: str) -> None:
+        room_notifs.append(
+            await store.get_unread_event_push_actions_by_room_for_user(
                 room_id,
                 user_id,
             )
         )
+
+    await concurrently_execute(get_room_unread_count, joins, 10)
+
+    for notifs in room_notifs:
         if notifs.notify_count == 0:
             continue
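
concurrently_execute(get_room_unread_count, joins, 10) runs the per-room lookups with bounded parallelism rather than strictly one at a time. An asyncio sketch of those semantics (Synapse's helper is Twisted-based):

import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def concurrently_execute(
    func: Callable[[T], Awaitable[None]], args: Iterable[T], limit: int
) -> None:
    # At most `limit` invocations of `func` are in flight at any moment.
    sem = asyncio.Semaphore(limit)

    async def run_one(arg: T) -> None:
        async with sem:
            await func(arg)

    await asyncio.gather(*(run_one(a) for a in args))
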
 
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index fa3266720b..bac754e1b1 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -61,9 +61,11 @@ from synapse.rest.admin.rooms import (
     MakeRoomAdminRestServlet,
     RoomEventContextServlet,
     RoomMembersRestServlet,
+    RoomMessagesRestServlet,
     RoomRestServlet,
     RoomRestV2Servlet,
     RoomStateRestServlet,
+    RoomTimestampToEventRestServlet,
 )
 from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
 from synapse.rest.admin.statistics import UserMediaStatisticsRestServlet
@@ -271,6 +273,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     DestinationResetConnectionRestServlet(hs).register(http_server)
     DestinationRestServlet(hs).register(http_server)
     ListDestinationsRestServlet(hs).register(http_server)
+    RoomMessagesRestServlet(hs).register(http_server)
+    RoomTimestampToEventRestServlet(hs).register(http_server)
 
     # Some servlets only get registered for the main process.
     if hs.config.worker.worker_app is None:
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 3d870629c4..747e6fda83 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -35,6 +35,7 @@ from synapse.rest.admin._base import (
 )
 from synapse.storage.databases.main.room import RoomSortOrder
 from synapse.storage.state import StateFilter
+from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, RoomID, UserID, create_requester
 from synapse.util import json_decoder
 
@@ -858,3 +859,106 @@ class BlockRoomRestServlet(RestServlet):
             await self._store.unblock_room(room_id)
 
         return HTTPStatus.OK, {"block": block}
+
+
+class RoomMessagesRestServlet(RestServlet):
+    """
+    Get messages list of a room.
+    """
+
+    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/messages$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._hs = hs
+        self._clock = hs.get_clock()
+        self._pagination_handler = hs.get_pagination_handler()
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+
+    async def on_GET(
+        self, request: SynapseRequest, room_id: str
+    ) -> Tuple[int, JsonDict]:
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester)
+
+        pagination_config = await PaginationConfig.from_request(
+            self._store, request, default_limit=10
+        )
+        # Twisted will have processed the args by now.
+        assert request.args is not None
+        as_client_event = b"raw" not in request.args
+        filter_str = parse_string(request, "filter", encoding="utf-8")
+        if filter_str:
+            filter_json = urlparse.unquote(filter_str)
+            event_filter: Optional[Filter] = Filter(
+                self._hs, json_decoder.decode(filter_json)
+            )
+            if (
+                event_filter
+                and event_filter.filter_json.get("event_format", "client")
+                == "federation"
+            ):
+                as_client_event = False
+        else:
+            event_filter = None
+
+        msgs = await self._pagination_handler.get_messages(
+            room_id=room_id,
+            requester=requester,
+            pagin_config=pagination_config,
+            as_client_event=as_client_event,
+            event_filter=event_filter,
+            use_admin_priviledge=True,
+        )
+
+        return HTTPStatus.OK, msgs
+
+
+class RoomTimestampToEventRestServlet(RestServlet):
+    """
+    API endpoint to fetch the `event_id` of the closest event to the given
+    timestamp (`ts` query parameter) in the given direction (`dir` query
+    parameter).
+
+    Useful for cases like "jump to date", where you want to start paginating
+    messages from a given date in the archive.
+
+    `ts` is a timestamp in milliseconds where we will find the closest event in
+    the given direction.
+
+    `dir` can be `f` or `b` to indicate forwards and backwards in time from the
+    given timestamp.
+
+    GET /_synapse/admin/v1/rooms/<roomID>/timestamp_to_event?ts=<timestamp>&dir=<direction>
+    {
+        "event_id": ...
+    }
+    """
+
+    PATTERNS = admin_patterns("/rooms/(?P<room_id>[^/]*)/timestamp_to_event$")
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+        self._timestamp_lookup_handler = hs.get_timestamp_lookup_handler()
+
+    async def on_GET(
+        self, request: SynapseRequest, room_id: str
+    ) -> Tuple[int, JsonDict]:
+        requester = await self._auth.get_user_by_req(request)
+        await assert_user_is_admin(self._auth, requester)
+
+        timestamp = parse_integer(request, "ts", required=True)
+        direction = parse_string(request, "dir", default="f", allowed_values=["f", "b"])
+
+        (
+            event_id,
+            origin_server_ts,
+        ) = await self._timestamp_lookup_handler.get_event_for_timestamp(
+            requester, room_id, timestamp, direction
+        )
+
+        return HTTPStatus.OK, {
+            "event_id": event_id,
+            "origin_server_ts": origin_server_ts,
+        }
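
Assuming a hypothetical homeserver and admin access token, the new timestamp endpoint can be exercised like any other admin API, for example:

import requests

resp = requests.get(
    "https://synapse.example.com/_synapse/admin/v1"
    "/rooms/!somewhere:example.com/timestamp_to_event",
    params={"ts": "1662000000000", "dir": "f"},
    headers={"Authorization": "Bearer <admin access token>"},
)
print(resp.json())  # e.g. {"event_id": "$...", "origin_server_ts": 1662000012345}
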
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 1f9a8ccc23..a09aaf3448 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 import random
-from typing import TYPE_CHECKING, Optional, Tuple
+from typing import TYPE_CHECKING, List, Optional, Tuple
 from urllib.parse import urlparse
 
 from pydantic import StrictBool, StrictStr, constr
@@ -41,7 +41,11 @@ from synapse.http.servlet import (
 from synapse.http.site import SynapseRequest
 from synapse.metrics import threepid_send_requests
 from synapse.push.mailer import Mailer
-from synapse.rest.client.models import AuthenticationData, EmailRequestTokenBody
+from synapse.rest.client.models import (
+    AuthenticationData,
+    EmailRequestTokenBody,
+    MsisdnRequestTokenBody,
+)
 from synapse.rest.models import RequestBodyModel
 from synapse.types import JsonDict
 from synapse.util.msisdn import phone_number_to_msisdn
@@ -400,23 +404,16 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
         self.identity_handler = hs.get_identity_handler()
 
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
-        body = parse_json_object_from_request(request)
-        assert_params_in_dict(
-            body, ["client_secret", "country", "phone_number", "send_attempt"]
+        body = parse_and_validate_json_object_from_request(
+            request, MsisdnRequestTokenBody
         )
-        client_secret = body["client_secret"]
-        assert_valid_client_secret(client_secret)
-
-        country = body["country"]
-        phone_number = body["phone_number"]
-        send_attempt = body["send_attempt"]
-        next_link = body.get("next_link")  # Optional param
-
-        msisdn = phone_number_to_msisdn(country, phone_number)
+        msisdn = phone_number_to_msisdn(body.country, body.phone_number)
 
         if not await check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
                 403,
+                # TODO: is this error message accurate? Looks like we've only rejected
+                #       this phone number, not necessarily all phone numbers
                 "Account phone numbers are not authorized on this server",
                 Codes.THREEPID_DENIED,
             )
@@ -425,9 +422,9 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
             request, "msisdn", msisdn
         )
 
-        if next_link:
+        if body.next_link:
             # Raise if the provided next_link value isn't valid
-            assert_valid_next_link(self.hs, next_link)
+            assert_valid_next_link(self.hs, body.next_link)
 
         existing_user_id = await self.store.get_user_id_by_threepid("msisdn", msisdn)
 
@@ -454,15 +451,15 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 
         ret = await self.identity_handler.requestMsisdnToken(
             self.hs.config.registration.account_threepid_delegate_msisdn,
-            country,
-            phone_number,
-            client_secret,
-            send_attempt,
-            next_link,
+            body.country,
+            body.phone_number,
+            body.client_secret,
+            body.send_attempt,
+            body.next_link,
         )
 
         threepid_send_requests.labels(type="msisdn", reason="add_threepid").observe(
-            send_attempt
+            body.send_attempt
         )
 
         return 200, ret
@@ -845,17 +842,18 @@ class AccountStatusRestServlet(RestServlet):
         self._auth = hs.get_auth()
         self._account_handler = hs.get_account_handler()
 
+    class PostBody(RequestBodyModel):
+        # TODO: we could validate that each user id is an mxid here, and/or parse it
+        #       as a UserID
+        user_ids: List[StrictStr]
+
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         await self._auth.get_user_by_req(request)
 
-        body = parse_json_object_from_request(request)
-        if "user_ids" not in body:
-            raise SynapseError(
-                400, "Required parameter 'user_ids' is missing", Codes.MISSING_PARAM
-            )
+        body = parse_and_validate_json_object_from_request(request, self.PostBody)
 
         statuses, failures = await self._account_handler.get_account_statuses(
-            body["user_ids"],
+            body.user_ids,
             allow_remote=True,
         )
 
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index a395694fa5..f653d2a3e1 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -27,9 +27,9 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import log_kv, set_tag
+from synapse.rest.client._base import client_patterns, interactive_auth_handler
 from synapse.types import JsonDict, StreamToken
-
-from ._base import client_patterns, interactive_auth_handler
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -156,6 +156,7 @@ class KeyQueryServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
 
+    @cancellable
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
         user_id = requester.user.to_string()
@@ -199,6 +200,7 @@ class KeyChangesServlet(RestServlet):
         self.device_handler = hs.get_device_handler()
         self.store = hs.get_datastores().main
 
+    @cancellable
     async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
 
diff --git a/synapse/rest/client/models.py b/synapse/rest/client/models.py
index 3150602997..6278450c70 100644
--- a/synapse/rest/client/models.py
+++ b/synapse/rest/client/models.py
@@ -25,8 +25,8 @@ class AuthenticationData(RequestBodyModel):
 
     (The name "Authentication Data" is taken directly from the spec.)
 
-    Additional keys will be present, depending on the `type` field. Use `.dict()` to
-    access them.
+    Additional keys will be present, depending on the `type` field. Use
+    `.dict(exclude_unset=True)` to access them.
     """
 
     class Config:
@@ -36,7 +36,7 @@ class AuthenticationData(RequestBodyModel):
     type: Optional[StrictStr] = None
 
 
-class EmailRequestTokenBody(RequestBodyModel):
+class ThreePidRequestTokenBody(RequestBodyModel):
     if TYPE_CHECKING:
         client_secret: StrictStr
     else:
@@ -47,7 +47,7 @@ class EmailRequestTokenBody(RequestBodyModel):
             max_length=255,
             strict=True,
         )
-    email: StrictStr
+
     id_server: Optional[StrictStr]
     id_access_token: Optional[StrictStr]
     next_link: Optional[StrictStr]
@@ -61,9 +61,25 @@ class EmailRequestTokenBody(RequestBodyModel):
             raise ValueError("id_access_token is required if an id_server is supplied.")
         return token
 
+
+class EmailRequestTokenBody(ThreePidRequestTokenBody):
+    email: StrictStr
+
     # Canonicalise the email address. The addresses are all stored canonicalised
     # in the database. This allows the user to reset their password without
     # having to know the exact spelling (e.g. upper and lower case) of the
     # address in the database. Without this, an email stored in the database as
     # "foo@bar.com" would cause user requests for "FOO@bar.com" to raise a Not
     # Found error.
     _email_validator = validator("email", allow_reuse=True)(validate_email)
+
+
+if TYPE_CHECKING:
+    ISO3116_1_Alpha_2 = StrictStr
+else:
+    # Per spec: two-letter uppercase ISO-3166-1-alpha-2
+    ISO3116_1_Alpha_2 = constr(regex="[A-Z]{2}", strict=True)
+
+
+class MsisdnRequestTokenBody(ThreePidRequestTokenBody):
+    country: ISO3116_1_Alpha_2
+    phone_number: StrictStr
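
Under pydantic v1, constr(regex=..., strict=True) rejects anything that is not a string matching the pattern, which is how the new body model enforces the two-letter country code. A small, self-contained illustration:

from pydantic import BaseModel, ValidationError, constr


class Body(BaseModel):
    # Per spec: two-letter uppercase ISO 3166-1 alpha-2 code.
    country: constr(regex="[A-Z]{2}", strict=True)


print(Body.parse_obj({"country": "GB"}).country)  # GB
try:
    Body.parse_obj({"country": "gb"})  # lowercase: rejected
except ValidationError as e:
    print(e.errors()[0]["type"])  # value_error.str.regex
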
diff --git a/synapse/server.py b/synapse/server.py
index 5a99c0b344..df3a1cb405 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -341,7 +341,17 @@ class HomeServer(metaclass=abc.ABCMeta):
         return domain_specific_string.domain == self.hostname
 
     def is_mine_id(self, string: str) -> bool:
-        return string.split(":", 1)[1] == self.hostname
+        """Determines whether a user ID or room alias originates from this homeserver.
+
+        Returns:
+            `True` if the hostname part of the user ID or room alias matches this
+            homeserver.
+            `False` otherwise, or if the user ID or room alias is malformed.
+        """
+        localpart_hostname = string.split(":", 1)
+        if len(localpart_hostname) < 2:
+            return False
+        return localpart_hostname[1] == self.hostname
 
     @cache_in_self
     def get_clock(self) -> Clock:
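
The guard turns a malformed ID (one with no colon at all) into False instead of an IndexError from the unchecked split. Recreated standalone:

def is_mine_id(string: str, hostname: str = "example.com") -> bool:
    localpart_hostname = string.split(":", 1)
    if len(localpart_hostname) < 2:
        return False
    return localpart_hostname[1] == hostname


print(is_mine_id("@alice:example.com"))  # True
print(is_mine_id("not-a-real-id"))       # False (previously an IndexError)
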
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index ba5380ce3e..bbe568bf05 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -36,6 +36,7 @@ from synapse.storage.util.partial_state_events_tracker import (
     PartialStateEventsTracker,
 )
 from synapse.types import MutableStateMap, StateMap
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -229,6 +230,7 @@ class StateStorageController:
 
     @trace
     @tag_args
+    @cancellable
     async def get_state_ids_for_events(
         self,
         event_ids: Collection[str],
@@ -350,6 +352,7 @@ class StateStorageController:
 
     @trace
     @tag_args
+    @cancellable
     async def get_state_group_for_events(
         self,
         event_ids: Collection[str],
@@ -398,6 +401,7 @@ class StateStorageController:
             event_id, room_id, prev_group, delta_ids, current_state_ids
         )
 
+    @cancellable
     async def get_current_state_ids(
         self,
         room_id: str,
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b394a6658b..e881bff7fb 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -533,15 +533,14 @@ class DatabasePool:
         if isinstance(self.engine, Sqlite3Engine):
             self._unsafe_to_upsert_tables.add("user_directory_search")
 
-        if self.engine.can_native_upsert:
-            # Check ASAP (and then later, every 1s) to see if we have finished
-            # background updates of tables that aren't safe to update.
-            self._clock.call_later(
-                0.0,
-                run_as_background_process,
-                "upsert_safety_check",
-                self._check_safe_to_upsert,
-            )
+        # Check ASAP (and then later, every 1s) to see if we have finished
+        # background updates of tables that aren't safe to update.
+        self._clock.call_later(
+            0.0,
+            run_as_background_process,
+            "upsert_safety_check",
+            self._check_safe_to_upsert,
+        )
 
     def name(self) -> str:
         "Return the name of this database"
@@ -1160,11 +1159,8 @@ class DatabasePool:
         attempts = 0
         while True:
             try:
-                # We can autocommit if we are going to use native upserts
-                autocommit = (
-                    self.engine.can_native_upsert
-                    and table not in self._unsafe_to_upsert_tables
-                )
+                # We can autocommit if it is safe to upsert
+                autocommit = table not in self._unsafe_to_upsert_tables
 
                 return await self.runInteraction(
                     desc,
@@ -1199,7 +1195,7 @@ class DatabasePool:
     ) -> bool:
         """
         Pick the UPSERT method which works best on the platform. Either the
-        native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
+        native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.
 
         Args:
             txn: The transaction to use.
@@ -1207,14 +1203,15 @@ class DatabasePool:
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
-            lock: True to lock the table when doing the upsert.
+            lock: True to lock the table when doing the upsert. Unused when performing
+                a native upsert.
         Returns:
             Returns True if a row was inserted or updated (i.e. if `values` is
             not empty then this always returns True)
         """
         insertion_values = insertion_values or {}
 
-        if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+        if table not in self._unsafe_to_upsert_tables:
             return self.simple_upsert_txn_native_upsert(
                 txn, table, keyvalues, values, insertion_values=insertion_values
             )
@@ -1365,14 +1362,12 @@ class DatabasePool:
             value_names: The value column names
             value_values: A list of each row's value column values.
                 Ignored if value_names is empty.
-            lock: True to lock the table when doing the upsert. Unused if the database engine
-                supports native upserts.
+            lock: True to lock the table when doing the upsert. Unused when performing
+                a native upsert.
         """
 
-        # We can autocommit if we are going to use native upserts
-        autocommit = (
-            self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
-        )
+        # We can autocommit if it is safe to upsert
+        autocommit = table not in self._unsafe_to_upsert_tables
 
         await self.runInteraction(
             desc,
@@ -1406,10 +1401,10 @@ class DatabasePool:
             value_names: The value column names
             value_values: A list of each row's value column values.
                 Ignored if value_names is empty.
-            lock: True to lock the table when doing the upsert. Unused if the database engine
-                supports native upserts.
+            lock: True to lock the table when doing the upsert. Unused when performing
+                a native upsert.
         """
-        if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+        if table not in self._unsafe_to_upsert_tables:
             return self.simple_upsert_many_txn_native_upsert(
                 txn, table, key_names, key_values, value_names, value_values
             )
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index ca0fe8c4be..5d700ca6c3 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -53,6 +53,7 @@ from synapse.util import json_decoder, json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 from synapse.util.stringutils import shortstr
 
@@ -668,6 +669,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
         ...
 
     @trace
+    @cancellable
     async def get_user_devices_from_cache(
         self, query_list: List[Tuple[str, Optional[str]]]
     ) -> Tuple[Set[str], Dict[str, Dict[str, JsonDict]]]:
@@ -743,6 +745,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
 
         return self._device_list_stream_cache.get_all_entities_changed(from_key)
 
+    @cancellable
     async def get_users_whose_devices_changed(
         self,
         from_key: int,
@@ -1221,6 +1224,7 @@ class DeviceWorkerStore(EndToEndKeyWorkerStore):
             desc="get_min_device_lists_changes_in_room",
         )
 
+    @cancellable
     async def get_device_list_changes_in_rooms(
         self, room_ids: Collection[str], from_id: int
     ) -> Optional[Set[str]]:
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 46c0d06157..8e9e1b0b4b 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -50,6 +50,7 @@ from synapse.storage.util.id_generators import StreamIdGenerator
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
@@ -135,6 +136,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
         return now_stream_id, []
 
     @trace
+    @cancellable
     async def get_e2e_device_keys_for_cs_api(
         self, query_list: List[Tuple[str, Optional[str]]]
     ) -> Dict[str, Dict[str, JsonDict]]:
@@ -197,6 +199,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
         ...
 
     @trace
+    @cancellable
     async def get_e2e_device_keys_and_signatures(
         self,
         query_list: Collection[Tuple[str, Optional[str]]],
@@ -887,6 +890,7 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
 
         return keys
 
+    @cancellable
     async def get_e2e_cross_signing_keys_bulk(
         self, user_ids: List[str], from_user_id: Optional[str] = None
     ) -> Dict[str, Optional[Dict[str, JsonDict]]]:
@@ -902,7 +906,6 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
             keys were not found, either their user ID will not be in the dict,
             or their user ID will map to None.
         """
-
         result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
 
         if from_user_id:
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index c836078da6..ca47a22bf1 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -48,6 +48,7 @@ from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
@@ -976,6 +977,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
 
         return int(min_depth) if min_depth is not None else None
 
+    @cancellable
     async def get_forward_extremities_for_room_at_stream_ordering(
         self, room_id: str, stream_ordering: int
     ) -> List[str]:
@@ -1606,7 +1608,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
                 logger.info("Invalid prev_events for %s", event_id)
                 continue
 
-            if room_version.event_format == EventFormatVersions.V1:
+            if room_version.event_format == EventFormatVersions.ROOM_V1_V2:
                 for prev_event_tuple in prev_events:
                     if (
                         not isinstance(prev_event_tuple, list)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 9b997c304d..52914febf9 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -81,6 +81,7 @@ from synapse.util import unwrapFirstError
 from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import AsyncLruCache
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
@@ -339,6 +340,7 @@ class EventsWorkerStore(SQLBaseStore):
     ) -> Optional[EventBase]:
         ...
 
+    @cancellable
     async def get_event(
         self,
         event_id: str,
@@ -433,6 +435,7 @@ class EventsWorkerStore(SQLBaseStore):
 
     @trace
     @tag_args
+    @cancellable
     async def get_events_as_list(
         self,
         event_ids: Collection[str],
@@ -584,6 +587,7 @@ class EventsWorkerStore(SQLBaseStore):
 
         return events
 
+    @cancellable
     async def _get_events_from_cache_or_db(
         self, event_ids: Iterable[str], allow_rejected: bool = False
     ) -> Dict[str, EventCacheEntry]:
@@ -1156,7 +1160,7 @@ class EventsWorkerStore(SQLBaseStore):
             if format_version is None:
                 # This means that we stored the event before we had the concept
                 # of a event format version, so it must be a V1 event.
-                format_version = EventFormatVersions.V1
+                format_version = EventFormatVersions.ROOM_V1_V2
 
             room_version_id = row.room_version_id
 
@@ -1186,10 +1190,10 @@ class EventsWorkerStore(SQLBaseStore):
                 #
                 # So, the following approximations should be adequate.
 
-                if format_version == EventFormatVersions.V1:
+                if format_version == EventFormatVersions.ROOM_V1_V2:
                     # if it's event format v1 then it must be room v1 or v2
                     room_version = RoomVersions.V1
-                elif format_version == EventFormatVersions.V2:
+                elif format_version == EventFormatVersions.ROOM_V3:
                     # if it's event format v2 then it must be room v3
                     room_version = RoomVersions.V3
                 else:
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
         now = self._clock.time_msec()
         token = random_string(6)
 
-        if self.db_pool.engine.can_native_upsert:
-
-            def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
-                # We take out the lock if either a) there is no row for the lock
-                # already, b) the existing row has timed out, or c) the row is
-                # for this instance (which means the process got killed and
-                # restarted)
-                sql = """
-                    INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
-                    VALUES (?, ?, ?, ?, ?)
-                    ON CONFLICT (lock_name, lock_key)
-                    DO UPDATE
-                        SET
-                            token = EXCLUDED.token,
-                            instance_name = EXCLUDED.instance_name,
-                            last_renewed_ts = EXCLUDED.last_renewed_ts
-                        WHERE
-                            worker_locks.last_renewed_ts < ?
-                            OR worker_locks.instance_name = EXCLUDED.instance_name
-                """
-                txn.execute(
-                    sql,
-                    (
-                        lock_name,
-                        lock_key,
-                        self._instance_name,
-                        token,
-                        now,
-                        now - _LOCK_TIMEOUT_MS,
-                    ),
-                )
-
-                # We only acquired the lock if we inserted or updated the table.
-                return bool(txn.rowcount)
-
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock",
-                _try_acquire_lock_txn,
-                # We can autocommit here as we're executing a single query, this
-                # will avoid serialization errors.
-                db_autocommit=True,
+        def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
+            # We take out the lock if either a) there is no row for the lock
+            # already, b) the existing row has timed out, or c) the row is
+            # for this instance (which means the process got killed and
+            # restarted)
+            sql = """
+               INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
+               VALUES (?, ?, ?, ?, ?)
+               ON CONFLICT (lock_name, lock_key)
+               DO UPDATE
+                   SET
+                       token = EXCLUDED.token,
+                       instance_name = EXCLUDED.instance_name,
+                       last_renewed_ts = EXCLUDED.last_renewed_ts
+                   WHERE
+                       worker_locks.last_renewed_ts < ?
+                       OR worker_locks.instance_name = EXCLUDED.instance_name
+           """
+            txn.execute(
+                sql,
+                (
+                    lock_name,
+                    lock_key,
+                    self._instance_name,
+                    token,
+                    now,
+                    now - _LOCK_TIMEOUT_MS,
+                ),
             )
-            if not did_lock:
-                return None
-
-        else:
-            # If we're on an old SQLite we emulate the above logic by first
-            # clearing out any existing stale locks and then upserting.
-
-            def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
-                sql = """
-                    DELETE FROM worker_locks
-                    WHERE
-                        lock_name = ?
-                        AND lock_key = ?
-                        AND (last_renewed_ts < ? OR instance_name = ?)
-                """
-                txn.execute(
-                    sql,
-                    (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
-                )
-
-                inserted = self.db_pool.simple_upsert_txn_emulated(
-                    txn,
-                    table="worker_locks",
-                    keyvalues={
-                        "lock_name": lock_name,
-                        "lock_key": lock_key,
-                    },
-                    values={},
-                    insertion_values={
-                        "token": token,
-                        "last_renewed_ts": self._clock.time_msec(),
-                        "instance_name": self._instance_name,
-                    },
-                )
-
-                return inserted
 
-            did_lock = await self.db_pool.runInteraction(
-                "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
-            )
+            # We only acquired the lock if we inserted or updated the table.
+            return bool(txn.rowcount)
 
-            if not did_lock:
-                return None
+        did_lock = await self.db_pool.runInteraction(
+            "try_acquire_lock",
+            _try_acquire_lock_txn,
+            # We can autocommit here as we're executing a single query; this
+            # will avoid serialization errors.
+            db_autocommit=True,
+        )
+        if not did_lock:
+            return None
 
         lock = Lock(
             self._reactor,
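
For context, a hedged sketch of how callers typically use this store method;
`try_acquire_lock` returning `None` on contention and `do_expensive_work` are
assumptions for illustration, not code from this diff:

lock = await store.try_acquire_lock("delete_room", room_id)
if lock is None:
    return  # contended: another worker's row has not timed out yet

async with lock:
    # The Lock renews last_renewed_ts in the background, so long-running
    # work does not silently lose the lock after _LOCK_TIMEOUT_MS.
    await do_expensive_work()
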
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 124c70ad37..719a12b0ae 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -675,6 +675,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             values={
                 "stream_id": stream_id,
                 "event_id": event_id,
+                "event_stream_ordering": stream_ordering,
                 "data": json_encoder.encode(data),
             },
             # receipts_linearized has a unique constraint on
@@ -812,7 +813,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
         # FIXME: This shouldn't invalidate the whole cache
         txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
 
-        self.db_pool.simple_delete_txn(
+        self.db_pool.simple_upsert_txn(
             txn,
             table="receipts_graph",
             keyvalues={
@@ -820,19 +821,86 @@ class ReceiptsWorkerStore(SQLBaseStore):
                 "receipt_type": receipt_type,
                 "user_id": user_id,
             },
-        )
-        self.db_pool.simple_insert_txn(
-            txn,
-            table="receipts_graph",
             values={
-                "room_id": room_id,
-                "receipt_type": receipt_type,
-                "user_id": user_id,
                 "event_ids": json_encoder.encode(event_ids),
                 "data": json_encoder.encode(data),
             },
+            # receipts_graph has a unique constraint on
+            # (user_id, room_id, receipt_type), so no need to lock
+            lock=False,
         )
 
 
-class ReceiptsStore(ReceiptsWorkerStore):
+class ReceiptsBackgroundUpdateStore(SQLBaseStore):
+    POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
+
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: LoggingDatabaseConnection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self.db_pool.updates.register_background_update_handler(
+            self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+            self._populate_receipt_event_stream_ordering,
+        )
+
+    async def _populate_receipt_event_stream_ordering(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        def _populate_receipt_event_stream_ordering_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+
+            if "max_stream_id" in progress:
+                max_stream_id = progress["max_stream_id"]
+            else:
+                txn.execute("SELECT max(stream_id) FROM receipts_linearized")
+                res = txn.fetchone()
+                if res is None or res[0] is None:
+                    return True
+                else:
+                    max_stream_id = res[0]
+
+            start = progress.get("stream_id", 0)
+            stop = start + batch_size
+
+            sql = """
+                UPDATE receipts_linearized
+                SET event_stream_ordering = (
+                    SELECT stream_ordering
+                    FROM events
+                    WHERE event_id = receipts_linearized.event_id
+                )
+                WHERE stream_id >= ? AND stream_id < ?
+            """
+            txn.execute(sql, (start, stop))
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING,
+                {
+                    "stream_id": stop,
+                    "max_stream_id": max_stream_id,
+                },
+            )
+
+            return stop > max_stream_id
+
+        finished = await self.db_pool.runInteraction(
+            "_remove_devices_from_device_inbox_txn",
+            _populate_receipt_event_stream_ordering_txn,
+        )
+
+        if finished:
+            await self.db_pool.updates._end_background_update(
+                self.POPULATE_RECEIPT_EVENT_STREAM_ORDERING
+            )
+
+        return batch_size
+
+
+class ReceiptsStore(ReceiptsWorkerStore, ReceiptsBackgroundUpdateStore):
     pass
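
The new handler follows the usual background-update contract: rewrite one
bounded window of rows per call, checkpoint progress, and end the update once
past the snapshotted maximum. A condensed skeleton of that contract (the
update name and helpers here are placeholders):

async def _my_backfill(self, progress: JsonDict, batch_size: int) -> int:
    start = progress.get("stream_id", 0)
    stop = start + batch_size
    max_id = progress.get("max_stream_id", 0)  # snapshotted on the first run

    def _txn(txn: LoggingTransaction) -> bool:
        # ... rewrite rows with stream_id in [start, stop) ...
        self.db_pool.updates._background_update_progress_txn(
            txn, "my_backfill", {"stream_id": stop, "max_stream_id": max_id}
        )
        return stop > max_id  # finished once we pass the snapshot

    finished = await self.db_pool.runInteraction("my_backfill", _txn)
    if finished:
        await self.db_pool.updates._end_background_update("my_backfill")
    return batch_size
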
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 7fb9c801da..ac821878b0 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -175,6 +175,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 "is_guest",
                 "admin",
                 "consent_version",
+                "consent_ts",
                 "consent_server_notice_sent",
                 "appservice_id",
                 "creation_ts",
@@ -2227,7 +2228,10 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
                 txn,
                 table="users",
                 keyvalues={"name": user_id},
-                updatevalues={"consent_version": consent_version},
+                updatevalues={
+                    "consent_version": consent_version,
+                    "consent_ts": self._clock.time_msec(),
+                },
             )
             self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
 
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 4f0adb136a..fdb4684e12 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -32,10 +32,7 @@ import attr
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import (
-    run_as_background_process,
-    wrap_as_background_process,
-)
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -55,6 +52,7 @@ from synapse.types import JsonDict, PersistedEventPosition, StateMap, get_domain
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 from synapse.util.metrics import Measure
 
@@ -90,16 +88,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # at a time. Keyed by room_id.
         self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
 
-        # Is the current_state_events.membership up to date? Or is the
-        # background update still running?
-        self._current_state_events_membership_up_to_date = False
-
-        txn = db_conn.cursor(
-            txn_name="_check_safe_current_state_events_membership_updated"
-        )
-        self._check_safe_current_state_events_membership_updated_txn(txn)
-        txn.close()
-
         if (
             self.hs.config.worker.run_background_tasks
             and self.hs.config.metrics.metrics_flags.known_servers
@@ -156,34 +144,6 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         self._known_servers_count = max([count, 1])
         return self._known_servers_count
 
-    def _check_safe_current_state_events_membership_updated_txn(
-        self, txn: LoggingTransaction
-    ) -> None:
-        """Checks if it is safe to assume the new current_state_events
-        membership column is up to date
-        """
-
-        pending_update = self.db_pool.simple_select_one_txn(
-            txn,
-            table="background_updates",
-            keyvalues={"update_name": _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME},
-            retcols=["update_name"],
-            allow_none=True,
-        )
-
-        self._current_state_events_membership_up_to_date = not pending_update
-
-        # If the update is still running, reschedule to run.
-        if pending_update:
-            self._clock.call_later(
-                15.0,
-                run_as_background_process,
-                "_check_safe_current_state_events_membership_updated",
-                self.db_pool.runInteraction,
-                "_check_safe_current_state_events_membership_updated",
-                self._check_safe_current_state_events_membership_updated_txn,
-            )
-
     @cached(max_entries=100000, iterable=True)
     async def get_users_in_room(self, room_id: str) -> List[str]:
         """
@@ -191,8 +151,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         (aka. with the lowest depth). This is done to match the sort in
         `get_current_hosts_in_room()` and so we can re-use the cache but it's
         not horrible to have here either.
-        """
 
+        Uses `m.room.member`s in the room state at the current forward extremities to
+        determine which users are in the room.
+
+        Will return inaccurate results for rooms with partial state, since the state for
+        the forward extremities of those rooms will exclude most members. We may also
+        calculate room state incorrectly for such rooms and believe that a member is or
+        is not in the room when the opposite is true.
+        """
         return await self.db_pool.runInteraction(
             "get_users_in_room", self.get_users_in_room_txn, room_id
         )
@@ -204,31 +171,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         `get_current_hosts_in_room()` and so we can re-use the cache but it's
         not horrible to have here either.
         """
-        # If we can assume current_state_events.membership is up to date
-        # then we can avoid a join, which is a Very Good Thing given how
-        # frequently this function gets called.
-        if self._current_state_events_membership_up_to_date:
-            sql = """
-                SELECT c.state_key FROM current_state_events as c
-                /* Get the depth of the event from the events table */
-                INNER JOIN events AS e USING (event_id)
-                WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
-                /* Sorted by lowest depth first */
-                ORDER BY e.depth ASC;
-            """
-        else:
-            sql = """
-                SELECT c.state_key FROM room_memberships as m
-                /* Get the depth of the event from the events table */
-                INNER JOIN events AS e USING (event_id)
-                INNER JOIN current_state_events as c
-                ON m.event_id = c.event_id
-                AND m.room_id = c.room_id
-                AND m.user_id = c.state_key
-                WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
-                /* Sorted by lowest depth first */
-                ORDER BY e.depth ASC;
-            """
+        sql = """
+            SELECT c.state_key FROM current_state_events as c
+            /* Get the depth of the event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            WHERE c.type = 'm.room.member' AND c.room_id = ? AND membership = ?
+            /* Sorted by lowest depth first */
+            ORDER BY e.depth ASC;
+        """
 
         txn.execute(sql, (room_id, Membership.JOIN))
         return [r[0] for r in txn]
@@ -345,28 +295,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # We do this all in one transaction to keep the cache small.
             # FIXME: get rid of this when we have room_stats
 
-            # If we can assume current_state_events.membership is up to date
-            # then we can avoid a join, which is a Very Good Thing given how
-            # frequently this function gets called.
-            if self._current_state_events_membership_up_to_date:
-                # Note, rejected events will have a null membership field, so
-                # we we manually filter them out.
-                sql = """
-                    SELECT count(*), membership FROM current_state_events
-                    WHERE type = 'm.room.member' AND room_id = ?
-                        AND membership IS NOT NULL
-                    GROUP BY membership
-                """
-            else:
-                sql = """
-                    SELECT count(*), m.membership FROM room_memberships as m
-                    INNER JOIN current_state_events as c
-                    ON m.event_id = c.event_id
-                    AND m.room_id = c.room_id
-                    AND m.user_id = c.state_key
-                    WHERE c.type = 'm.room.member' AND c.room_id = ?
-                    GROUP BY m.membership
-                """
+            # Note, rejected events will have a null membership field, so
+            # we manually filter them out.
+            sql = """
+                SELECT count(*), membership FROM current_state_events
+                WHERE type = 'm.room.member' AND room_id = ?
+                    AND membership IS NOT NULL
+                GROUP BY membership
+            """
 
             txn.execute(sql, (room_id,))
             res: Dict[str, MemberSummary] = {}
@@ -375,30 +311,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
             # we order by membership and then fairly arbitrarily by event_id so
             # heroes are consistent
-            if self._current_state_events_membership_up_to_date:
-                # Note, rejected events will have a null membership field, so
-                # we we manually filter them out.
-                sql = """
-                    SELECT state_key, membership, event_id
-                    FROM current_state_events
-                    WHERE type = 'm.room.member' AND room_id = ?
-                        AND membership IS NOT NULL
-                    ORDER BY
-                        CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
-                        event_id ASC
-                    LIMIT ?
-                """
-            else:
-                sql = """
-                    SELECT c.state_key, m.membership, c.event_id
-                    FROM room_memberships as m
-                    INNER JOIN current_state_events as c USING (room_id, event_id)
-                    WHERE c.type = 'm.room.member' AND c.room_id = ?
-                    ORDER BY
-                        CASE m.membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
-                        c.event_id ASC
-                    LIMIT ?
-                """
+            # Note, rejected events will have a null membership field, so
+            # we manually filter them out.
+            sql = """
+                SELECT state_key, membership, event_id
+                FROM current_state_events
+                WHERE type = 'm.room.member' AND room_id = ?
+                    AND membership IS NOT NULL
+                ORDER BY
+                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
+                    event_id ASC
+                LIMIT ?
+            """
 
             # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
             txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
@@ -641,27 +565,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         # We use `current_state_events` here and not `local_current_membership`
         # as a) this gets called with remote users and b) this only gets called
         # for rooms the server is participating in.
-        if self._current_state_events_membership_up_to_date:
-            sql = """
-                SELECT room_id, e.instance_name, e.stream_ordering
-                FROM current_state_events AS c
-                INNER JOIN events AS e USING (room_id, event_id)
-                WHERE
-                    c.type = 'm.room.member'
-                    AND c.state_key = ?
-                    AND c.membership = ?
-            """
-        else:
-            sql = """
-                SELECT room_id, e.instance_name, e.stream_ordering
-                FROM current_state_events AS c
-                INNER JOIN room_memberships AS m USING (room_id, event_id)
-                INNER JOIN events AS e USING (room_id, event_id)
-                WHERE
-                    c.type = 'm.room.member'
-                    AND c.state_key = ?
-                    AND m.membership = ?
-            """
+        sql = """
+            SELECT room_id, e.instance_name, e.stream_ordering
+            FROM current_state_events AS c
+            INNER JOIN events AS e USING (room_id, event_id)
+            WHERE
+                c.type = 'm.room.member'
+                AND c.state_key = ?
+                AND c.membership = ?
+        """
 
         txn.execute(sql, (user_id, Membership.JOIN))
         return frozenset(
@@ -699,27 +611,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             user_ids,
         )
 
-        if self._current_state_events_membership_up_to_date:
-            sql = f"""
-                SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
-                FROM current_state_events AS c
-                INNER JOIN events AS e USING (room_id, event_id)
-                WHERE
-                    c.type = 'm.room.member'
-                    AND c.membership = ?
-                    AND {clause}
-            """
-        else:
-            sql = f"""
-                SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
-                FROM current_state_events AS c
-                INNER JOIN room_memberships AS m USING (room_id, event_id)
-                INNER JOIN events AS e USING (room_id, event_id)
-                WHERE
-                    c.type = 'm.room.member'
-                    AND m.membership = ?
-                    AND {clause}
-            """
+        sql = f"""
+            SELECT c.state_key, room_id, e.instance_name, e.stream_ordering
+            FROM current_state_events AS c
+            INNER JOIN events AS e USING (room_id, event_id)
+            WHERE
+                c.type = 'm.room.member'
+                AND c.membership = ?
+                AND {clause}
+        """
 
         txn.execute(sql, [Membership.JOIN] + args)
 
@@ -770,6 +670,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             _get_users_server_still_shares_room_with_txn,
         )
 
+    @cancellable
     async def get_rooms_for_user(
         self, user_id: str, on_invalidate: Optional[Callable[[], None]] = None
     ) -> FrozenSet[str]:
@@ -1020,6 +921,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         longest is good because they're most likely to have anything we ask
         about.
 
+        Uses `m.room.member`s in the room state at the current forward extremities to
+        determine which hosts are in the room.
+
+        Will return inaccurate results for rooms with partial state, since the state for
+        the forward extremities of those rooms will exclude most members. We may also
+        calculate room state incorrectly for such rooms and believe that a host is or
+        is not in the room when the opposite is true.
+
         Returns:
             Returns a list of servers sorted by longest in the room first. (aka.
             sorted by join with the lowest depth first).
@@ -1042,6 +951,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             # We use a `Set` just for fast lookups
             domain_set: Set[str] = set()
             for u in users:
+                if ":" not in u:
+                    continue
                 domain = get_domain_from_id(u)
                 if domain not in domain_set:
                     domain_set.add(domain)
@@ -1075,7 +986,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                 ORDER BY min(e.depth) ASC;
             """
             txn.execute(sql, (room_id,))
-            return [d for d, in txn]
+            # `server_domain` will be `NULL` for malformed MXIDs with no colons.
+            return [d for d, in txn if d is not None]
 
         return await self.db_pool.runInteraction(
             "get_current_hosts_in_room", get_current_hosts_in_room_txn
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 0b10af0e58..af7bebee80 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -23,6 +23,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.logging.opentracing import trace
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
@@ -36,6 +37,7 @@ from synapse.storage.state import StateFilter
 from synapse.types import JsonDict, JsonMapping, StateMap
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
@@ -142,6 +144,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return room_version
 
+    @trace
     async def get_metadata_for_events(
         self, event_ids: Collection[str]
     ) -> Dict[str, EventMetadata]:
@@ -281,6 +284,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         )
 
     # FIXME: how should this be cached?
+    @cancellable
     async def get_partial_filtered_current_state_ids(
         self, room_id: str, state_filter: Optional[StateFilter] = None
     ) -> StateMap[str]:
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b4c652acf3..356d4ca788 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore):
             absolutes: Absolute (set) fields
             additive_relatives: Fields that will be added onto if existing row present.
         """
-        if self.database_engine.can_native_upsert:
-            absolute_updates = [
-                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
-                for field in absolutes.keys()
-            ]
-
-            relative_updates = [
-                "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
-                % {"table": table, "field": field}
-                for field in additive_relatives.keys()
-            ]
-
-            insert_cols = []
-            qargs = []
-
-            for (key, val) in chain(
-                keyvalues.items(), absolutes.items(), additive_relatives.items()
-            ):
-                insert_cols.append(key)
-                qargs.append(val)
+        absolute_updates = [
+            "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+            for field in absolutes.keys()
+        ]
+
+        relative_updates = [
+            "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
+            % {"table": table, "field": field}
+            for field in additive_relatives.keys()
+        ]
+
+        insert_cols = []
+        qargs = []
+
+        for (key, val) in chain(
+            keyvalues.items(), absolutes.items(), additive_relatives.items()
+        ):
+            insert_cols.append(key)
+            qargs.append(val)
+
+        sql = """
+            INSERT INTO %(table)s (%(insert_cols_cs)s)
+            VALUES (%(insert_vals_qs)s)
+            ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+        """ % {
+            "table": table,
+            "insert_cols_cs": ", ".join(insert_cols),
+            "insert_vals_qs": ", ".join(
+                ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+            ),
+            "key_columns": ", ".join(keyvalues),
+            "updates": ", ".join(chain(absolute_updates, relative_updates)),
+        }
 
-            sql = """
-                INSERT INTO %(table)s (%(insert_cols_cs)s)
-                VALUES (%(insert_vals_qs)s)
-                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
-            """ % {
-                "table": table,
-                "insert_cols_cs": ", ".join(insert_cols),
-                "insert_vals_qs": ", ".join(
-                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
-                ),
-                "key_columns": ", ".join(keyvalues),
-                "updates": ", ".join(chain(absolute_updates, relative_updates)),
-            }
-
-            txn.execute(sql, qargs)
-        else:
-            self.database_engine.lock_table(txn, table)
-            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
-            current_row = self.db_pool.simple_select_one_txn(
-                txn, table, keyvalues, retcols, allow_none=True
-            )
-            if current_row is None:
-                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
-                self.db_pool.simple_insert_txn(txn, table, merged_dict)
-            else:
-                for (key, val) in additive_relatives.items():
-                    if current_row[key] is None:
-                        current_row[key] = val
-                    else:
-                        current_row[key] += val
-                current_row.update(absolutes)
-                self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
+        txn.execute(sql, qargs)
 
     async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
         """Calculate and insert an entry into room_stats_current.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index a347430aa7..3f9bfaeac5 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -72,6 +72,7 @@ from synapse.storage.util.id_generators import MultiWriterIdGenerator
 from synapse.types import PersistedEventPosition, RoomStreamToken
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.stream_change_cache import StreamChangeCache
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -597,6 +598,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return ret, key
 
+    @cancellable
     async def get_membership_changes_for_user(
         self,
         user_id: str,
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index ba79e19f7f..f8c6877ee8 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
             retry_interval: how long until next retry in ms
         """
 
-        if self.database_engine.can_native_upsert:
-            await self.db_pool.runInteraction(
-                "set_destination_retry_timings",
-                self._set_destination_retry_timings_native,
-                destination,
-                failure_ts,
-                retry_last_ts,
-                retry_interval,
-                db_autocommit=True,  # Safe as its a single upsert
-            )
-        else:
-            await self.db_pool.runInteraction(
-                "set_destination_retry_timings",
-                self._set_destination_retry_timings_emulated,
-                destination,
-                failure_ts,
-                retry_last_ts,
-                retry_interval,
-            )
+        await self.db_pool.runInteraction(
+            "set_destination_retry_timings",
+            self._set_destination_retry_timings_native,
+            destination,
+            failure_ts,
+            retry_last_ts,
+            retry_interval,
+            db_autocommit=True,  # Safe as it's a single upsert
+        )
 
     def _set_destination_retry_timings_native(
         self,
@@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
         retry_last_ts: int,
         retry_interval: int,
     ) -> None:
-        assert self.database_engine.can_native_upsert
-
         # Upsert retry time interval if retry_interval is zero (i.e. we're
         # resetting it) or greater than the existing retry interval.
         #
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index bb64543c1f..f8cfcaca83 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -31,6 +31,7 @@ from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import MutableStateMap, StateKey, StateMap
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.dictionary_cache import DictionaryCache
+from synapse.util.cancellation import cancellable
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -156,6 +157,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
             "get_state_group_delta", _get_state_group_delta_txn
         )
 
+    @cancellable
     async def _get_state_groups_from_groups(
         self, groups: List[int], state_filter: StateFilter
     ) -> Dict[int, StateMap[str]]:
@@ -235,6 +237,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
         return state_filter.filter_state(state_dict_ids), not missing_types
 
+    @cancellable
     async def _get_state_for_groups(
         self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
     ) -> Dict[int, MutableStateMap[str]]:
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 971ff82693..0d16a419a4 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -45,14 +45,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
 
     @property
     @abc.abstractmethod
-    def can_native_upsert(self) -> bool:
-        """
-        Do we support native UPSERTs?
-        """
-        ...
-
-    @property
-    @abc.abstractmethod
     def supports_using_any_list(self) -> bool:
         """
         Do we support using `a = ANY(?)` and passing a list
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 517f9d5f98..7f7d006ac2 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -159,13 +159,6 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
         db_conn.commit()
 
     @property
-    def can_native_upsert(self) -> bool:
-        """
-        Can we use native UPSERTs?
-        """
-        return True
-
-    @property
     def supports_using_any_list(self) -> bool:
         """Do we support using `a = ANY(?)` and passing a list"""
         return True
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 621f2c5efe..095ae0a096 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -49,14 +49,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
         return True
 
     @property
-    def can_native_upsert(self) -> bool:
-        """
-        Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
-        more work we haven't done yet to tell what was inserted vs updated.
-        """
-        return sqlite3.sqlite_version_info >= (3, 24, 0)
-
-    @property
     def supports_using_any_list(self) -> bool:
         """Do we support using `a = ANY(?)` and passing a list"""
         return False
@@ -70,12 +62,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
         self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
     ) -> None:
         if not allow_outdated_version:
-            version = sqlite3.sqlite_version_info
             # Synapse is untested against older SQLite versions, and we don't want
             # to let users upgrade to a version of Synapse with broken support for their
             # sqlite version, because it risks leaving them with a half-upgraded db.
-            if version < (3, 22, 0):
-                raise RuntimeError("Synapse requires sqlite 3.22 or above.")
+            if sqlite3.sqlite_version_info < (3, 27, 0):
+                raise RuntimeError("Synapse requires sqlite 3.27 or above.")
 
     def check_new_database(self, txn: Cursor) -> None:
         """Gets called when setting up a brand new database. This allows us to
diff --git a/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
new file mode 100644
index 0000000000..2a822f4509
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/05receipts_event_stream_ordering.sql
@@ -0,0 +1,19 @@
+/* Copyright 2022 Beeper
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE receipts_linearized ADD COLUMN event_stream_ordering BIGINT;
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('populate_event_stream_ordering', '{}');
diff --git a/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql
new file mode 100644
index 0000000000..609eb1750f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql
@@ -0,0 +1,16 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD consent_ts bigint;
diff --git a/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py
new file mode 100644
index 0000000000..b5853d125c
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py
@@ -0,0 +1,52 @@
+# Copyright 2022 Beeper
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+Forces through the `current_state_events_membership` background job so checks
+for its completion can be removed.
+
+Note the background job must still remain defined in the database class.
+"""
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+    cur.execute("SELECT update_name FROM background_updates")
+    rows = cur.fetchall()
+    for row in rows:
+        if row[0] == "current_state_events_membership":
+            break
+    else:
+        # No pending background job, so nothing to do here.
+        return
+
+    # Populate the membership field for all current_state_events; this may
+    # take a while, but it was originally handled via a background update in 2019.
+    cur.execute(
+        """
+        UPDATE current_state_events
+        SET membership = (
+            SELECT membership FROM room_memberships
+            WHERE event_id = current_state_events.event_id
+        )
+        """
+    )
+
+    # Finally, delete the background job because we've handled it above
+    cur.execute(
+        """
+        DELETE FROM background_updates
+        WHERE update_name = 'current_state_events_membership'
+        """
+    )
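
The early return relies on Python's `for`/`else`, whose `else` arm runs only
when the loop completes without hitting `break`; a minimal illustration:

for update_name in ["some_other_update"]:
    if update_name == "current_state_events_membership":
        break
else:
    # No break fired: the job is not pending, so there is nothing to do.
    print("nothing to do")
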
diff --git a/synapse/storage/schema/state/delta/30/state_stream.sql b/synapse/storage/schema/state/delta/30/state_stream.sql
index e85699e82e..bdaf8b02d5 100644
--- a/synapse/storage/schema/state/delta/30/state_stream.sql
+++ b/synapse/storage/schema/state/delta/30/state_stream.sql
@@ -26,6 +26,10 @@
  * (event, state) pair, we can use that stream_ordering to identify when
  * the new state was assigned for the event.
  */
+
+/* NB: This table belongs to the `main` logical database; it should not be present
+ * in `state`.
+ */
 CREATE TABLE IF NOT EXISTS ex_outlier_stream(
     event_stream_ordering BIGINT PRIMARY KEY NOT NULL,
     event_id TEXT NOT NULL,
diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py
index b4bf49dace..8d8894d1d5 100644
--- a/synapse/storage/util/partial_state_events_tracker.py
+++ b/synapse/storage/util/partial_state_events_tracker.py
@@ -24,6 +24,7 @@ from synapse.logging.opentracing import trace_with_opname
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.room import RoomWorkerStore
 from synapse.util import unwrapFirstError
+from synapse.util.cancellation import cancellable
 
 logger = logging.getLogger(__name__)
 
@@ -60,6 +61,7 @@ class PartialStateEventsTracker:
                 o.callback(None)
 
     @trace_with_opname("PartialStateEventsTracker.await_full_state")
+    @cancellable
     async def await_full_state(self, event_ids: Collection[str]) -> None:
         """Wait for all the given events to have full state.
 
@@ -154,6 +156,7 @@ class PartialCurrentStateTracker:
                 o.callback(None)
 
     @trace_with_opname("PartialCurrentStateTracker.await_full_state")
+    @cancellable
     async def await_full_state(self, room_id: str) -> None:
         # We add the deferred immediately so that the DB call to check for
         # partial state doesn't race when we unpartial the room.
diff --git a/synapse/types.py b/synapse/types.py
index 668d48d646..ec44601f54 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -52,6 +52,7 @@ from twisted.internet.interfaces import (
 )
 
 from synapse.api.errors import Codes, SynapseError
+from synapse.util.cancellation import cancellable
 from synapse.util.stringutils import parse_and_validate_server_name
 
 if TYPE_CHECKING:
@@ -699,7 +700,11 @@ class StreamToken:
     START: ClassVar["StreamToken"]
 
     @classmethod
+    @cancellable
     async def from_string(cls, store: "DataStore", string: str) -> "StreamToken":
+        """
+        Creates a StreamToken from its textual representation.
+        """
         try:
             keys = string.split(cls._SEPARATOR)
             while len(keys) < len(attr.fields(cls)):
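
A hedged usage sketch of the now-cancellable parser; `store` is assumed to be
a `DataStore` and the token literal is invented:

# Parsing may hit the database (hence async); with @cancellable a client
# disconnect can now abort it mid-flight instead of wasting the work.
token = await StreamToken.from_string(store, "s72594_4483_1934_1_2_3_4_5_1")
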
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index bdf9b0dc8c..35c0be08b0 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -20,9 +20,11 @@ from sys import intern
 from typing import Any, Callable, Dict, List, Optional, Sized, TypeVar
 
 import attr
+from prometheus_client import REGISTRY
 from prometheus_client.core import Gauge
 
 from synapse.config.cache import add_resizable_cache
+from synapse.util.metrics import DynamicCollectorRegistry
 
 logger = logging.getLogger(__name__)
 
@@ -30,27 +32,62 @@ logger = logging.getLogger(__name__)
 # Whether to track estimated memory usage of the LruCaches.
 TRACK_MEMORY_USAGE = False
 
+# We track cache metrics in a special registry that lets us update the metrics
+# just before they are returned from the scrape endpoint.
+CACHE_METRIC_REGISTRY = DynamicCollectorRegistry()
 
 caches_by_name: Dict[str, Sized] = {}
-collectors_by_name: Dict[str, "CacheMetric"] = {}
 
-cache_size = Gauge("synapse_util_caches_cache_size", "", ["name"])
-cache_hits = Gauge("synapse_util_caches_cache_hits", "", ["name"])
-cache_evicted = Gauge("synapse_util_caches_cache_evicted_size", "", ["name", "reason"])
-cache_total = Gauge("synapse_util_caches_cache_total", "", ["name"])
-cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
+cache_size = Gauge(
+    "synapse_util_caches_cache_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+cache_hits = Gauge(
+    "synapse_util_caches_cache_hits", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+cache_evicted = Gauge(
+    "synapse_util_caches_cache_evicted_size",
+    "",
+    ["name", "reason"],
+    registry=CACHE_METRIC_REGISTRY,
+)
+cache_total = Gauge(
+    "synapse_util_caches_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+cache_max_size = Gauge(
+    "synapse_util_caches_cache_max_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
 cache_memory_usage = Gauge(
     "synapse_util_caches_cache_size_bytes",
     "Estimated memory usage of the caches",
     ["name"],
+    registry=CACHE_METRIC_REGISTRY,
 )
 
-response_cache_size = Gauge("synapse_util_caches_response_cache_size", "", ["name"])
-response_cache_hits = Gauge("synapse_util_caches_response_cache_hits", "", ["name"])
+response_cache_size = Gauge(
+    "synapse_util_caches_response_cache_size",
+    "",
+    ["name"],
+    registry=CACHE_METRIC_REGISTRY,
+)
+response_cache_hits = Gauge(
+    "synapse_util_caches_response_cache_hits",
+    "",
+    ["name"],
+    registry=CACHE_METRIC_REGISTRY,
+)
 response_cache_evicted = Gauge(
-    "synapse_util_caches_response_cache_evicted_size", "", ["name", "reason"]
+    "synapse_util_caches_response_cache_evicted_size",
+    "",
+    ["name", "reason"],
+    registry=CACHE_METRIC_REGISTRY,
 )
-response_cache_total = Gauge("synapse_util_caches_response_cache_total", "", ["name"])
+response_cache_total = Gauge(
+    "synapse_util_caches_response_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+
+
+# Register our custom cache metrics registry with the global registry
+REGISTRY.register(CACHE_METRIC_REGISTRY)
 
 
 class EvictionReason(Enum):
@@ -168,9 +205,8 @@ def register_cache(
         add_resizable_cache(cache_name, resize_callback)
 
     metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
-    metric_name = "cache_%s_%s" % (cache_type, cache_name)
     caches_by_name[cache_name] = cache
-    collectors_by_name[metric_name] = metric
+    CACHE_METRIC_REGISTRY.register_hook(metric.collect)
     return metric
 
 
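With `register_hook`, each cache pushes fresh values into the shared gauges only when a scrape actually happens. A hedged sketch of such a hook (the cache and hit counter are invented; `CacheMetric.collect` is the real equivalent):

    example_cache: dict = {}
    example_hits = 0

    def _collect_example_cache() -> None:
        # Refresh the labelled gauges from the cache's current state just
        # before the registry yields its metrics.
        cache_size.labels("example_cache").set(len(example_cache))
        cache_hits.labels("example_cache").set(example_hits)

    CACHE_METRIC_REGISTRY.register_hook(_collect_example_cache)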
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index bc3b4938ea..9687120ebf 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -15,9 +15,9 @@
 import logging
 from functools import wraps
 from types import TracebackType
-from typing import Awaitable, Callable, Optional, Type, TypeVar
+from typing import Awaitable, Callable, Generator, List, Optional, Type, TypeVar
 
-from prometheus_client import Counter
+from prometheus_client import CollectorRegistry, Counter, Metric
 from typing_extensions import Concatenate, ParamSpec, Protocol
 
 from synapse.logging.context import (
@@ -208,3 +208,33 @@ class Measure:
         metrics.real_time_sum += duration
 
         # TODO: Add other in flight metrics.
+
+
+class DynamicCollectorRegistry(CollectorRegistry):
+    """
+    Custom Prometheus collector registry that calls its registered hooks
+    before each collection, so metrics can be updated on demand.
+
+    Don't forget to register this registry with the main registry!
+    """
+
+    def __init__(self) -> None:
+        super().__init__()
+        self._pre_update_hooks: List[Callable[[], None]] = []
+
+    def collect(self) -> Generator[Metric, None, None]:
+        """
+        Collects metrics, calling pre-update hooks first.
+        """
+
+        for pre_update_hook in self._pre_update_hooks:
+            pre_update_hook()
+
+        yield from super().collect()
+
+    def register_hook(self, hook: Callable[[], None]) -> None:
+        """
+        Registers a hook that is called before metric collection.
+        """
+
+        self._pre_update_hooks.append(hook)
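Outside Synapse, the same class can be exercised standalone; a self-contained sketch (the queue metric is invented for illustration):

    from prometheus_client import REGISTRY, Gauge, generate_latest

    registry = DynamicCollectorRegistry()
    queue_depth = Gauge("example_queue_depth", "", registry=registry)

    work_queue: list = []
    # The hook runs on every scrape, so the gauge is refreshed lazily
    # rather than on every queue operation.
    registry.register_hook(lambda: queue_depth.set(len(work_queue)))

    REGISTRY.register(registry)
    print(generate_latest(REGISTRY).decode())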
diff --git a/synapse/util/rust.py b/synapse/util/rust.py
new file mode 100644
index 0000000000..30ecb9ffd9
--- /dev/null
+++ b/synapse/util/rust.py
@@ -0,0 +1,84 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from hashlib import blake2b
+
+import synapse
+from synapse.synapse_rust import get_rust_file_digest
+
+
+def check_rust_lib_up_to_date() -> None:
+    """For editable installs, check whether the Rust library is outdated and
+    needs to be rebuilt.
+    """
+
+    if not _dist_is_editable():
+        return
+
+    synapse_dir = os.path.dirname(synapse.__file__)
+    synapse_root = os.path.abspath(os.path.join(synapse_dir, ".."))
+
+    # Double check we've not gone into site-packages...
+    if os.path.basename(synapse_root) == "site-packages":
+        return
+
+    # ... and it looks like the root of a python project.
+    if not os.path.exists(os.path.join(synapse_root, "pyproject.toml")):
+        return
+
+    # Get the hash of all the Rust source files.
+    rust_hash = _hash_rust_files_in_directory(os.path.join(synapse_root, "rust", "src"))
+
+    if rust_hash != get_rust_file_digest():
+        raise Exception("Rust module outdated. Please rebuild using `poetry install`")
+
+
+def _hash_rust_files_in_directory(directory: str) -> str:
+    """Get the hash of all files in a directory (recursively)"""
+
+    directory = os.path.abspath(directory)
+
+    paths = []
+
+    dirs = [directory]
+    while dirs:
+        current_dir = dirs.pop()
+        with os.scandir(current_dir) as d:
+            for entry in d:
+                if entry.is_dir():
+                    dirs.append(entry.path)
+                else:
+                    paths.append(entry.path)
+
+    # We sort to make sure that we get a consistent and well-defined ordering.
+    paths.sort()
+
+    hasher = blake2b()
+
+    for path in paths:
+        with open(path, "rb") as f:
+            hasher.update(f.read())
+
+    return hasher.hexdigest()
+
+
+def _dist_is_editable() -> bool:
+    """Is distribution an editable install?"""
+    for path_item in sys.path:
+        egg_link = os.path.join(path_item, "matrix-synapse.egg-link")
+        if os.path.isfile(egg_link):
+            return True
+    return False
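Taken together, the module can be spot-checked from a source checkout; a small sketch using only the functions above (running from the checkout root, with the relative path as an assumption):

    if __name__ == "__main__":
        # Compare the live hash of the Rust sources against the digest
        # baked into the compiled module at build time.
        live = _hash_rust_files_in_directory("rust/src")
        print("up to date" if live == get_rust_file_digest() else "stale")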