summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/constants.py2
-rw-r--r--synapse/api/urls.py1
-rw-r--r--synapse/app/homeserver.py43
-rw-r--r--synapse/appservice/__init__.py26
-rw-r--r--synapse/appservice/api.py29
-rw-r--r--synapse/appservice/scheduler.py97
-rw-r--r--synapse/config/experimental.py7
-rw-r--r--synapse/config/ratelimiting.py15
-rw-r--r--synapse/config/server.py61
-rw-r--r--synapse/federation/transport/server/_base.py67
-rw-r--r--synapse/federation/transport/server/federation.py4
-rw-r--r--synapse/handlers/appservice.py138
-rw-r--r--synapse/handlers/auth.py44
-rw-r--r--synapse/handlers/federation.py27
-rw-r--r--synapse/handlers/federation_event.py34
-rw-r--r--synapse/handlers/message.py20
-rw-r--r--synapse/handlers/room_batch.py44
-rw-r--r--synapse/handlers/room_member.py31
-rw-r--r--synapse/handlers/sync.py4
-rw-r--r--synapse/handlers/typing.py4
-rw-r--r--synapse/handlers/ui_auth/__init__.py2
-rw-r--r--synapse/logging/opentracing.py41
-rw-r--r--synapse/logging/scopecontextmanager.py76
-rw-r--r--synapse/metrics/__init__.py10
-rw-r--r--synapse/module_api/__init__.py29
-rw-r--r--synapse/notifier.py4
-rw-r--r--synapse/push/baserules.py219
-rw-r--r--synapse/python_dependencies.py3
-rw-r--r--synapse/replication/http/__init__.py2
-rw-r--r--synapse/replication/http/_base.py31
-rw-r--r--synapse/replication/http/account_data.py38
-rw-r--r--synapse/replication/http/devices.py14
-rw-r--r--synapse/replication/http/federation.py65
-rw-r--r--synapse/replication/http/login.py47
-rw-r--r--synapse/replication/http/membership.py31
-rw-r--r--synapse/replication/http/presence.py38
-rw-r--r--synapse/replication/http/push.py14
-rw-r--r--synapse/replication/http/register.py79
-rw-r--r--synapse/replication/http/send_event.py44
-rw-r--r--synapse/replication/http/streams.py16
-rw-r--r--synapse/rest/client/account.py4
-rw-r--r--synapse/rest/client/push_rule.py13
-rw-r--r--synapse/rest/client/register.py15
-rw-r--r--synapse/rest/client/room_batch.py17
-rw-r--r--synapse/rest/media/v1/upload_resource.py13
-rw-r--r--synapse/storage/_base.py11
-rw-r--r--synapse/storage/databases/main/account_data.py164
-rw-r--r--synapse/storage/databases/main/appservice.py24
-rw-r--r--synapse/storage/databases/main/cache.py16
-rw-r--r--synapse/storage/databases/main/deviceinbox.py276
-rw-r--r--synapse/storage/databases/main/devices.py22
-rw-r--r--synapse/storage/databases/main/event_federation.py313
-rw-r--r--synapse/storage/databases/main/events.py17
-rw-r--r--synapse/storage/databases/main/push_rule.py20
-rw-r--r--synapse/storage/databases/main/relations.py150
-rw-r--r--synapse/storage/schema/main/delta/68/02_msc2409_add_device_id_appservice_stream_type.sql21
-rw-r--r--synapse/storage/schema/main/delta/68/03_delete_account_data_for_deactivated_accounts.sql20
-rw-r--r--synapse/util/caches/deferred_cache.py5
-rw-r--r--synapse/util/caches/descriptors.py8
-rw-r--r--synapse/util/caches/lrucache.py6
-rw-r--r--synapse/util/threepids.py13
61 files changed, 1695 insertions, 954 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 52c083a20b..36ace7c613 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -81,7 +81,7 @@ class LoginType:
     TERMS: Final = "m.login.terms"
     SSO: Final = "m.login.sso"
     DUMMY: Final = "m.login.dummy"
-    REGISTRATION_TOKEN: Final = "org.matrix.msc3231.login.registration_token"
+    REGISTRATION_TOKEN: Final = "m.login.registration_token"
 
 
 # This is used in the `type` parameter for /register when called by
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index f9f9467dc1..bd49fa6a5f 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -28,7 +28,6 @@ FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1"
 FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
 FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
 STATIC_PREFIX = "/_matrix/static"
-WEB_CLIENT_PREFIX = "/_matrix/client"
 SERVER_KEY_V2_PREFIX = "/_matrix/key/v2"
 MEDIA_R0_PREFIX = "/_matrix/media/r0"
 MEDIA_V3_PREFIX = "/_matrix/media/v3"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index efedcc8889..66e1a21331 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -21,19 +21,18 @@ from typing import Dict, Iterable, Iterator, List
 from twisted.internet.tcp import Port
 from twisted.web.resource import EncodingResourceWrapper, Resource
 from twisted.web.server import GzipEncoderFactory
-from twisted.web.static import File
 
 import synapse
 import synapse.config.logger
 from synapse import events
 from synapse.api.urls import (
+    CLIENT_API_PREFIX,
     FEDERATION_PREFIX,
     LEGACY_MEDIA_PREFIX,
     MEDIA_R0_PREFIX,
     MEDIA_V3_PREFIX,
     SERVER_KEY_V2_PREFIX,
     STATIC_PREFIX,
-    WEB_CLIENT_PREFIX,
 )
 from synapse.app import _base
 from synapse.app._base import (
@@ -53,7 +52,6 @@ from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import (
     OptionsResource,
     RootOptionsRedirectResource,
-    RootRedirect,
     StaticResource,
 )
 from synapse.http.site import SynapseSite
@@ -134,15 +132,12 @@ class SynapseHomeServer(HomeServer):
         # Try to find something useful to serve at '/':
         #
         # 1. Redirect to the web client if it is an HTTP(S) URL.
-        # 2. Redirect to the web client served via Synapse.
-        # 3. Redirect to the static "Synapse is running" page.
-        # 4. Do not redirect and use a blank resource.
-        if self.config.server.web_client_location_is_redirect:
+        # 2. Redirect to the static "Synapse is running" page.
+        # 3. Do not redirect and use a blank resource.
+        if self.config.server.web_client_location:
             root_resource: Resource = RootOptionsRedirectResource(
                 self.config.server.web_client_location
             )
-        elif WEB_CLIENT_PREFIX in resources:
-            root_resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX)
         elif STATIC_PREFIX in resources:
             root_resource = RootOptionsRedirectResource(STATIC_PREFIX)
         else:
@@ -201,13 +196,7 @@ class SynapseHomeServer(HomeServer):
 
             resources.update(
                 {
-                    "/_matrix/client/api/v1": client_resource,
-                    "/_matrix/client/r0": client_resource,
-                    "/_matrix/client/v1": client_resource,
-                    "/_matrix/client/v3": client_resource,
-                    "/_matrix/client/unstable": client_resource,
-                    "/_matrix/client/v2_alpha": client_resource,
-                    "/_matrix/client/versions": client_resource,
+                    CLIENT_API_PREFIX: client_resource,
                     "/.well-known": well_known_resource(self),
                     "/_synapse/admin": AdminRestResource(self),
                     **build_synapse_client_resource_tree(self),
@@ -270,28 +259,6 @@ class SynapseHomeServer(HomeServer):
         if name in ["keys", "federation"]:
             resources[SERVER_KEY_V2_PREFIX] = KeyApiV2Resource(self)
 
-        if name == "webclient":
-            # webclient listeners are deprecated as of Synapse v1.51.0, remove it
-            # in > v1.53.0.
-            webclient_loc = self.config.server.web_client_location
-
-            if webclient_loc is None:
-                logger.warning(
-                    "Not enabling webclient resource, as web_client_location is unset."
-                )
-            elif self.config.server.web_client_location_is_redirect:
-                resources[WEB_CLIENT_PREFIX] = RootRedirect(webclient_loc)
-            else:
-                logger.warning(
-                    "Running webclient on the same domain is not recommended: "
-                    "https://github.com/matrix-org/synapse#security-note - "
-                    "after you move webclient to different host you can set "
-                    "web_client_location to its full URL to enable redirection."
-                )
-                # GZip is disabled here due to
-                # https://twistedmatrix.com/trac/ticket/7678
-                resources[WEB_CLIENT_PREFIX] = File(webclient_loc)
-
         if name == "metrics" and self.config.metrics.enable_metrics:
             resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
 
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 8c9ff93b2c..a340a8c9c7 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -165,23 +165,16 @@ class ApplicationService:
             return namespace.exclusive
         return False
 
-    async def _matches_user(
-        self, event: Optional[EventBase], store: Optional["DataStore"] = None
-    ) -> bool:
-        if not event:
-            return False
-
+    async def _matches_user(self, event: EventBase, store: "DataStore") -> bool:
         if self.is_interested_in_user(event.sender):
             return True
+
         # also check m.room.member state key
         if event.type == EventTypes.Member and self.is_interested_in_user(
             event.state_key
         ):
             return True
 
-        if not store:
-            return False
-
         does_match = await self.matches_user_in_member_list(event.room_id, store)
         return does_match
 
@@ -216,21 +209,15 @@ class ApplicationService:
             return self.is_interested_in_room(event.room_id)
         return False
 
-    async def _matches_aliases(
-        self, event: EventBase, store: Optional["DataStore"] = None
-    ) -> bool:
-        if not store or not event:
-            return False
-
+    async def _matches_aliases(self, event: EventBase, store: "DataStore") -> bool:
         alias_list = await store.get_aliases_for_room(event.room_id)
         for alias in alias_list:
             if self.is_interested_in_alias(alias):
                 return True
+
         return False
 
-    async def is_interested(
-        self, event: EventBase, store: Optional["DataStore"] = None
-    ) -> bool:
+    async def is_interested(self, event: EventBase, store: "DataStore") -> bool:
         """Check if this service is interested in this event.
 
         Args:
@@ -351,11 +338,13 @@ class AppServiceTransaction:
         id: int,
         events: List[EventBase],
         ephemeral: List[JsonDict],
+        to_device_messages: List[JsonDict],
     ):
         self.service = service
         self.id = id
         self.events = events
         self.ephemeral = ephemeral
+        self.to_device_messages = to_device_messages
 
     async def send(self, as_api: "ApplicationServiceApi") -> bool:
         """Sends this transaction using the provided AS API interface.
@@ -369,6 +358,7 @@ class AppServiceTransaction:
             service=self.service,
             events=self.events,
             ephemeral=self.ephemeral,
+            to_device_messages=self.to_device_messages,
             txn_id=self.id,
         )
 
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index def4424af0..73be7ff3d4 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -218,8 +218,23 @@ class ApplicationServiceApi(SimpleHttpClient):
         service: "ApplicationService",
         events: List[EventBase],
         ephemeral: List[JsonDict],
+        to_device_messages: List[JsonDict],
         txn_id: Optional[int] = None,
     ) -> bool:
+        """
+        Push data to an application service.
+
+        Args:
+            service: The application service to send to.
+            events: The persistent events to send.
+            ephemeral: The ephemeral events to send.
+            to_device_messages: The to-device messages to send.
+            txn_id: An unique ID to assign to this transaction. Application services should
+                deduplicate transactions received with identitical IDs.
+
+        Returns:
+            True if the task succeeded, False if it failed.
+        """
         if service.url is None:
             return True
 
@@ -237,13 +252,15 @@ class ApplicationServiceApi(SimpleHttpClient):
         uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
 
         # Never send ephemeral events to appservices that do not support it
+        body: Dict[str, List[JsonDict]] = {"events": serialized_events}
         if service.supports_ephemeral:
-            body = {
-                "events": serialized_events,
-                "de.sorunome.msc2409.ephemeral": ephemeral,
-            }
-        else:
-            body = {"events": serialized_events}
+            body.update(
+                {
+                    # TODO: Update to stable prefixes once MSC2409 completes FCP merge.
+                    "de.sorunome.msc2409.ephemeral": ephemeral,
+                    "de.sorunome.msc2409.to_device": to_device_messages,
+                }
+            )
 
         try:
             await self.put_json(
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 185e3a5278..c42fa32fff 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,7 +48,16 @@ This is all tied together by the AppServiceScheduler which DIs the required
 components.
 """
 import logging
-from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
+from typing import (
+    TYPE_CHECKING,
+    Awaitable,
+    Callable,
+    Collection,
+    Dict,
+    List,
+    Optional,
+    Set,
+)
 
 from synapse.appservice import ApplicationService, ApplicationServiceState
 from synapse.appservice.api import ApplicationServiceApi
@@ -71,6 +80,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
 # Maximum number of ephemeral events to provide in an AS transaction.
 MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
 
+# Maximum number of to-device messages to provide in an AS transaction.
+MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100
+
 
 class ApplicationServiceScheduler:
     """Public facing API for this module. Does the required DI to tie the
@@ -97,15 +109,40 @@ class ApplicationServiceScheduler:
         for service in services:
             self.txn_ctrl.start_recoverer(service)
 
-    def submit_event_for_as(
-        self, service: ApplicationService, event: EventBase
+    def enqueue_for_appservice(
+        self,
+        appservice: ApplicationService,
+        events: Optional[Collection[EventBase]] = None,
+        ephemeral: Optional[Collection[JsonDict]] = None,
+        to_device_messages: Optional[Collection[JsonDict]] = None,
     ) -> None:
-        self.queuer.enqueue_event(service, event)
+        """
+        Enqueue some data to be sent off to an application service.
 
-    def submit_ephemeral_events_for_as(
-        self, service: ApplicationService, events: List[JsonDict]
-    ) -> None:
-        self.queuer.enqueue_ephemeral(service, events)
+        Args:
+            appservice: The application service to create and send a transaction to.
+            events: The persistent room events to send.
+            ephemeral: The ephemeral events to send.
+            to_device_messages: The to-device messages to send. These differ from normal
+                to-device messages sent to clients, as they have 'to_device_id' and
+                'to_user_id' fields.
+        """
+        # We purposefully allow this method to run with empty events/ephemeral
+        # collections, so that callers do not need to check iterable size themselves.
+        if not events and not ephemeral and not to_device_messages:
+            return
+
+        if events:
+            self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
+        if ephemeral:
+            self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
+        if to_device_messages:
+            self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
+                to_device_messages
+            )
+
+        # Kick off a new application service transaction
+        self.queuer.start_background_request(appservice)
 
 
 class _ServiceQueuer:
@@ -121,13 +158,15 @@ class _ServiceQueuer:
         self.queued_events: Dict[str, List[EventBase]] = {}
         # dict of {service_id: [events]}
         self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
+        # dict of {service_id: [to_device_message_json]}
+        self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
 
         # the appservices which currently have a transaction in flight
         self.requests_in_flight: Set[str] = set()
         self.txn_ctrl = txn_ctrl
         self.clock = clock
 
-    def _start_background_request(self, service: ApplicationService) -> None:
+    def start_background_request(self, service: ApplicationService) -> None:
         # start a sender for this appservice if we don't already have one
         if service.id in self.requests_in_flight:
             return
@@ -136,16 +175,6 @@ class _ServiceQueuer:
             "as-sender-%s" % (service.id,), self._send_request, service
         )
 
-    def enqueue_event(self, service: ApplicationService, event: EventBase) -> None:
-        self.queued_events.setdefault(service.id, []).append(event)
-        self._start_background_request(service)
-
-    def enqueue_ephemeral(
-        self, service: ApplicationService, events: List[JsonDict]
-    ) -> None:
-        self.queued_ephemeral.setdefault(service.id, []).extend(events)
-        self._start_background_request(service)
-
     async def _send_request(self, service: ApplicationService) -> None:
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
@@ -162,11 +191,21 @@ class _ServiceQueuer:
                 ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
                 del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
 
-                if not events and not ephemeral:
+                all_to_device_messages = self.queued_to_device_messages.get(
+                    service.id, []
+                )
+                to_device_messages_to_send = all_to_device_messages[
+                    :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
+                ]
+                del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
+
+                if not events and not ephemeral and not to_device_messages_to_send:
                     return
 
                 try:
-                    await self.txn_ctrl.send(service, events, ephemeral)
+                    await self.txn_ctrl.send(
+                        service, events, ephemeral, to_device_messages_to_send
+                    )
                 except Exception:
                     logger.exception("AS request failed")
         finally:
@@ -198,10 +237,24 @@ class _TransactionController:
         service: ApplicationService,
         events: List[EventBase],
         ephemeral: Optional[List[JsonDict]] = None,
+        to_device_messages: Optional[List[JsonDict]] = None,
     ) -> None:
+        """
+        Create a transaction with the given data and send to the provided
+        application service.
+
+        Args:
+            service: The application service to send the transaction to.
+            events: The persistent events to include in the transaction.
+            ephemeral: The ephemeral events to include in the transaction.
+            to_device_messages: The to-device messages to include in the transaction.
+        """
         try:
             txn = await self.store.create_appservice_txn(
-                service=service, events=events, ephemeral=ephemeral or []
+                service=service,
+                events=events,
+                ephemeral=ephemeral or [],
+                to_device_messages=to_device_messages or [],
             )
             service_is_up = await self._is_service_up(service)
             if service_is_up:
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 65c807a19a..e4719d19b8 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -52,3 +52,10 @@ class ExperimentalConfig(Config):
         self.msc3202_device_masquerading_enabled: bool = experimental.get(
             "msc3202_device_masquerading", False
         )
+
+        # MSC2409 (this setting only relates to optionally sending to-device messages).
+        # Presence, typing and read receipt EDUs are already sent to application services that
+        # have opted in to receive them. If enabled, this adds to-device messages to that list.
+        self.msc2409_to_device_messages_enabled: bool = experimental.get(
+            "msc2409_to_device_messages_enabled", False
+        )
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index 36636ab07e..e9ccf1bd62 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -134,6 +134,14 @@ class RatelimitConfig(Config):
             defaults={"per_second": 0.003, "burst_count": 5},
         )
 
+        self.rc_third_party_invite = RateLimitConfig(
+            config.get("rc_third_party_invite", {}),
+            defaults={
+                "per_second": self.rc_message.per_second,
+                "burst_count": self.rc_message.burst_count,
+            },
+        )
+
     def generate_config_section(self, **kwargs):
         return """\
         ## Ratelimiting ##
@@ -168,6 +176,9 @@ class RatelimitConfig(Config):
         #   - one for ratelimiting how often a user or IP can attempt to validate a 3PID.
         #   - two for ratelimiting how often invites can be sent in a room or to a
         #     specific user.
+        #   - one for ratelimiting 3PID invites (i.e. invites sent to a third-party ID
+        #     such as an email address or a phone number) based on the account that's
+        #     sending the invite.
         #
         # The defaults are as shown below.
         #
@@ -217,6 +228,10 @@ class RatelimitConfig(Config):
         #  per_user:
         #    per_second: 0.003
         #    burst_count: 5
+        #
+        #rc_third_party_invite:
+        #  per_second: 0.2
+        #  burst_count: 10
 
         # Ratelimiting settings for incoming federation
         #
diff --git a/synapse/config/server.py b/synapse/config/server.py
index a460cf25b4..7bc9624546 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -179,7 +179,6 @@ KNOWN_RESOURCES = {
     "openid",
     "replication",
     "static",
-    "webclient",
 }
 
 
@@ -519,16 +518,12 @@ class ServerConfig(Config):
             self.listeners = l2
 
         self.web_client_location = config.get("web_client_location", None)
-        self.web_client_location_is_redirect = self.web_client_location and (
+        # Non-HTTP(S) web client location is not supported.
+        if self.web_client_location and not (
             self.web_client_location.startswith("http://")
             or self.web_client_location.startswith("https://")
-        )
-        # A non-HTTP(S) web client location is deprecated.
-        if self.web_client_location and not self.web_client_location_is_redirect:
-            logger.warning(NO_MORE_NONE_HTTP_WEB_CLIENT_LOCATION_WARNING)
-
-        # Warn if webclient is configured for a worker.
-        _warn_if_webclient_configured(self.listeners)
+        ):
+            raise ConfigError("web_client_location must point to a HTTP(S) URL.")
 
         self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
         self.gc_seconds = self.read_gc_intervals(config.get("gc_min_interval", None))
@@ -656,19 +651,6 @@ class ServerConfig(Config):
             False,
         )
 
-        # List of users trialing the new experimental default push rules. This setting is
-        # not included in the sample configuration file on purpose as it's a temporary
-        # hack, so that some users can trial the new defaults without impacting every
-        # user on the homeserver.
-        users_new_default_push_rules: list = (
-            config.get("users_new_default_push_rules") or []
-        )
-        if not isinstance(users_new_default_push_rules, list):
-            raise ConfigError("'users_new_default_push_rules' must be a list")
-
-        # Turn the list into a set to improve lookup speed.
-        self.users_new_default_push_rules: set = set(users_new_default_push_rules)
-
         # Whitelist of domain names that given next_link parameters must have
         next_link_domain_whitelist: Optional[List[str]] = config.get(
             "next_link_domain_whitelist"
@@ -1364,11 +1346,16 @@ def parse_listener_def(listener: Any) -> ListenerConfig:
 
     http_config = None
     if listener_type == "http":
+        try:
+            resources = [
+                HttpResourceConfig(**res) for res in listener.get("resources", [])
+            ]
+        except ValueError as e:
+            raise ConfigError("Unknown listener resource") from e
+
         http_config = HttpListenerConfig(
             x_forwarded=listener.get("x_forwarded", False),
-            resources=[
-                HttpResourceConfig(**res) for res in listener.get("resources", [])
-            ],
+            resources=resources,
             additional_resources=listener.get("additional_resources", {}),
             tag=listener.get("tag"),
         )
@@ -1376,30 +1363,6 @@ def parse_listener_def(listener: Any) -> ListenerConfig:
     return ListenerConfig(port, bind_addresses, listener_type, tls, http_config)
 
 
-NO_MORE_NONE_HTTP_WEB_CLIENT_LOCATION_WARNING = """
-Synapse no longer supports serving a web client. To remove this warning,
-configure 'web_client_location' with an HTTP(S) URL.
-"""
-
-
-NO_MORE_WEB_CLIENT_WARNING = """
-Synapse no longer includes a web client. To redirect the root resource to a web client, configure
-'web_client_location'. To remove this warning, remove 'webclient' from the 'listeners'
-configuration.
-"""
-
-
-def _warn_if_webclient_configured(listeners: Iterable[ListenerConfig]) -> None:
-    for listener in listeners:
-        if not listener.http_options:
-            continue
-        for res in listener.http_options.resources:
-            for name in res.names:
-                if name == "webclient":
-                    logger.warning(NO_MORE_WEB_CLIENT_WARNING)
-                    return
-
-
 _MANHOLE_SETTINGS_SCHEMA = {
     "type": "object",
     "properties": {
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index 2ca7c05835..dff2b68359 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -15,6 +15,7 @@
 import functools
 import logging
 import re
+import time
 from typing import TYPE_CHECKING, Any, Awaitable, Callable, Optional, Tuple, cast
 
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
@@ -24,8 +25,10 @@ from synapse.http.servlet import parse_json_object_from_request
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
+    active_span,
     set_tag,
     span_context_from_request,
+    start_active_span,
     start_active_span_follows_from,
     whitelisted_homeserver,
 )
@@ -265,9 +268,10 @@ class BaseFederationServlet:
                 content = parse_json_object_from_request(request)
 
             try:
-                origin: Optional[str] = await authenticator.authenticate_request(
-                    request, content
-                )
+                with start_active_span("authenticate_request"):
+                    origin: Optional[str] = await authenticator.authenticate_request(
+                        request, content
+                    )
             except NoAuthenticationError:
                 origin = None
                 if self.REQUIRE_AUTH:
@@ -282,32 +286,57 @@ class BaseFederationServlet:
             # update the active opentracing span with the authenticated entity
             set_tag("authenticated_entity", origin)
 
-            # if the origin is authenticated and whitelisted, link to its span context
+            # if the origin is authenticated and whitelisted, use its span context
+            # as the parent.
             context = None
             if origin and whitelisted_homeserver(origin):
                 context = span_context_from_request(request)
 
-            scope = start_active_span_follows_from(
-                "incoming-federation-request", contexts=(context,) if context else ()
-            )
+            if context:
+                servlet_span = active_span()
+                # a scope which uses the origin's context as a parent
+                processing_start_time = time.time()
+                scope = start_active_span_follows_from(
+                    "incoming-federation-request",
+                    child_of=context,
+                    contexts=(servlet_span,),
+                    start_time=processing_start_time,
+                )
 
-            with scope:
-                if origin and self.RATELIMIT:
-                    with ratelimiter.ratelimit(origin) as d:
-                        await d
-                        if request._disconnected:
-                            logger.warning(
-                                "client disconnected before we started processing "
-                                "request"
+            else:
+                # just use our context as a parent
+                scope = start_active_span(
+                    "incoming-federation-request",
+                )
+
+            try:
+                with scope:
+                    if origin and self.RATELIMIT:
+                        with ratelimiter.ratelimit(origin) as d:
+                            await d
+                            if request._disconnected:
+                                logger.warning(
+                                    "client disconnected before we started processing "
+                                    "request"
+                                )
+                                return None
+                            response = await func(
+                                origin, content, request.args, *args, **kwargs
                             )
-                            return None
+                    else:
                         response = await func(
                             origin, content, request.args, *args, **kwargs
                         )
-                else:
-                    response = await func(
-                        origin, content, request.args, *args, **kwargs
+            finally:
+                # if we used the origin's context as the parent, add a new span using
+                # the servlet span as a parent, so that we have a link
+                if context:
+                    scope2 = start_active_span_follows_from(
+                        "process-federation_request",
+                        contexts=(scope.span,),
+                        start_time=processing_start_time,
                     )
+                    scope2.close()
 
             return response
 
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index 9c1ad5851f..d86dfede4e 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -109,11 +109,11 @@ class FederationSendServlet(BaseFederationServerServlet):
             )
 
             if issue_8631_logger.isEnabledFor(logging.DEBUG):
-                DEVICE_UPDATE_EDUS = {"m.device_list_update", "m.signing_key_update"}
+                DEVICE_UPDATE_EDUS = ["m.device_list_update", "m.signing_key_update"]
                 device_list_updates = [
                     edu.content
                     for edu in transaction_data.get("edus", [])
-                    if edu.edu_type in DEVICE_UPDATE_EDUS
+                    if edu.get("edu_type") in DEVICE_UPDATE_EDUS
                 ]
                 if device_list_updates:
                     issue_8631_logger.debug(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7833e77e2b..a42c3558e4 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -55,6 +55,9 @@ class ApplicationServicesHandler:
         self.clock = hs.get_clock()
         self.notify_appservices = hs.config.appservice.notify_appservices
         self.event_sources = hs.get_event_sources()
+        self._msc2409_to_device_messages_enabled = (
+            hs.config.experimental.msc2409_to_device_messages_enabled
+        )
 
         self.current_max = 0
         self.is_processing = False
@@ -132,7 +135,9 @@ class ApplicationServicesHandler:
 
                         # Fork off pushes to these services
                         for service in services:
-                            self.scheduler.submit_event_for_as(service, event)
+                            self.scheduler.enqueue_for_appservice(
+                                service, events=[event]
+                            )
 
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
@@ -199,8 +204,9 @@ class ApplicationServicesHandler:
         Args:
             stream_key: The stream the event came from.
 
-                `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
-                value for `stream_key` will cause this function to return early.
+                `stream_key` can be "typing_key", "receipt_key", "presence_key" or
+                "to_device_key". Any other value for `stream_key` will cause this function
+                to return early.
 
                 Ephemeral events will only be pushed to appservices that have opted into
                 receiving them by setting `push_ephemeral` to true in their registration
@@ -216,8 +222,15 @@ class ApplicationServicesHandler:
         if not self.notify_appservices:
             return
 
-        # Ignore any unsupported streams
-        if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+        # Notify appservices of updates in ephemeral event streams.
+        # Only the following streams are currently supported.
+        # FIXME: We should use constants for these values.
+        if stream_key not in (
+            "typing_key",
+            "receipt_key",
+            "presence_key",
+            "to_device_key",
+        ):
             return
 
         # Assert that new_token is an integer (and not a RoomStreamToken).
@@ -233,6 +246,13 @@ class ApplicationServicesHandler:
         # Additional context: https://github.com/matrix-org/synapse/pull/11137
         assert isinstance(new_token, int)
 
+        # Ignore to-device messages if the feature flag is not enabled
+        if (
+            stream_key == "to_device_key"
+            and not self._msc2409_to_device_messages_enabled
+        ):
+            return
+
         # Check whether there are any appservices which have registered to receive
         # ephemeral events.
         #
@@ -266,7 +286,7 @@ class ApplicationServicesHandler:
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
                 if stream_key == "typing_key":
-                    # Note that we don't persist the token (via set_type_stream_id_for_appservice)
+                    # Note that we don't persist the token (via set_appservice_stream_type_pos)
                     # for typing_key due to performance reasons and due to their highly
                     # ephemeral nature.
                     #
@@ -274,7 +294,7 @@ class ApplicationServicesHandler:
                     # and, if they apply to this application service, send it off.
                     events = await self._handle_typing(service, new_token)
                     if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
                     continue
 
                 # Since we read/update the stream position for this AS/stream
@@ -285,28 +305,37 @@ class ApplicationServicesHandler:
                 ):
                     if stream_key == "receipt_key":
                         events = await self._handle_receipts(service, new_token)
-                        if events:
-                            self.scheduler.submit_ephemeral_events_for_as(
-                                service, events
-                            )
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
                         # Persist the latest handled stream token for this appservice
-                        await self.store.set_type_stream_id_for_appservice(
+                        await self.store.set_appservice_stream_type_pos(
                             service, "read_receipt", new_token
                         )
 
                     elif stream_key == "presence_key":
                         events = await self._handle_presence(service, users, new_token)
-                        if events:
-                            self.scheduler.submit_ephemeral_events_for_as(
-                                service, events
-                            )
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
                         # Persist the latest handled stream token for this appservice
-                        await self.store.set_type_stream_id_for_appservice(
+                        await self.store.set_appservice_stream_type_pos(
                             service, "presence", new_token
                         )
 
+                    elif stream_key == "to_device_key":
+                        # Retrieve a list of to-device message events, as well as the
+                        # maximum stream token of the messages we were able to retrieve.
+                        to_device_messages = await self._get_to_device_messages(
+                            service, new_token, users
+                        )
+                        self.scheduler.enqueue_for_appservice(
+                            service, to_device_messages=to_device_messages
+                        )
+
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_appservice_stream_type_pos(
+                            service, "to_device", new_token
+                        )
+
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
     ) -> List[JsonDict]:
@@ -440,6 +469,79 @@ class ApplicationServicesHandler:
 
         return events
 
+    async def _get_to_device_messages(
+        self,
+        service: ApplicationService,
+        new_token: int,
+        users: Collection[Union[str, UserID]],
+    ) -> List[JsonDict]:
+        """
+        Given an application service, determine which events it should receive
+        from those between the last-recorded to-device message stream token for this
+        appservice and the given stream token.
+
+        Args:
+            service: The application service to check for which events it should receive.
+            new_token: The latest to-device event stream token.
+            users: The users to be notified for the new to-device messages
+                (ie, the recipients of the messages).
+
+        Returns:
+            A list of JSON dictionaries containing data derived from the to-device events
+                that should be sent to the given application service.
+        """
+        # Get the stream token that this application service has processed up until
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "to_device"
+        )
+
+        # Filter out users that this appservice is not interested in
+        users_appservice_is_interested_in: List[str] = []
+        for user in users:
+            # FIXME: We should do this farther up the call stack. We currently repeat
+            #  this operation in _handle_presence.
+            if isinstance(user, UserID):
+                user = user.to_string()
+
+            if service.is_interested_in_user(user):
+                users_appservice_is_interested_in.append(user)
+
+        if not users_appservice_is_interested_in:
+            # Return early if the AS was not interested in any of these users
+            return []
+
+        # Retrieve the to-device messages for each user
+        recipient_device_to_messages = await self.store.get_messages_for_user_devices(
+            users_appservice_is_interested_in,
+            from_key,
+            new_token,
+        )
+
+        # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
+        # to the event JSON so that the application service will know which user/device
+        # combination this messages was intended for.
+        #
+        # So we mangle this dict into a flat list of to-device messages with the relevant
+        # user ID and device ID embedded inside each message dict.
+        message_payload: List[JsonDict] = []
+        for (
+            user_id,
+            device_id,
+        ), messages in recipient_device_to_messages.items():
+            for message_json in messages:
+                # Remove 'message_id' from the to-device message, as it's an internal ID
+                message_json.pop("message_id", None)
+
+                message_payload.append(
+                    {
+                        "to_user_id": user_id,
+                        "to_device_id": device_id,
+                        **message_json,
+                    }
+                )
+
+        return message_payload
+
     async def query_user_exists(self, user_id: str) -> bool:
         """Check if any application service knows this user_id exists.
 
@@ -547,7 +649,7 @@ class ApplicationServicesHandler:
         """Retrieve a list of application services interested in this event.
 
         Args:
-            event: The event to check. Can be None if alias_list is not.
+            event: The event to check.
         Returns:
             A list of services interested in this event based on the service regex.
         """
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index e32c93e234..6959d1aa7e 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -2064,6 +2064,7 @@ GET_USERNAME_FOR_REGISTRATION_CALLBACK = Callable[
     [JsonDict, JsonDict],
     Awaitable[Optional[str]],
 ]
+IS_3PID_ALLOWED_CALLBACK = Callable[[str, str, bool], Awaitable[bool]]
 
 
 class PasswordAuthProvider:
@@ -2079,6 +2080,7 @@ class PasswordAuthProvider:
         self.get_username_for_registration_callbacks: List[
             GET_USERNAME_FOR_REGISTRATION_CALLBACK
         ] = []
+        self.is_3pid_allowed_callbacks: List[IS_3PID_ALLOWED_CALLBACK] = []
 
         # Mapping from login type to login parameters
         self._supported_login_types: Dict[str, Iterable[str]] = {}
@@ -2090,6 +2092,7 @@ class PasswordAuthProvider:
         self,
         check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None,
         on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None,
+        is_3pid_allowed: Optional[IS_3PID_ALLOWED_CALLBACK] = None,
         auth_checkers: Optional[
             Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
         ] = None,
@@ -2145,6 +2148,9 @@ class PasswordAuthProvider:
                 get_username_for_registration,
             )
 
+        if is_3pid_allowed is not None:
+            self.is_3pid_allowed_callbacks.append(is_3pid_allowed)
+
     def get_supported_login_types(self) -> Mapping[str, Iterable[str]]:
         """Get the login types supported by this password provider
 
@@ -2343,3 +2349,41 @@ class PasswordAuthProvider:
                 raise SynapseError(code=500, msg="Internal Server Error")
 
         return None
+
+    async def is_3pid_allowed(
+        self,
+        medium: str,
+        address: str,
+        registration: bool,
+    ) -> bool:
+        """Check if the user can be allowed to bind a 3PID on this homeserver.
+
+        Args:
+            medium: The medium of the 3PID.
+            address: The address of the 3PID.
+            registration: Whether the 3PID is being bound when registering a new user.
+
+        Returns:
+            Whether the 3PID is allowed to be bound on this homeserver
+        """
+        for callback in self.is_3pid_allowed_callbacks:
+            try:
+                res = await callback(medium, address, registration)
+
+                if res is False:
+                    return res
+                elif not isinstance(res, bool):
+                    # mypy complains that this line is unreachable because it assumes the
+                    # data returned by the module fits the expected type. We just want
+                    # to make sure this is the case.
+                    logger.warning(  # type: ignore[unreachable]
+                        "Ignoring non-string value returned by"
+                        " is_3pid_allowed callback %s: %s",
+                        callback,
+                        res,
+                    )
+            except Exception as e:
+                logger.error("Module raised an exception in is_3pid_allowed: %s", e)
+                raise SynapseError(code=500, msg="Internal Server Error")
+
+        return True
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index a37ae0ca09..c0f642005f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -166,9 +166,14 @@ class FederationHandler:
         oldest_events_with_depth = (
             await self.store.get_oldest_event_ids_with_depth_in_room(room_id)
         )
-        insertion_events_to_be_backfilled = (
-            await self.store.get_insertion_event_backwards_extremities_in_room(room_id)
-        )
+
+        insertion_events_to_be_backfilled: Dict[str, int] = {}
+        if self.hs.config.experimental.msc2716_enabled:
+            insertion_events_to_be_backfilled = (
+                await self.store.get_insertion_event_backward_extremities_in_room(
+                    room_id
+                )
+            )
         logger.debug(
             "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s",
             oldest_events_with_depth,
@@ -271,11 +276,12 @@ class FederationHandler:
         ]
 
         logger.debug(
-            "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems: %s filtered_sorted_extremeties_tuple: %s",
+            "room_id: %s, backfill: current_depth: %s, limit: %s, max_depth: %s, extrems (%d): %s filtered_sorted_extremeties_tuple: %s",
             room_id,
             current_depth,
             limit,
             max_depth,
+            len(sorted_extremeties_tuple),
             sorted_extremeties_tuple,
             filtered_sorted_extremeties_tuple,
         )
@@ -1047,6 +1053,19 @@ class FederationHandler:
         limit = min(limit, 100)
 
         events = await self.store.get_backfill_events(room_id, pdu_list, limit)
+        logger.debug(
+            "on_backfill_request: backfill events=%s",
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in events
+            ],
+        )
 
         events = await filter_events_for_server(self.storage, origin, events)
 
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 3905f60b3a..9edc7369d6 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -508,7 +508,11 @@ class FederationEventHandler:
                     f"room {ev.room_id}, when we were backfilling in {room_id}"
                 )
 
-        await self._process_pulled_events(dest, events, backfilled=True)
+        await self._process_pulled_events(
+            dest,
+            events,
+            backfilled=True,
+        )
 
     async def _get_missing_events_for_pdu(
         self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
@@ -626,11 +630,24 @@ class FederationEventHandler:
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
         """
+        logger.debug(
+            "processing pulled backfilled=%s events=%s",
+            backfilled,
+            [
+                "event_id=%s,depth=%d,body=%s,prevs=%s\n"
+                % (
+                    event.event_id,
+                    event.depth,
+                    event.content.get("body", event.type),
+                    event.prev_event_ids(),
+                )
+                for event in events
+            ],
+        )
 
         # We want to sort these by depth so we process them and
         # tell clients about them in order.
         sorted_events = sorted(events, key=lambda x: x.depth)
-
         for ev in sorted_events:
             with nested_logging_context(ev.event_id):
                 await self._process_pulled_event(origin, ev, backfilled=backfilled)
@@ -992,6 +1009,8 @@ class FederationEventHandler:
 
         await self._run_push_actions_and_persist_event(event, context, backfilled)
 
+        await self._handle_marker_event(origin, event)
+
         if backfilled or context.rejected:
             return
 
@@ -1071,8 +1090,6 @@ class FederationEventHandler:
                     event.sender,
                 )
 
-        await self._handle_marker_event(origin, event)
-
     async def _resync_device(self, sender: str) -> None:
         """We have detected that the device list for the given user may be out
         of sync, so we try and resync them.
@@ -1323,7 +1340,14 @@ class FederationEventHandler:
             return event, context
 
         events_to_persist = (x for x in (prep(event) for event in fetched_events) if x)
-        await self.persist_events_and_notify(room_id, tuple(events_to_persist))
+        await self.persist_events_and_notify(
+            room_id,
+            tuple(events_to_persist),
+            # Mark these events backfilled as they're historic events that will
+            # eventually be backfilled. For example, missing events we fetch
+            # during backfill should be marked as backfilled as well.
+            backfilled=True,
+        )
 
     async def _check_event_auth(
         self,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index b37250aa38..9267e586a8 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -490,12 +490,12 @@ class EventCreationHandler:
         requester: Requester,
         event_dict: dict,
         txn_id: Optional[str] = None,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
-        allow_no_prev_events: bool = False,
         depth: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """
@@ -510,6 +510,10 @@ class EventCreationHandler:
             requester
             event_dict: An entire event
             txn_id
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -604,10 +608,10 @@ class EventCreationHandler:
         event, context = await self.create_new_client_event(
             builder=builder,
             requester=requester,
+            allow_no_prev_events=allow_no_prev_events,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
             depth=depth,
-            allow_no_prev_events=allow_no_prev_events,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -764,6 +768,7 @@ class EventCreationHandler:
         self,
         requester: Requester,
         event_dict: dict,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         ratelimit: bool = True,
@@ -781,6 +786,10 @@ class EventCreationHandler:
         Args:
             requester: The requester sending the event.
             event_dict: An entire event.
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids:
                 The event IDs to use as the prev events.
                 Should normally be left as None to automatically request them
@@ -880,16 +889,20 @@ class EventCreationHandler:
         self,
         builder: EventBuilder,
         requester: Optional[Requester] = None,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
-        allow_no_prev_events: bool = False,
     ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
 
         Args:
             builder:
             requester:
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
@@ -908,7 +921,6 @@ class EventCreationHandler:
         Returns:
             Tuple of created event, context
         """
-
         # Strip down the auth_event_ids to only what we need to auth the event.
         # For example, we don't need extra m.room.member that don't match event.sender
         full_state_ids_at_event = None
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index f880aa93d2..f8137ec04c 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -13,10 +13,6 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-def generate_fake_event_id() -> str:
-    return "$fake_" + random_string(43)
-
-
 class RoomBatchHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
@@ -182,11 +178,12 @@ class RoomBatchHandler:
         state_event_ids_at_start = []
         auth_event_ids = initial_auth_event_ids.copy()
 
-        # Make the state events float off on their own so we don't have a
-        # bunch of `@mxid joined the room` noise between each batch
-        prev_event_id_for_state_chain = generate_fake_event_id()
+        # Make the state events float off on their own by specifying no
+        # prev_events for the first one in the chain so we don't have a bunch of
+        # `@mxid joined the room` noise between each batch.
+        prev_event_ids_for_state_chain: List[str] = []
 
-        for state_event in state_events_at_start:
+        for index, state_event in enumerate(state_events_at_start):
             assert_params_in_dict(
                 state_event, ["type", "origin_server_ts", "content", "sender"]
             )
@@ -222,7 +219,10 @@ class RoomBatchHandler:
                     content=event_dict["content"],
                     outlier=True,
                     historical=True,
-                    prev_event_ids=[prev_event_id_for_state_chain],
+                    # Only the first event in the chain should be floating.
+                    # The rest should hang off each other in a chain.
+                    allow_no_prev_events=index == 0,
+                    prev_event_ids=prev_event_ids_for_state_chain,
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
@@ -242,7 +242,10 @@ class RoomBatchHandler:
                     event_dict,
                     outlier=True,
                     historical=True,
-                    prev_event_ids=[prev_event_id_for_state_chain],
+                    # Only the first event in the chain should be floating.
+                    # The rest should hang off each other in a chain.
+                    allow_no_prev_events=index == 0,
+                    prev_event_ids=prev_event_ids_for_state_chain,
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
@@ -253,7 +256,7 @@ class RoomBatchHandler:
             state_event_ids_at_start.append(event_id)
             auth_event_ids.append(event_id)
             # Connect all the state in a floating chain
-            prev_event_id_for_state_chain = event_id
+            prev_event_ids_for_state_chain = [event_id]
 
         return state_event_ids_at_start
 
@@ -261,7 +264,6 @@ class RoomBatchHandler:
         self,
         events_to_create: List[JsonDict],
         room_id: str,
-        initial_prev_event_ids: List[str],
         inherited_depth: int,
         auth_event_ids: List[str],
         app_service_requester: Requester,
@@ -277,9 +279,6 @@ class RoomBatchHandler:
             events_to_create: List of historical events to create in JSON
                 dictionary format.
             room_id: Room where you want the events persisted in.
-            initial_prev_event_ids: These will be the prev_events for the first
-                event created. Each event created afterwards will point to the
-                previous event created.
             inherited_depth: The depth to create the events at (you will
                 probably by calling inherit_depth_from_prev_ids(...)).
             auth_event_ids: Define which events allow you to create the given
@@ -291,11 +290,14 @@ class RoomBatchHandler:
         """
         assert app_service_requester.app_service
 
-        prev_event_ids = initial_prev_event_ids.copy()
+        # Make the historical event chain float off on its own by specifying no
+        # prev_events for the first event in the chain which causes the HS to
+        # ask for the state at the start of the batch later.
+        prev_event_ids: List[str] = []
 
         event_ids = []
         events_to_persist = []
-        for ev in events_to_create:
+        for index, ev in enumerate(events_to_create):
             assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
 
             assert self.hs.is_mine_id(ev["sender"]), "User must be our own: %s" % (
@@ -319,6 +321,9 @@ class RoomBatchHandler:
                     ev["sender"], app_service_requester.app_service
                 ),
                 event_dict,
+                # Only the first event in the chain should be floating.
+                # The rest should hang off each other in a chain.
+                allow_no_prev_events=index == 0,
                 prev_event_ids=event_dict.get("prev_events"),
                 auth_event_ids=auth_event_ids,
                 historical=True,
@@ -370,7 +375,6 @@ class RoomBatchHandler:
         events_to_create: List[JsonDict],
         room_id: str,
         batch_id_to_connect_to: str,
-        initial_prev_event_ids: List[str],
         inherited_depth: int,
         auth_event_ids: List[str],
         app_service_requester: Requester,
@@ -385,9 +389,6 @@ class RoomBatchHandler:
             room_id: Room where you want the events created in.
             batch_id_to_connect_to: The batch_id from the insertion event you
                 want this batch to connect to.
-            initial_prev_event_ids: These will be the prev_events for the first
-                event created. Each event created afterwards will point to the
-                previous event created.
             inherited_depth: The depth to create the events at (you will
                 probably by calling inherit_depth_from_prev_ids(...)).
             auth_event_ids: Define which events allow you to create the given
@@ -436,7 +437,6 @@ class RoomBatchHandler:
         event_ids = await self.persist_historical_events(
             events_to_create=events_to_create,
             room_id=room_id,
-            initial_prev_event_ids=initial_prev_event_ids,
             inherited_depth=inherited_depth,
             auth_event_ids=auth_event_ids,
             app_service_requester=app_service_requester,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 3dd5e1b6e4..bf1a47efb0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -116,6 +116,13 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             burst_count=hs.config.ratelimiting.rc_invites_per_user.burst_count,
         )
 
+        self._third_party_invite_limiter = Ratelimiter(
+            store=self.store,
+            clock=self.clock,
+            rate_hz=hs.config.ratelimiting.rc_third_party_invite.per_second,
+            burst_count=hs.config.ratelimiting.rc_third_party_invite.burst_count,
+        )
+
         self.request_ratelimiter = hs.get_request_ratelimiter()
 
     @abc.abstractmethod
@@ -261,7 +268,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         target: UserID,
         room_id: str,
         membership: str,
-        prev_event_ids: List[str],
+        allow_no_prev_events: bool = False,
+        prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         txn_id: Optional[str] = None,
         ratelimit: bool = True,
@@ -279,8 +287,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             target:
             room_id:
             membership:
-            prev_event_ids: The event IDs to use as the prev events
 
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
+            prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
                 Should normally be left as None, which will cause them to be calculated
@@ -337,6 +349,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 "membership": membership,
             },
             txn_id=txn_id,
+            allow_no_prev_events=allow_no_prev_events,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
             require_consent=require_consent,
@@ -439,6 +452,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
@@ -463,6 +477,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             historical: Indicates whether the message is being inserted
                 back in time around some existing events. This is used to skip
                 a few checks and mark the event as backfilled.
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
@@ -497,6 +515,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 require_consent=require_consent,
                 outlier=outlier,
                 historical=historical,
+                allow_no_prev_events=allow_no_prev_events,
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
             )
@@ -518,6 +537,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
+        allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
@@ -544,6 +564,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             historical: Indicates whether the message is being inserted
                 back in time around some existing events. This is used to skip
                 a few checks and mark the event as backfilled.
+            allow_no_prev_events: Whether to allow this event to be created an empty
+                list of prev_events. Normally this is prohibited just because most
+                events should have a prev_event and we should only use this in special
+                cases like MSC2716.
             prev_event_ids: The event IDs to use as the prev events
             auth_event_ids:
                 The event ids to use as the auth_events for the new event.
@@ -673,6 +697,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 membership=effective_membership_state,
                 txn_id=txn_id,
                 ratelimit=ratelimit,
+                allow_no_prev_events=allow_no_prev_events,
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
                 content=content,
@@ -1295,7 +1320,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         # We need to rate limit *before* we send out any 3PID invites, so we
         # can't just rely on the standard ratelimiting of events.
-        await self.request_ratelimiter.ratelimit(requester)
+        await self._third_party_invite_limiter.ratelimit(requester)
 
         can_invite = await self.third_party_event_rules.check_threepid_can_be_invited(
             medium, address, room_id
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c72ed7c290..aa9a76f8a9 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1348,8 +1348,8 @@ class SyncHandler:
         if sync_result_builder.since_token is not None:
             since_stream_id = int(sync_result_builder.since_token.to_device_key)
 
-        if since_stream_id != int(now_token.to_device_key):
-            messages, stream_id = await self.store.get_new_messages_for_device(
+        if device_id is not None and since_stream_id != int(now_token.to_device_key):
+            messages, stream_id = await self.store.get_messages_for_device(
                 user_id, device_id, since_stream_id, now_token.to_device_key
             )
 
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index e43c22832d..e4bed1c937 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -446,7 +446,7 @@ class TypingWriterHandler(FollowerTypingHandler):
 
 class TypingNotificationEventSource(EventSource[int, JsonDict]):
     def __init__(self, hs: "HomeServer"):
-        self.hs = hs
+        self._main_store = hs.get_datastore()
         self.clock = hs.get_clock()
         # We can't call get_typing_handler here because there's a cycle:
         #
@@ -487,7 +487,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
                     continue
 
                 if not await service.matches_user_in_member_list(
-                    room_id, handler.store
+                    room_id, self._main_store
                 ):
                     continue
 
diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py
index 13b0c61d2e..56eee4057f 100644
--- a/synapse/handlers/ui_auth/__init__.py
+++ b/synapse/handlers/ui_auth/__init__.py
@@ -38,4 +38,4 @@ class UIAuthSessionDataConstants:
     # used during registration to store the registration token used (if required) so that:
     # - we can prevent a token being used twice by one session
     # - we can 'use up' the token after registration has successfully completed
-    REGISTRATION_TOKEN = "org.matrix.msc3231.login.registration_token"
+    REGISTRATION_TOKEN = "m.login.registration_token"
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index b240d2d21d..3ebed5c161 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -443,10 +443,14 @@ def start_active_span(
     start_time=None,
     ignore_active_span=False,
     finish_on_close=True,
+    *,
+    tracer=None,
 ):
-    """Starts an active opentracing span. Note, the scope doesn't become active
-    until it has been entered, however, the span starts from the time this
-    message is called.
+    """Starts an active opentracing span.
+
+    Records the start time for the span, and sets it as the "active span" in the
+    scope manager.
+
     Args:
         See opentracing.tracer
     Returns:
@@ -456,7 +460,11 @@ def start_active_span(
     if opentracing is None:
         return noop_context_manager()  # type: ignore[unreachable]
 
-    return opentracing.tracer.start_active_span(
+    if tracer is None:
+        # use the global tracer by default
+        tracer = opentracing.tracer
+
+    return tracer.start_active_span(
         operation_name,
         child_of=child_of,
         references=references,
@@ -468,21 +476,42 @@ def start_active_span(
 
 
 def start_active_span_follows_from(
-    operation_name: str, contexts: Collection, inherit_force_tracing=False
+    operation_name: str,
+    contexts: Collection,
+    child_of=None,
+    start_time: Optional[float] = None,
+    *,
+    inherit_force_tracing=False,
+    tracer=None,
 ):
     """Starts an active opentracing span, with additional references to previous spans
 
     Args:
         operation_name: name of the operation represented by the new span
         contexts: the previous spans to inherit from
+
+        child_of: optionally override the parent span. If unset, the currently active
+           span will be the parent. (If there is no currently active span, the first
+           span in `contexts` will be the parent.)
+
+        start_time: optional override for the start time of the created span. Seconds
+            since the epoch.
+
         inherit_force_tracing: if set, and any of the previous contexts have had tracing
            forced, the new span will also have tracing forced.
+        tracer: override the opentracing tracer. By default the global tracer is used.
     """
     if opentracing is None:
         return noop_context_manager()  # type: ignore[unreachable]
 
     references = [opentracing.follows_from(context) for context in contexts]
-    scope = start_active_span(operation_name, references=references)
+    scope = start_active_span(
+        operation_name,
+        child_of=child_of,
+        references=references,
+        start_time=start_time,
+        tracer=tracer,
+    )
 
     if inherit_force_tracing and any(
         is_context_forced_tracing(ctx) for ctx in contexts
diff --git a/synapse/logging/scopecontextmanager.py b/synapse/logging/scopecontextmanager.py
index db8ca2c049..d57e7c5324 100644
--- a/synapse/logging/scopecontextmanager.py
+++ b/synapse/logging/scopecontextmanager.py
@@ -28,8 +28,9 @@ class LogContextScopeManager(ScopeManager):
     The LogContextScopeManager tracks the active scope in opentracing
     by using the log contexts which are native to synapse. This is so
     that the basic opentracing api can be used across twisted defereds.
-    (I would love to break logcontexts and this into an OS package. but
-    let's wait for twisted's contexts to be released.)
+
+    It would be nice just to use opentracing's ContextVarsScopeManager,
+    but currently that doesn't work due to https://twistedmatrix.com/trac/ticket/10301.
     """
 
     def __init__(self, config):
@@ -65,29 +66,45 @@ class LogContextScopeManager(ScopeManager):
             Scope.close() on the returned instance.
         """
 
-        enter_logcontext = False
         ctx = current_context()
 
         if not ctx:
-            # We don't want this scope to affect.
             logger.error("Tried to activate scope outside of loggingcontext")
             return Scope(None, span)  # type: ignore[arg-type]
-        elif ctx.scope is not None:
-            # We want the logging scope to look exactly the same so we give it
-            # a blank suffix
+
+        if ctx.scope is not None:
+            # start a new logging context as a child of the existing one.
+            # Doing so -- rather than updating the existing logcontext -- means that
+            # creating several concurrent spans under the same logcontext works
+            # correctly.
             ctx = nested_logging_context("")
             enter_logcontext = True
+        else:
+            # if there is no span currently associated with the current logcontext, we
+            # just store the scope in it.
+            #
+            # This feels a bit dubious, but it does hack around a problem where a
+            # span outlasts its parent logcontext (which would otherwise lead to
+            # "Re-starting finished log context" errors).
+            enter_logcontext = False
 
         scope = _LogContextScope(self, span, ctx, enter_logcontext, finish_on_close)
         ctx.scope = scope
+        if enter_logcontext:
+            ctx.__enter__()
+
         return scope
 
 
 class _LogContextScope(Scope):
     """
-    A custom opentracing scope. The only significant difference is that it will
-    close the log context it's related to if the logcontext was created specifically
-    for this scope.
+    A custom opentracing scope, associated with a LogContext
+
+      * filters out _DefGen_Return exceptions which arise from calling
+        `defer.returnValue` in Twisted code
+
+      * When the scope is closed, the logcontext's active scope is reset to None.
+        and - if enter_logcontext was set - the logcontext is finished too.
     """
 
     def __init__(self, manager, span, logcontext, enter_logcontext, finish_on_close):
@@ -101,8 +118,7 @@ class _LogContextScope(Scope):
             logcontext (LogContext):
                 the logcontext to which this scope is attached.
             enter_logcontext (Boolean):
-                if True the logcontext will be entered and exited when the scope
-                is entered and exited respectively
+                if True the logcontext will be exited when the scope is finished
             finish_on_close (Boolean):
                 if True finish the span when the scope is closed
         """
@@ -111,26 +127,28 @@ class _LogContextScope(Scope):
         self._finish_on_close = finish_on_close
         self._enter_logcontext = enter_logcontext
 
-    def __enter__(self):
-        if self._enter_logcontext:
-            self.logcontext.__enter__()
+    def __exit__(self, exc_type, value, traceback):
+        if exc_type == twisted.internet.defer._DefGen_Return:
+            # filter out defer.returnValue() calls
+            exc_type = value = traceback = None
+        super().__exit__(exc_type, value, traceback)
 
-        return self
-
-    def __exit__(self, type, value, traceback):
-        if type == twisted.internet.defer._DefGen_Return:
-            super().__exit__(None, None, None)
-        else:
-            super().__exit__(type, value, traceback)
-        if self._enter_logcontext:
-            self.logcontext.__exit__(type, value, traceback)
-        else:  # the logcontext existed before the creation of the scope
-            self.logcontext.scope = None
+    def __str__(self):
+        return f"Scope<{self.span}>"
 
     def close(self):
-        if self.manager.active is not self:
-            logger.error("Tried to close a non-active scope!")
-            return
+        active_scope = self.manager.active
+        if active_scope is not self:
+            logger.error(
+                "Closing scope %s which is not the currently-active one %s",
+                self,
+                active_scope,
+            )
 
         if self._finish_on_close:
             self.span.finish()
+
+        self.logcontext.scope = None
+
+        if self._enter_logcontext:
+            self.logcontext.__exit__(None, None, None)
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 9e6c1b2f3b..cca084c18c 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -30,6 +30,7 @@ from typing import (
     Type,
     TypeVar,
     Union,
+    cast,
 )
 
 import attr
@@ -60,7 +61,7 @@ all_gauges: "Dict[str, Union[LaterGauge, InFlightGauge]]" = {}
 HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
 
 
-class RegistryProxy:
+class _RegistryProxy:
     @staticmethod
     def collect() -> Iterable[Metric]:
         for metric in REGISTRY.collect():
@@ -68,6 +69,13 @@ class RegistryProxy:
                 yield metric
 
 
+# A little bit nasty, but collect() above is static so a Protocol doesn't work.
+# _RegistryProxy matches the signature of a CollectorRegistry instance enough
+# for it to be usable in the contexts in which we use it.
+# TODO Do something nicer about this.
+RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
+
+
 @attr.s(slots=True, hash=True, auto_attribs=True)
 class LaterGauge:
 
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 788b2e47d5..a91a7fa3ce 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -72,6 +72,7 @@ from synapse.handlers.auth import (
     CHECK_3PID_AUTH_CALLBACK,
     CHECK_AUTH_CALLBACK,
     GET_USERNAME_FOR_REGISTRATION_CALLBACK,
+    IS_3PID_ALLOWED_CALLBACK,
     ON_LOGGED_OUT_CALLBACK,
     AuthHandler,
 )
@@ -312,6 +313,7 @@ class ModuleApi:
         auth_checkers: Optional[
             Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
         ] = None,
+        is_3pid_allowed: Optional[IS_3PID_ALLOWED_CALLBACK] = None,
         get_username_for_registration: Optional[
             GET_USERNAME_FOR_REGISTRATION_CALLBACK
         ] = None,
@@ -323,6 +325,7 @@ class ModuleApi:
         return self._password_auth_provider.register_password_auth_provider_callbacks(
             check_3pid_auth=check_3pid_auth,
             on_logged_out=on_logged_out,
+            is_3pid_allowed=is_3pid_allowed,
             auth_checkers=auth_checkers,
             get_username_for_registration=get_username_for_registration,
         )
@@ -401,6 +404,32 @@ class ModuleApi:
         """
         return self._hs.config.email.email_app_name
 
+    @property
+    def server_name(self) -> str:
+        """The server name for the local homeserver.
+
+        Added in Synapse v1.53.0.
+        """
+        return self._server_name
+
+    @property
+    def worker_name(self) -> Optional[str]:
+        """The name of the worker this specific instance is running as per the
+        "worker_name" configuration setting, or None if it's the main process.
+
+        Added in Synapse v1.53.0.
+        """
+        return self._hs.config.worker.worker_name
+
+    @property
+    def worker_app(self) -> Optional[str]:
+        """The name of the worker app this specific instance is running as per the
+        "worker_app" configuration setting, or None if it's the main process.
+
+        Added in Synapse v1.53.0.
+        """
+        return self._hs.config.worker.worker_app
+
     async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
         """Get user info by user_id
 
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 632b2245ef..5988c67d90 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -461,7 +461,9 @@ class Notifier:
                     users,
                 )
             except Exception:
-                logger.exception("Error notifying application services of event")
+                logger.exception(
+                    "Error notifying application services of ephemeral events"
+                )
 
     def on_new_replication_data(self) -> None:
         """Used to inform replication listeners that something has happened
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 6211506990..910b05c0da 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -20,15 +20,11 @@ from typing import Any, Dict, List
 from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
 
 
-def list_with_base_rules(
-    rawrules: List[Dict[str, Any]], use_new_defaults: bool = False
-) -> List[Dict[str, Any]]:
+def list_with_base_rules(rawrules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
     """Combine the list of rules set by the user with the default push rules
 
     Args:
         rawrules: The rules the user has modified or set.
-        use_new_defaults: Whether to use the new experimental default rules when
-            appending or prepending default rules.
 
     Returns:
         A new list with the rules set by the user combined with the defaults.
@@ -48,9 +44,7 @@ def list_with_base_rules(
 
     ruleslist.extend(
         make_base_prepend_rules(
-            PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
-            modified_base_rules,
-            use_new_defaults,
+            PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
         )
     )
 
@@ -61,7 +55,6 @@ def list_with_base_rules(
                     make_base_append_rules(
                         PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
                         modified_base_rules,
-                        use_new_defaults,
                     )
                 )
                 current_prio_class -= 1
@@ -70,7 +63,6 @@ def list_with_base_rules(
                         make_base_prepend_rules(
                             PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
                             modified_base_rules,
-                            use_new_defaults,
                         )
                     )
 
@@ -79,18 +71,14 @@ def list_with_base_rules(
     while current_prio_class > 0:
         ruleslist.extend(
             make_base_append_rules(
-                PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
-                modified_base_rules,
-                use_new_defaults,
+                PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
             )
         )
         current_prio_class -= 1
         if current_prio_class > 0:
             ruleslist.extend(
                 make_base_prepend_rules(
-                    PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
-                    modified_base_rules,
-                    use_new_defaults,
+                    PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
                 )
             )
 
@@ -98,24 +86,14 @@ def list_with_base_rules(
 
 
 def make_base_append_rules(
-    kind: str,
-    modified_base_rules: Dict[str, Dict[str, Any]],
-    use_new_defaults: bool = False,
+    kind: str, modified_base_rules: Dict[str, Dict[str, Any]]
 ) -> List[Dict[str, Any]]:
     rules = []
 
     if kind == "override":
-        rules = (
-            NEW_APPEND_OVERRIDE_RULES
-            if use_new_defaults
-            else BASE_APPEND_OVERRIDE_RULES
-        )
+        rules = BASE_APPEND_OVERRIDE_RULES
     elif kind == "underride":
-        rules = (
-            NEW_APPEND_UNDERRIDE_RULES
-            if use_new_defaults
-            else BASE_APPEND_UNDERRIDE_RULES
-        )
+        rules = BASE_APPEND_UNDERRIDE_RULES
     elif kind == "content":
         rules = BASE_APPEND_CONTENT_RULES
 
@@ -134,7 +112,6 @@ def make_base_append_rules(
 def make_base_prepend_rules(
     kind: str,
     modified_base_rules: Dict[str, Dict[str, Any]],
-    use_new_defaults: bool = False,
 ) -> List[Dict[str, Any]]:
     rules = []
 
@@ -301,135 +278,6 @@ BASE_APPEND_OVERRIDE_RULES = [
 ]
 
 
-NEW_APPEND_OVERRIDE_RULES = [
-    {
-        "rule_id": "global/override/.m.rule.encrypted",
-        "conditions": [
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.encrypted",
-                "_id": "_encrypted",
-            }
-        ],
-        "actions": ["notify"],
-    },
-    {
-        "rule_id": "global/override/.m.rule.suppress_notices",
-        "conditions": [
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.message",
-                "_id": "_suppress_notices_type",
-            },
-            {
-                "kind": "event_match",
-                "key": "content.msgtype",
-                "pattern": "m.notice",
-                "_id": "_suppress_notices",
-            },
-        ],
-        "actions": [],
-    },
-    {
-        "rule_id": "global/underride/.m.rule.suppress_edits",
-        "conditions": [
-            {
-                "kind": "event_match",
-                "key": "m.relates_to.m.rel_type",
-                "pattern": "m.replace",
-                "_id": "_suppress_edits",
-            }
-        ],
-        "actions": [],
-    },
-    {
-        "rule_id": "global/override/.m.rule.invite_for_me",
-        "conditions": [
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.member",
-                "_id": "_member",
-            },
-            {
-                "kind": "event_match",
-                "key": "content.membership",
-                "pattern": "invite",
-                "_id": "_invite_member",
-            },
-            {"kind": "event_match", "key": "state_key", "pattern_type": "user_id"},
-        ],
-        "actions": ["notify", {"set_tweak": "sound", "value": "default"}],
-    },
-    {
-        "rule_id": "global/override/.m.rule.contains_display_name",
-        "conditions": [{"kind": "contains_display_name"}],
-        "actions": [
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight"},
-        ],
-    },
-    {
-        "rule_id": "global/override/.m.rule.tombstone",
-        "conditions": [
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.tombstone",
-                "_id": "_tombstone",
-            },
-            {
-                "kind": "event_match",
-                "key": "state_key",
-                "pattern": "",
-                "_id": "_tombstone_statekey",
-            },
-        ],
-        "actions": [
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight"},
-        ],
-    },
-    {
-        "rule_id": "global/override/.m.rule.roomnotif",
-        "conditions": [
-            {
-                "kind": "event_match",
-                "key": "content.body",
-                "pattern": "@room",
-                "_id": "_roomnotif_content",
-            },
-            {
-                "kind": "sender_notification_permission",
-                "key": "room",
-                "_id": "_roomnotif_pl",
-            },
-        ],
-        "actions": [
-            "notify",
-            {"set_tweak": "highlight"},
-            {"set_tweak": "sound", "value": "default"},
-        ],
-    },
-    {
-        "rule_id": "global/override/.m.rule.call",
-        "conditions": [
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.call.invite",
-                "_id": "_call",
-            }
-        ],
-        "actions": ["notify", {"set_tweak": "sound", "value": "ring"}],
-    },
-]
-
-
 BASE_APPEND_UNDERRIDE_RULES = [
     {
         "rule_id": "global/underride/.m.rule.call",
@@ -538,36 +386,6 @@ BASE_APPEND_UNDERRIDE_RULES = [
 ]
 
 
-NEW_APPEND_UNDERRIDE_RULES = [
-    {
-        "rule_id": "global/underride/.m.rule.room_one_to_one",
-        "conditions": [
-            {"kind": "room_member_count", "is": "2", "_id": "member_count"},
-            {
-                "kind": "event_match",
-                "key": "content.body",
-                "pattern": "*",
-                "_id": "body",
-            },
-        ],
-        "actions": ["notify", {"set_tweak": "sound", "value": "default"}],
-    },
-    {
-        "rule_id": "global/underride/.m.rule.message",
-        "conditions": [
-            {
-                "kind": "event_match",
-                "key": "content.body",
-                "pattern": "*",
-                "_id": "body",
-            },
-        ],
-        "actions": ["notify"],
-        "enabled": False,
-    },
-]
-
-
 BASE_RULE_IDS = set()
 
 for r in BASE_APPEND_CONTENT_RULES:
@@ -589,26 +407,3 @@ for r in BASE_APPEND_UNDERRIDE_RULES:
     r["priority_class"] = PRIORITY_CLASS_MAP["underride"]
     r["default"] = True
     BASE_RULE_IDS.add(r["rule_id"])
-
-
-NEW_RULE_IDS = set()
-
-for r in BASE_APPEND_CONTENT_RULES:
-    r["priority_class"] = PRIORITY_CLASS_MAP["content"]
-    r["default"] = True
-    NEW_RULE_IDS.add(r["rule_id"])
-
-for r in BASE_PREPEND_OVERRIDE_RULES:
-    r["priority_class"] = PRIORITY_CLASS_MAP["override"]
-    r["default"] = True
-    NEW_RULE_IDS.add(r["rule_id"])
-
-for r in NEW_APPEND_OVERRIDE_RULES:
-    r["priority_class"] = PRIORITY_CLASS_MAP["override"]
-    r["default"] = True
-    NEW_RULE_IDS.add(r["rule_id"])
-
-for r in NEW_APPEND_UNDERRIDE_RULES:
-    r["priority_class"] = PRIORITY_CLASS_MAP["underride"]
-    r["default"] = True
-    NEW_RULE_IDS.add(r["rule_id"])
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 80786464c2..22b4606ae0 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -76,8 +76,7 @@ REQUIREMENTS = [
     "msgpack>=0.5.2",
     "phonenumbers>=8.2.0",
     # we use GaugeHistogramMetric, which was added in prom-client 0.4.0.
-    # 0.13.0 has an incorrect type annotation, see #11832.
-    "prometheus_client>=0.4.0,<0.13.0",
+    "prometheus_client>=0.4.0",
     # we use `order`, which arrived in attrs 19.2.0.
     # Note: 21.1.0 broke `/sync`, see #9936
     "attrs>=19.2.0,!=21.1.0",
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 1457d9d59b..aec040ee19 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -40,7 +40,7 @@ class ReplicationRestResource(JsonResource):
         super().__init__(hs, canonical_json=False, extract_context=True)
         self.register_servlets(hs)
 
-    def register_servlets(self, hs: "HomeServer"):
+    def register_servlets(self, hs: "HomeServer") -> None:
         send_event.register_servlets(hs, self)
         federation.register_servlets(hs, self)
         presence.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 585332b244..bc1d28dd19 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -15,16 +15,20 @@
 import abc
 import logging
 import re
-import urllib
+import urllib.parse
 from inspect import signature
 from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple
 
 from prometheus_client import Counter, Gauge
 
+from twisted.web.server import Request
+
 from synapse.api.errors import HttpResponseException, SynapseError
 from synapse.http import RequestTimedOutError
+from synapse.http.server import HttpServer
 from synapse.logging import opentracing
 from synapse.logging.opentracing import trace
+from synapse.types import JsonDict
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.stringutils import random_string
 
@@ -113,10 +117,12 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         if hs.config.worker.worker_replication_secret:
             self._replication_secret = hs.config.worker.worker_replication_secret
 
-    def _check_auth(self, request) -> None:
+    def _check_auth(self, request: Request) -> None:
         # Get the authorization header.
         auth_headers = request.requestHeaders.getRawHeaders(b"Authorization")
 
+        if not auth_headers:
+            raise RuntimeError("Missing Authorization header.")
         if len(auth_headers) > 1:
             raise RuntimeError("Too many Authorization headers.")
         parts = auth_headers[0].split(b" ")
@@ -129,7 +135,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         raise RuntimeError("Invalid Authorization header.")
 
     @abc.abstractmethod
-    async def _serialize_payload(**kwargs):
+    async def _serialize_payload(**kwargs) -> JsonDict:
         """Static method that is called when creating a request.
 
         Concrete implementations should have explicit parameters (rather than
@@ -144,19 +150,20 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
         return {}
 
     @abc.abstractmethod
-    async def _handle_request(self, request, **kwargs):
+    async def _handle_request(
+        self, request: Request, **kwargs: Any
+    ) -> Tuple[int, JsonDict]:
         """Handle incoming request.
 
         This is called with the request object and PATH_ARGS.
 
         Returns:
-            tuple[int, dict]: HTTP status code and a JSON serialisable dict
-            to be used as response body of request.
+            HTTP status code and a JSON serialisable dict to be used as response
+            body of request.
         """
-        pass
 
     @classmethod
-    def make_client(cls, hs: "HomeServer"):
+    def make_client(cls, hs: "HomeServer") -> Callable:
         """Create a client that makes requests.
 
         Returns a callable that accepts the same parameters as
@@ -182,7 +189,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
             )
 
         @trace(opname="outgoing_replication_request")
-        async def send_request(*, instance_name="master", **kwargs):
+        async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any:
             with outgoing_gauge.track_inprogress():
                 if instance_name == local_instance_name:
                     raise Exception("Trying to send HTTP request to self")
@@ -268,7 +275,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
 
         return send_request
 
-    def register(self, http_server):
+    def register(self, http_server: HttpServer) -> None:
         """Called by the server to register this as a handler to the
         appropriate path.
         """
@@ -289,7 +296,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
             self.__class__.__name__,
         )
 
-    async def _check_auth_and_handle(self, request, **kwargs):
+    async def _check_auth_and_handle(
+        self, request: Request, **kwargs: Any
+    ) -> Tuple[int, JsonDict]:
         """Called on new incoming requests when caching is enabled. Checks
         if there is a cached response for the request and returns that,
         otherwise calls `_handle_request` and caches its response.
diff --git a/synapse/replication/http/account_data.py b/synapse/replication/http/account_data.py
index 5f0f225aa9..310f609153 100644
--- a/synapse/replication/http/account_data.py
+++ b/synapse/replication/http/account_data.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -48,14 +52,18 @@ class ReplicationUserAccountDataRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id, account_data_type, content):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, account_data_type: str, content: JsonDict
+    ) -> JsonDict:
         payload = {
             "content": content,
         }
 
         return payload
 
-    async def _handle_request(self, request, user_id, account_data_type):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str, account_data_type: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         max_stream_id = await self.handler.add_account_data_for_user(
@@ -89,14 +97,18 @@ class ReplicationRoomAccountDataRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id, room_id, account_data_type, content):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, room_id: str, account_data_type: str, content: JsonDict
+    ) -> JsonDict:
         payload = {
             "content": content,
         }
 
         return payload
 
-    async def _handle_request(self, request, user_id, room_id, account_data_type):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str, room_id: str, account_data_type: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         max_stream_id = await self.handler.add_account_data_to_room(
@@ -130,14 +142,18 @@ class ReplicationAddTagRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id, room_id, tag, content):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, room_id: str, tag: str, content: JsonDict
+    ) -> JsonDict:
         payload = {
             "content": content,
         }
 
         return payload
 
-    async def _handle_request(self, request, user_id, room_id, tag):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str, room_id: str, tag: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         max_stream_id = await self.handler.add_tag_to_room(
@@ -173,11 +189,13 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id, room_id, tag):
+    async def _serialize_payload(user_id: str, room_id: str, tag: str) -> JsonDict:  # type: ignore[override]
 
         return {}
 
-    async def _handle_request(self, request, user_id, room_id, tag):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str, room_id: str, tag: str
+    ) -> Tuple[int, JsonDict]:
         max_stream_id = await self.handler.remove_tag_from_room(
             user_id,
             room_id,
@@ -187,7 +205,7 @@ class ReplicationRemoveTagRestServlet(ReplicationEndpoint):
         return 200, {"max_stream_id": max_stream_id}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationUserAccountDataRestServlet(hs).register(http_server)
     ReplicationRoomAccountDataRestServlet(hs).register(http_server)
     ReplicationAddTagRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py
index 42dffb39cb..f2f40129fe 100644
--- a/synapse/replication/http/devices.py
+++ b/synapse/replication/http/devices.py
@@ -13,9 +13,13 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -63,14 +67,16 @@ class ReplicationUserDevicesResyncRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(user_id):
+    async def _serialize_payload(user_id: str) -> JsonDict:  # type: ignore[override]
         return {}
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         user_devices = await self.device_list_updater.user_device_resync(user_id)
 
         return 200, user_devices
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationUserDevicesResyncRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 5ed535c90d..d529c8a19f 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -13,17 +13,22 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Tuple
 
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events import make_event_from_dict
+from twisted.web.server import Request
+
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
+from synapse.events import EventBase, make_event_from_dict
 from synapse.events.snapshot import EventContext
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
+    from synapse.storage.databases.main import DataStore
 
 logger = logging.getLogger(__name__)
 
@@ -69,14 +74,18 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
         self.federation_event_handler = hs.get_federation_event_handler()
 
     @staticmethod
-    async def _serialize_payload(store, room_id, event_and_contexts, backfilled):
+    async def _serialize_payload(  # type: ignore[override]
+        store: "DataStore",
+        room_id: str,
+        event_and_contexts: List[Tuple[EventBase, EventContext]],
+        backfilled: bool,
+    ) -> JsonDict:
         """
         Args:
             store
-            room_id (str)
-            event_and_contexts (list[tuple[FrozenEvent, EventContext]])
-            backfilled (bool): Whether or not the events are the result of
-                backfilling
+            room_id
+            event_and_contexts
+            backfilled: Whether or not the events are the result of backfilling
         """
         event_payloads = []
         for event, context in event_and_contexts:
@@ -102,7 +111,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
 
         return payload
 
-    async def _handle_request(self, request):
+    async def _handle_request(self, request: Request) -> Tuple[int, JsonDict]:  # type: ignore[override]
         with Measure(self.clock, "repl_fed_send_events_parse"):
             content = parse_json_object_from_request(request)
 
@@ -163,10 +172,14 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
         self.registry = hs.get_federation_registry()
 
     @staticmethod
-    async def _serialize_payload(edu_type, origin, content):
+    async def _serialize_payload(  # type: ignore[override]
+        edu_type: str, origin: str, content: JsonDict
+    ) -> JsonDict:
         return {"origin": origin, "content": content}
 
-    async def _handle_request(self, request, edu_type):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, edu_type: str
+    ) -> Tuple[int, JsonDict]:
         with Measure(self.clock, "repl_fed_send_edu_parse"):
             content = parse_json_object_from_request(request)
 
@@ -175,9 +188,9 @@ class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
 
         logger.info("Got %r edu from %s", edu_type, origin)
 
-        result = await self.registry.on_edu(edu_type, origin, edu_content)
+        await self.registry.on_edu(edu_type, origin, edu_content)
 
-        return 200, result
+        return 200, {}
 
 
 class ReplicationGetQueryRestServlet(ReplicationEndpoint):
@@ -206,15 +219,17 @@ class ReplicationGetQueryRestServlet(ReplicationEndpoint):
         self.registry = hs.get_federation_registry()
 
     @staticmethod
-    async def _serialize_payload(query_type, args):
+    async def _serialize_payload(query_type: str, args: JsonDict) -> JsonDict:  # type: ignore[override]
         """
         Args:
-            query_type (str)
-            args (dict): The arguments received for the given query type
+            query_type
+            args: The arguments received for the given query type
         """
         return {"args": args}
 
-    async def _handle_request(self, request, query_type):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, query_type: str
+    ) -> Tuple[int, JsonDict]:
         with Measure(self.clock, "repl_fed_query_parse"):
             content = parse_json_object_from_request(request)
 
@@ -248,14 +263,16 @@ class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
         self.store = hs.get_datastore()
 
     @staticmethod
-    async def _serialize_payload(room_id, args):
+    async def _serialize_payload(room_id: str) -> JsonDict:  # type: ignore[override]
         """
         Args:
-            room_id (str)
+            room_id
         """
         return {}
 
-    async def _handle_request(self, request, room_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, room_id: str
+    ) -> Tuple[int, JsonDict]:
         await self.store.clean_room_for_join(room_id)
 
         return 200, {}
@@ -283,17 +300,19 @@ class ReplicationStoreRoomOnOutlierMembershipRestServlet(ReplicationEndpoint):
         self.store = hs.get_datastore()
 
     @staticmethod
-    async def _serialize_payload(room_id, room_version):
+    async def _serialize_payload(room_id: str, room_version: RoomVersion) -> JsonDict:  # type: ignore[override]
         return {"room_version": room_version.identifier}
 
-    async def _handle_request(self, request, room_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, room_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
         room_version = KNOWN_ROOM_VERSIONS[content["room_version"]]
         await self.store.maybe_store_room_on_outlier_membership(room_id, room_version)
         return 200, {}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationFederationSendEventsRestServlet(hs).register(http_server)
     ReplicationFederationSendEduRestServlet(hs).register(http_server)
     ReplicationGetQueryRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index daacc34cea..c68e18da12 100644
--- a/synapse/replication/http/login.py
+++ b/synapse/replication/http/login.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional, Tuple, cast
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -39,25 +43,24 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
         self.registration_handler = hs.get_registration_handler()
 
     @staticmethod
-    async def _serialize_payload(
-        user_id,
-        device_id,
-        initial_display_name,
-        is_guest,
-        is_appservice_ghost,
-        should_issue_refresh_token,
-        auth_provider_id,
-        auth_provider_session_id,
-    ):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str,
+        device_id: Optional[str],
+        initial_display_name: Optional[str],
+        is_guest: bool,
+        is_appservice_ghost: bool,
+        should_issue_refresh_token: bool,
+        auth_provider_id: Optional[str],
+        auth_provider_session_id: Optional[str],
+    ) -> JsonDict:
         """
         Args:
-            user_id (int)
-            device_id (str|None): Device ID to use, if None a new one is
-                generated.
-            initial_display_name (str|None)
-            is_guest (bool)
-            is_appservice_ghost (bool)
-            should_issue_refresh_token (bool)
+            user_id
+            device_id: Device ID to use, if None a new one is generated.
+            initial_display_name
+            is_guest
+            is_appservice_ghost
+            should_issue_refresh_token
         """
         return {
             "device_id": device_id,
@@ -69,7 +72,9 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
             "auth_provider_session_id": auth_provider_session_id,
         }
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         device_id = content["device_id"]
@@ -91,8 +96,8 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
             auth_provider_session_id=auth_provider_session_id,
         )
 
-        return 200, res
+        return 200, cast(JsonDict, res)
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     RegisterDeviceReplicationServlet(hs).register(http_server)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 7371c240b2..0145858e47 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -16,6 +16,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from twisted.web.server import Request
 
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.http.site import SynapseRequest
 from synapse.replication.http._base import ReplicationEndpoint
@@ -53,7 +54,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         requester: Requester,
         room_id: str,
         user_id: str,
@@ -77,7 +78,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
             "content": content,
         }
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self, request: SynapseRequest, room_id: str, user_id: str
     ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
@@ -122,13 +123,13 @@ class ReplicationRemoteKnockRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         requester: Requester,
         room_id: str,
         user_id: str,
         remote_room_hosts: List[str],
         content: JsonDict,
-    ):
+    ) -> JsonDict:
         """
         Args:
             requester: The user making the request, according to the access token.
@@ -143,12 +144,12 @@ class ReplicationRemoteKnockRestServlet(ReplicationEndpoint):
             "content": content,
         }
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self,
         request: SynapseRequest,
         room_id: str,
         user_id: str,
-    ):
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         remote_room_hosts = content["remote_room_hosts"]
@@ -192,7 +193,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
         self.member_handler = hs.get_room_member_handler()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         invite_event_id: str,
         txn_id: Optional[str],
         requester: Requester,
@@ -215,7 +216,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
             "content": content,
         }
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self, request: SynapseRequest, invite_event_id: str
     ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
@@ -262,12 +263,12 @@ class ReplicationRemoteRescindKnockRestServlet(ReplicationEndpoint):
         self.member_handler = hs.get_room_member_handler()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         knock_event_id: str,
         txn_id: Optional[str],
         requester: Requester,
         content: JsonDict,
-    ):
+    ) -> JsonDict:
         """
         Args:
             knock_event_id: The ID of the knock to be rescinded.
@@ -281,11 +282,11 @@ class ReplicationRemoteRescindKnockRestServlet(ReplicationEndpoint):
             "content": content,
         }
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self,
         request: SynapseRequest,
         knock_event_id: str,
-    ):
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         txn_id = content["txn_id"]
@@ -329,7 +330,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
         self.distributor = hs.get_distributor()
 
     @staticmethod
-    async def _serialize_payload(  # type: ignore
+    async def _serialize_payload(  # type: ignore[override]
         room_id: str, user_id: str, change: str
     ) -> JsonDict:
         """
@@ -345,7 +346,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
 
         return {}
 
-    async def _handle_request(  # type: ignore
+    async def _handle_request(  # type: ignore[override]
         self, request: Request, room_id: str, user_id: str, change: str
     ) -> Tuple[int, JsonDict]:
         logger.info("user membership change: %s in %s", user_id, room_id)
@@ -360,7 +361,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationRemoteJoinRestServlet(hs).register(http_server)
     ReplicationRemoteRejectInviteRestServlet(hs).register(http_server)
     ReplicationUserJoinedLeftRoomRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py
index 63143085d5..4a5b08f56f 100644
--- a/synapse/replication/http/presence.py
+++ b/synapse/replication/http/presence.py
@@ -13,11 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import UserID
+from synapse.types import JsonDict, UserID
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -49,18 +52,17 @@ class ReplicationBumpPresenceActiveTime(ReplicationEndpoint):
         self._presence_handler = hs.get_presence_handler()
 
     @staticmethod
-    async def _serialize_payload(user_id):
+    async def _serialize_payload(user_id: str) -> JsonDict:  # type: ignore[override]
         return {}
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         await self._presence_handler.bump_presence_active_time(
             UserID.from_string(user_id)
         )
 
-        return (
-            200,
-            {},
-        )
+        return (200, {})
 
 
 class ReplicationPresenceSetState(ReplicationEndpoint):
@@ -92,16 +94,21 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
         self._presence_handler = hs.get_presence_handler()
 
     @staticmethod
-    async def _serialize_payload(
-        user_id, state, ignore_status_msg=False, force_notify=False
-    ):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str,
+        state: JsonDict,
+        ignore_status_msg: bool = False,
+        force_notify: bool = False,
+    ) -> JsonDict:
         return {
             "state": state,
             "ignore_status_msg": ignore_status_msg,
             "force_notify": force_notify,
         }
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         await self._presence_handler.set_state(
@@ -111,12 +118,9 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
             content["force_notify"],
         )
 
-        return (
-            200,
-            {},
-        )
+        return (200, {})
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationBumpPresenceActiveTime(hs).register(http_server)
     ReplicationPresenceSetState(hs).register(http_server)
diff --git a/synapse/replication/http/push.py b/synapse/replication/http/push.py
index 6c8db3061e..af5c2f66a7 100644
--- a/synapse/replication/http/push.py
+++ b/synapse/replication/http/push.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -48,7 +52,7 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
         self.pusher_pool = hs.get_pusherpool()
 
     @staticmethod
-    async def _serialize_payload(app_id, pushkey, user_id):
+    async def _serialize_payload(app_id: str, pushkey: str, user_id: str) -> JsonDict:  # type: ignore[override]
         payload = {
             "app_id": app_id,
             "pushkey": pushkey,
@@ -56,7 +60,9 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
 
         return payload
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         app_id = content["app_id"]
@@ -67,5 +73,5 @@ class ReplicationRemovePusherRestServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationRemovePusherRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index 7adfbb666f..c7f751b70d 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -13,10 +13,14 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional, Tuple
 
+from twisted.web.server import Request
+
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -36,34 +40,34 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
         self.registration_handler = hs.get_registration_handler()
 
     @staticmethod
-    async def _serialize_payload(
-        user_id,
-        password_hash,
-        was_guest,
-        make_guest,
-        appservice_id,
-        create_profile_with_displayname,
-        admin,
-        user_type,
-        address,
-        shadow_banned,
-    ):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str,
+        password_hash: Optional[str],
+        was_guest: bool,
+        make_guest: bool,
+        appservice_id: Optional[str],
+        create_profile_with_displayname: Optional[str],
+        admin: bool,
+        user_type: Optional[str],
+        address: Optional[str],
+        shadow_banned: bool,
+    ) -> JsonDict:
         """
         Args:
-            user_id (str): The desired user ID to register.
-            password_hash (str|None): Optional. The password hash for this user.
-            was_guest (bool): Optional. Whether this is a guest account being
-                upgraded to a non-guest account.
-            make_guest (boolean): True if the the new user should be guest,
-                false to add a regular user account.
-            appservice_id (str|None): The ID of the appservice registering the user.
-            create_profile_with_displayname (unicode|None): Optionally create a
-                profile for the user, setting their displayname to the given value
-            admin (boolean): is an admin user?
-            user_type (str|None): type of user. One of the values from
-                api.constants.UserTypes, or None for a normal user.
-            address (str|None): the IP address used to perform the regitration.
-            shadow_banned (bool): Whether to shadow-ban the user
+            user_id: The desired user ID to register.
+            password_hash: Optional. The password hash for this user.
+            was_guest: Optional. Whether this is a guest account being upgraded
+                to a non-guest account.
+            make_guest: True if the the new user should be guest, false to add a
+                regular user account.
+            appservice_id: The ID of the appservice registering the user.
+            create_profile_with_displayname: Optionally create a profile for the
+                user, setting their displayname to the given value
+            admin: is an admin user?
+            user_type: type of user. One of the values from api.constants.UserTypes,
+                or None for a normal user.
+            address: the IP address used to perform the regitration.
+            shadow_banned: Whether to shadow-ban the user
         """
         return {
             "password_hash": password_hash,
@@ -77,7 +81,9 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
             "shadow_banned": shadow_banned,
         }
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         await self.registration_handler.check_registration_ratelimit(content["address"])
@@ -110,18 +116,21 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
         self.registration_handler = hs.get_registration_handler()
 
     @staticmethod
-    async def _serialize_payload(user_id, auth_result, access_token):
+    async def _serialize_payload(  # type: ignore[override]
+        user_id: str, auth_result: JsonDict, access_token: Optional[str]
+    ) -> JsonDict:
         """
         Args:
-            user_id (str): The user ID that consented
-            auth_result (dict): The authenticated credentials of the newly
-                registered user.
-            access_token (str|None): The access token of the newly logged in
+            user_id: The user ID that consented
+            auth_result: The authenticated credentials of the newly registered user.
+            access_token: The access token of the newly logged in
                 device, or None if `inhibit_login` enabled.
         """
         return {"auth_result": auth_result, "access_token": access_token}
 
-    async def _handle_request(self, request, user_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, user_id: str
+    ) -> Tuple[int, JsonDict]:
         content = parse_json_object_from_request(request)
 
         auth_result = content["auth_result"]
@@ -134,6 +143,6 @@ class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
         return 200, {}
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationRegisterServlet(hs).register(http_server)
     ReplicationPostRegisterActionsServlet(hs).register(http_server)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 9f6851d059..33e98daf8a 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -13,18 +13,22 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Tuple
+
+from twisted.web.server import Request
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
-from synapse.events import make_event_from_dict
+from synapse.events import EventBase, make_event_from_dict
 from synapse.events.snapshot import EventContext
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_json_object_from_request
 from synapse.replication.http._base import ReplicationEndpoint
-from synapse.types import Requester, UserID
+from synapse.types import JsonDict, Requester, UserID
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
+    from synapse.storage.databases.main import DataStore
 
 logger = logging.getLogger(__name__)
 
@@ -70,18 +74,24 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
         self.clock = hs.get_clock()
 
     @staticmethod
-    async def _serialize_payload(
-        event_id, store, event, context, requester, ratelimit, extra_users
-    ):
+    async def _serialize_payload(  # type: ignore[override]
+        event_id: str,
+        store: "DataStore",
+        event: EventBase,
+        context: EventContext,
+        requester: Requester,
+        ratelimit: bool,
+        extra_users: List[UserID],
+    ) -> JsonDict:
         """
         Args:
-            event_id (str)
-            store (DataStore)
-            requester (Requester)
-            event (FrozenEvent)
-            context (EventContext)
-            ratelimit (bool)
-            extra_users (list(UserID)): Any extra users to notify about event
+            event_id
+            store
+            requester
+            event
+            context
+            ratelimit
+            extra_users: Any extra users to notify about event
         """
         serialized_context = await context.serialize(event, store)
 
@@ -100,7 +110,9 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
 
         return payload
 
-    async def _handle_request(self, request, event_id):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, event_id: str
+    ) -> Tuple[int, JsonDict]:
         with Measure(self.clock, "repl_send_event_parse"):
             content = parse_json_object_from_request(request)
 
@@ -120,8 +132,6 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             ratelimit = content["ratelimit"]
             extra_users = [UserID.from_string(u) for u in content["extra_users"]]
 
-        request.requester = requester
-
         logger.info(
             "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
         )
@@ -139,5 +149,5 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
         )
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationSendEventRestServlet(hs).register(http_server)
diff --git a/synapse/replication/http/streams.py b/synapse/replication/http/streams.py
index 3223bc2432..c065225362 100644
--- a/synapse/replication/http/streams.py
+++ b/synapse/replication/http/streams.py
@@ -13,11 +13,15 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Tuple
+
+from twisted.web.server import Request
 
 from synapse.api.errors import SynapseError
+from synapse.http.server import HttpServer
 from synapse.http.servlet import parse_integer
 from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -57,10 +61,14 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         self.streams = hs.get_replication_streams()
 
     @staticmethod
-    async def _serialize_payload(stream_name, from_token, upto_token):
+    async def _serialize_payload(  # type: ignore[override]
+        stream_name: str, from_token: int, upto_token: int
+    ) -> JsonDict:
         return {"from_token": from_token, "upto_token": upto_token}
 
-    async def _handle_request(self, request, stream_name):
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request, stream_name: str
+    ) -> Tuple[int, JsonDict]:
         stream = self.streams.get(stream_name)
         if stream is None:
             raise SynapseError(400, "Unknown stream")
@@ -78,5 +86,5 @@ class ReplicationGetStreamUpdates(ReplicationEndpoint):
         )
 
 
-def register_servlets(hs: "HomeServer", http_server):
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ReplicationGetStreamUpdates(hs).register(http_server)
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 6b272658fc..cfa2aee76d 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -385,7 +385,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param
 
-        if not check_3pid_allowed(self.hs, "email", email):
+        if not await check_3pid_allowed(self.hs, "email", email):
             raise SynapseError(
                 403,
                 "Your email domain is not authorized on this server",
@@ -468,7 +468,7 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(country, phone_number)
 
-        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+        if not await check_3pid_allowed(self.hs, "msisdn", msisdn):
             raise SynapseError(
                 403,
                 "Account phone numbers are not authorized on this server",
diff --git a/synapse/rest/client/push_rule.py b/synapse/rest/client/push_rule.py
index 6f796d5e50..8fe75bd750 100644
--- a/synapse/rest/client/push_rule.py
+++ b/synapse/rest/client/push_rule.py
@@ -29,7 +29,7 @@ from synapse.http.servlet import (
     parse_string,
 )
 from synapse.http.site import SynapseRequest
-from synapse.push.baserules import BASE_RULE_IDS, NEW_RULE_IDS
+from synapse.push.baserules import BASE_RULE_IDS
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.push.rulekinds import PRIORITY_CLASS_MAP
 from synapse.rest.client._base import client_patterns
@@ -61,10 +61,6 @@ class PushRuleRestServlet(RestServlet):
         self.notifier = hs.get_notifier()
         self._is_worker = hs.config.worker.worker_app is not None
 
-        self._users_new_default_push_rules = (
-            hs.config.server.users_new_default_push_rules
-        )
-
     async def on_PUT(self, request: SynapseRequest, path: str) -> Tuple[int, JsonDict]:
         if self._is_worker:
             raise Exception("Cannot handle PUT /push_rules on worker")
@@ -217,12 +213,7 @@ class PushRuleRestServlet(RestServlet):
             rule_id = spec.rule_id
             is_default_rule = rule_id.startswith(".")
             if is_default_rule:
-                if user_id in self._users_new_default_push_rules:
-                    rule_ids = NEW_RULE_IDS
-                else:
-                    rule_ids = BASE_RULE_IDS
-
-                if namespaced_rule_id not in rule_ids:
+                if namespaced_rule_id not in BASE_RULE_IDS:
                     raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,))
             await self.store.set_push_rule_actions(
                 user_id, namespaced_rule_id, actions, is_default_rule
diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index e3492f9f93..c965e2bda2 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -112,7 +112,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param
 
-        if not check_3pid_allowed(self.hs, "email", email):
+        if not await check_3pid_allowed(self.hs, "email", email, registration=True):
             raise SynapseError(
                 403,
                 "Your email domain is not authorized to register on this server",
@@ -192,7 +192,7 @@ class MsisdnRegisterRequestTokenRestServlet(RestServlet):
 
         msisdn = phone_number_to_msisdn(country, phone_number)
 
-        if not check_3pid_allowed(self.hs, "msisdn", msisdn):
+        if not await check_3pid_allowed(self.hs, "msisdn", msisdn, registration=True):
             raise SynapseError(
                 403,
                 "Phone numbers are not authorized to register on this server",
@@ -368,7 +368,7 @@ class RegistrationTokenValidityRestServlet(RestServlet):
 
     Example:
 
-        GET /_matrix/client/unstable/org.matrix.msc3231/register/org.matrix.msc3231.login.registration_token/validity?token=abcd
+        GET /_matrix/client/v1/register/m.login.registration_token/validity?token=abcd
 
         200 OK
 
@@ -378,9 +378,8 @@ class RegistrationTokenValidityRestServlet(RestServlet):
     """
 
     PATTERNS = client_patterns(
-        f"/org.matrix.msc3231/register/{LoginType.REGISTRATION_TOKEN}/validity",
-        releases=(),
-        unstable=True,
+        f"/register/{LoginType.REGISTRATION_TOKEN}/validity",
+        releases=("v1",),
     )
 
     def __init__(self, hs: "HomeServer"):
@@ -617,7 +616,9 @@ class RegisterRestServlet(RestServlet):
                     medium = auth_result[login_type]["medium"]
                     address = auth_result[login_type]["address"]
 
-                    if not check_3pid_allowed(self.hs, medium, address):
+                    if not await check_3pid_allowed(
+                        self.hs, medium, address, registration=True
+                    ):
                         raise SynapseError(
                             403,
                             "Third party identifiers (email/phone numbers)"
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index e4c9451ae0..4b6be38327 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -131,6 +131,14 @@ class RoomBatchSendEventRestServlet(RestServlet):
             prev_event_ids_from_query
         )
 
+        if not auth_event_ids:
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "No auth events found for given prev_event query parameter. The prev_event=%s probably does not exist."
+                % prev_event_ids_from_query,
+                errcode=Codes.INVALID_PARAM,
+            )
+
         state_event_ids_at_start = []
         # Create and persist all of the state events that float off on their own
         # before the batch. These will most likely be all of the invite/member
@@ -197,21 +205,12 @@ class RoomBatchSendEventRestServlet(RestServlet):
                 EventContentFields.MSC2716_NEXT_BATCH_ID
             ]
 
-        # Also connect the historical event chain to the end of the floating
-        # state chain, which causes the HS to ask for the state at the start of
-        # the batch later. If there is no state chain to connect to, just make
-        # the insertion event float itself.
-        prev_event_ids = []
-        if len(state_event_ids_at_start):
-            prev_event_ids = [state_event_ids_at_start[-1]]
-
         # Create and persist all of the historical events as well as insertion
         # and batch meta events to make the batch navigable in the DAG.
         event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
             events_to_create=events_to_create,
             room_id=room_id,
             batch_id_to_connect_to=batch_id_to_connect_to,
-            initial_prev_event_ids=prev_event_ids,
             inherited_depth=inherited_depth,
             auth_event_ids=auth_event_ids,
             app_service_requester=requester,
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index 8162094cf6..fde28d08cb 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -49,10 +49,14 @@ class UploadResource(DirectServeJsonResource):
 
     async def _async_render_POST(self, request: SynapseRequest) -> None:
         requester = await self.auth.get_user_by_req(request)
-        content_length = request.getHeader("Content-Length")
-        if content_length is None:
+        raw_content_length = request.getHeader("Content-Length")
+        if raw_content_length is None:
             raise SynapseError(msg="Request must specify a Content-Length", code=400)
-        if int(content_length) > self.max_upload_size:
+        try:
+            content_length = int(raw_content_length)
+        except ValueError:
+            raise SynapseError(msg="Content-Length value is invalid", code=400)
+        if content_length > self.max_upload_size:
             raise SynapseError(
                 msg="Upload request body is too large",
                 code=413,
@@ -66,7 +70,8 @@ class UploadResource(DirectServeJsonResource):
                 upload_name: Optional[str] = upload_name_bytes.decode("utf8")
             except UnicodeDecodeError:
                 raise SynapseError(
-                    msg="Invalid UTF-8 filename parameter: %r" % (upload_name), code=400
+                    msg="Invalid UTF-8 filename parameter: %r" % (upload_name_bytes,),
+                    code=400,
                 )
 
         # If the name is falsey (e.g. an empty byte string) ensure it is None.
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 7967011afd..8df80664a2 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -57,7 +57,7 @@ class SQLBaseStore(metaclass=ABCMeta):
         pass
 
     def _invalidate_state_caches(
-        self, room_id: str, members_changed: Iterable[str]
+        self, room_id: str, members_changed: Collection[str]
     ) -> None:
         """Invalidates caches that are based on the current state, but does
         not stream invalidations down replication.
@@ -66,11 +66,16 @@ class SQLBaseStore(metaclass=ABCMeta):
             room_id: Room where state changed
             members_changed: The user_ids of members that have changed
         """
+        # If there were any membership changes, purge the appropriate caches.
         for host in {get_domain_from_id(u) for u in members_changed}:
             self._attempt_to_invalidate_cache("is_host_joined", (room_id, host))
+        if members_changed:
+            self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
+            self._attempt_to_invalidate_cache(
+                "get_users_in_room_with_profiles", (room_id,)
+            )
 
-        self._attempt_to_invalidate_cache("get_users_in_room", (room_id,))
-        self._attempt_to_invalidate_cache("get_users_in_room_with_profiles", (room_id,))
+        # Purge other caches based on room state.
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
         self._attempt_to_invalidate_cache("get_current_state_ids", (room_id,))
 
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 5bfa408f74..52146aacc8 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -106,6 +106,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
             "AccountDataAndTagsChangeCache", account_max
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "delete_account_data_for_deactivated_users",
+            self._delete_account_data_for_deactivated_users,
+        )
+
     def get_max_account_data_stream_id(self) -> int:
         """Get the current max stream ID for account data stream
 
@@ -549,72 +554,121 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
 
     async def purge_account_data_for_user(self, user_id: str) -> None:
         """
-        Removes the account data for a user.
+        Removes ALL the account data for a user.
+        Intended to be used upon user deactivation.
 
-        This is intended to be used upon user deactivation and also removes any
-        derived information from account data (e.g. push rules and ignored users).
+        Also purges the user from the ignored_users cache table
+        and the push_rules cache tables.
+        """
 
-        Args:
-            user_id: The user ID to remove data for.
+        await self.db_pool.runInteraction(
+            "purge_account_data_for_user_txn",
+            self._purge_account_data_for_user_txn,
+            user_id,
+        )
+
+    def _purge_account_data_for_user_txn(
+        self, txn: LoggingTransaction, user_id: str
+    ) -> None:
         """
+        See `purge_account_data_for_user`.
+        """
+        # Purge from the primary account_data tables.
+        self.db_pool.simple_delete_txn(
+            txn, table="account_data", keyvalues={"user_id": user_id}
+        )
 
-        def purge_account_data_for_user_txn(txn: LoggingTransaction) -> None:
-            # Purge from the primary account_data tables.
-            self.db_pool.simple_delete_txn(
-                txn, table="account_data", keyvalues={"user_id": user_id}
-            )
+        self.db_pool.simple_delete_txn(
+            txn, table="room_account_data", keyvalues={"user_id": user_id}
+        )
 
-            self.db_pool.simple_delete_txn(
-                txn, table="room_account_data", keyvalues={"user_id": user_id}
-            )
+        # Purge from ignored_users where this user is the ignorer.
+        # N.B. We don't purge where this user is the ignoree, because that
+        #      interferes with other users' account data.
+        #      It's also not this user's data to delete!
+        self.db_pool.simple_delete_txn(
+            txn, table="ignored_users", keyvalues={"ignorer_user_id": user_id}
+        )
 
-            # Purge from ignored_users where this user is the ignorer.
-            # N.B. We don't purge where this user is the ignoree, because that
-            #      interferes with other users' account data.
-            #      It's also not this user's data to delete!
-            self.db_pool.simple_delete_txn(
-                txn, table="ignored_users", keyvalues={"ignorer_user_id": user_id}
-            )
+        # Remove the push rules
+        self.db_pool.simple_delete_txn(
+            txn, table="push_rules", keyvalues={"user_name": user_id}
+        )
+        self.db_pool.simple_delete_txn(
+            txn, table="push_rules_enable", keyvalues={"user_name": user_id}
+        )
+        self.db_pool.simple_delete_txn(
+            txn, table="push_rules_stream", keyvalues={"user_id": user_id}
+        )
 
-            # Remove the push rules
-            self.db_pool.simple_delete_txn(
-                txn, table="push_rules", keyvalues={"user_name": user_id}
-            )
-            self.db_pool.simple_delete_txn(
-                txn, table="push_rules_enable", keyvalues={"user_name": user_id}
-            )
-            self.db_pool.simple_delete_txn(
-                txn, table="push_rules_stream", keyvalues={"user_id": user_id}
-            )
+        # Invalidate caches as appropriate
+        self._invalidate_cache_and_stream(
+            txn, self.get_account_data_for_room_and_type, (user_id,)
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_account_data_for_user, (user_id,)
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_global_account_data_by_type_for_user, (user_id,)
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_account_data_for_room, (user_id,)
+        )
+        self._invalidate_cache_and_stream(txn, self.get_push_rules_for_user, (user_id,))
+        self._invalidate_cache_and_stream(
+            txn, self.get_push_rules_enabled_for_user, (user_id,)
+        )
+        # This user might be contained in the ignored_by cache for other users,
+        # so we have to invalidate it all.
+        self._invalidate_all_cache_and_stream(txn, self.ignored_by)
 
-            # Invalidate caches as appropriate
-            self._invalidate_cache_and_stream(
-                txn, self.get_account_data_for_room_and_type, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_account_data_for_user, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_global_account_data_by_type_for_user, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_account_data_for_room, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_push_rules_for_user, (user_id,)
-            )
-            self._invalidate_cache_and_stream(
-                txn, self.get_push_rules_enabled_for_user, (user_id,)
-            )
-            # This user might be contained in the ignored_by cache for other users,
-            # so we have to invalidate it all.
-            self._invalidate_all_cache_and_stream(txn, self.ignored_by)
+    async def _delete_account_data_for_deactivated_users(
+        self, progress: dict, batch_size: int
+    ) -> int:
+        """
+        Retroactively purges account data for users that have already been deactivated.
+        Gets run as a background update caused by a schema delta.
+        """
 
-        await self.db_pool.runInteraction(
-            "purge_account_data_for_user_txn",
-            purge_account_data_for_user_txn,
+        last_user: str = progress.get("last_user", "")
+
+        def _delete_account_data_for_deactivated_users_txn(
+            txn: LoggingTransaction,
+        ) -> int:
+            sql = """
+                SELECT name FROM users
+                WHERE deactivated = ? and name > ?
+                ORDER BY name ASC
+                LIMIT ?
+            """
+
+            txn.execute(sql, (1, last_user, batch_size))
+            users = [row[0] for row in txn]
+
+            for user in users:
+                self._purge_account_data_for_user_txn(txn, user_id=user)
+
+            if users:
+                self.db_pool.updates._background_update_progress_txn(
+                    txn,
+                    "delete_account_data_for_deactivated_users",
+                    {"last_user": users[-1]},
+                )
+
+            return len(users)
+
+        number_deleted = await self.db_pool.runInteraction(
+            "_delete_account_data_for_deactivated_users",
+            _delete_account_data_for_deactivated_users_txn,
         )
 
+        if number_deleted < batch_size:
+            await self.db_pool.updates._end_background_update(
+                "delete_account_data_for_deactivated_users"
+            )
+
+        return number_deleted
+
 
 class AccountDataStore(AccountDataWorkerStore):
     pass
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 2bb5288431..304814af5d 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -198,6 +198,7 @@ class ApplicationServiceTransactionWorkerStore(
         service: ApplicationService,
         events: List[EventBase],
         ephemeral: List[JsonDict],
+        to_device_messages: List[JsonDict],
     ) -> AppServiceTransaction:
         """Atomically creates a new transaction for this application service
         with the given list of events. Ephemeral events are NOT persisted to the
@@ -207,6 +208,7 @@ class ApplicationServiceTransactionWorkerStore(
             service: The service who the transaction is for.
             events: A list of persistent events to put in the transaction.
             ephemeral: A list of ephemeral events to put in the transaction.
+            to_device_messages: A list of to-device messages to put in the transaction.
 
         Returns:
             A new transaction.
@@ -237,7 +239,11 @@ class ApplicationServiceTransactionWorkerStore(
                 (service.id, new_txn_id, event_ids),
             )
             return AppServiceTransaction(
-                service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+                service=service,
+                id=new_txn_id,
+                events=events,
+                ephemeral=ephemeral,
+                to_device_messages=to_device_messages,
             )
 
         return await self.db_pool.runInteraction(
@@ -330,7 +336,11 @@ class ApplicationServiceTransactionWorkerStore(
         events = await self.get_events_as_list(event_ids)
 
         return AppServiceTransaction(
-            service=service, id=entry["txn_id"], events=events, ephemeral=[]
+            service=service,
+            id=entry["txn_id"],
+            events=events,
+            ephemeral=[],
+            to_device_messages=[],
         )
 
     def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
@@ -391,7 +401,7 @@ class ApplicationServiceTransactionWorkerStore(
     async def get_type_stream_id_for_appservice(
         self, service: ApplicationService, type: str
     ) -> int:
-        if type not in ("read_receipt", "presence"):
+        if type not in ("read_receipt", "presence", "to_device"):
             raise ValueError(
                 "Expected type to be a valid application stream id type, got %s"
                 % (type,)
@@ -415,16 +425,16 @@ class ApplicationServiceTransactionWorkerStore(
             "get_type_stream_id_for_appservice", get_type_stream_id_for_appservice_txn
         )
 
-    async def set_type_stream_id_for_appservice(
+    async def set_appservice_stream_type_pos(
         self, service: ApplicationService, stream_type: str, pos: Optional[int]
     ) -> None:
-        if stream_type not in ("read_receipt", "presence"):
+        if stream_type not in ("read_receipt", "presence", "to_device"):
             raise ValueError(
                 "Expected type to be a valid application stream id type, got %s"
                 % (stream_type,)
             )
 
-        def set_type_stream_id_for_appservice_txn(txn):
+        def set_appservice_stream_type_pos_txn(txn):
             stream_id_type = "%s_stream_id" % stream_type
             txn.execute(
                 "UPDATE application_services_state SET %s = ? WHERE as_id=?"
@@ -433,7 +443,7 @@ class ApplicationServiceTransactionWorkerStore(
             )
 
         await self.db_pool.runInteraction(
-            "set_type_stream_id_for_appservice", set_type_stream_id_for_appservice_txn
+            "set_appservice_stream_type_pos", set_appservice_stream_type_pos_txn
         )
 
 
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 0024348067..c428dd5596 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -15,7 +15,7 @@
 
 import itertools
 import logging
-from typing import TYPE_CHECKING, Any, Iterable, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple
 
 from synapse.api.constants import EventTypes
 from synapse.replication.tcp.streams import BackfillStream, CachesStream
@@ -25,7 +25,11 @@ from synapse.replication.tcp.streams.events import (
     EventsStreamEventRow,
 )
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
+from synapse.storage.database import (
+    DatabasePool,
+    LoggingDatabaseConnection,
+    LoggingTransaction,
+)
 from synapse.storage.engines import PostgresEngine
 from synapse.util.iterutils import batch_iter
 
@@ -236,7 +240,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         txn.call_after(cache_func.invalidate_all)
         self._send_invalidation_to_replication(txn, cache_func.__name__, None)
 
-    def _invalidate_state_caches_and_stream(self, txn, room_id, members_changed):
+    def _invalidate_state_caches_and_stream(
+        self, txn: LoggingTransaction, room_id: str, members_changed: Collection[str]
+    ) -> None:
         """Special case invalidation of caches based on current state.
 
         We special case this so that we can batch the cache invalidations into a
@@ -244,8 +250,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
         Args:
             txn
-            room_id (str): Room where state changed
-            members_changed (iterable[str]): The user_ids of members that have changed
+            room_id: Room where state changed
+            members_changed: The user_ids of members that have changed
         """
         txn.call_after(self._invalidate_state_caches, room_id, members_changed)
 
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 4eca97189b..8801b7b2dd 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple, cast
 
 from synapse.logging import issue9533_logger
 from synapse.logging.opentracing import log_kv, set_tag, trace
@@ -24,6 +24,7 @@ from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
     LoggingTransaction,
+    make_in_list_sql_clause,
 )
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import (
@@ -136,63 +137,260 @@ class DeviceInboxWorkerStore(SQLBaseStore):
     def get_to_device_stream_token(self):
         return self._device_inbox_id_gen.get_current_token()
 
-    async def get_new_messages_for_device(
+    async def get_messages_for_user_devices(
+        self,
+        user_ids: Collection[str],
+        from_stream_id: int,
+        to_stream_id: int,
+    ) -> Dict[Tuple[str, str], List[JsonDict]]:
+        """
+        Retrieve to-device messages for a given set of users.
+
+        Only to-device messages with stream ids between the given boundaries
+        (from < X <= to) are returned.
+
+        Args:
+            user_ids: The users to retrieve to-device messages for.
+            from_stream_id: The lower boundary of stream id to filter with (exclusive).
+            to_stream_id: The upper boundary of stream id to filter with (inclusive).
+
+        Returns:
+            A dictionary of (user id, device id) -> list of to-device messages.
+        """
+        # We expect the stream ID returned by _get_device_messages to always
+        # be to_stream_id. So, no need to return it from this function.
+        (
+            user_id_device_id_to_messages,
+            last_processed_stream_id,
+        ) = await self._get_device_messages(
+            user_ids=user_ids,
+            from_stream_id=from_stream_id,
+            to_stream_id=to_stream_id,
+        )
+
+        assert (
+            last_processed_stream_id == to_stream_id
+        ), "Expected _get_device_messages to process all to-device messages up to `to_stream_id`"
+
+        return user_id_device_id_to_messages
+
+    async def get_messages_for_device(
         self,
         user_id: str,
-        device_id: Optional[str],
-        last_stream_id: int,
-        current_stream_id: int,
+        device_id: str,
+        from_stream_id: int,
+        to_stream_id: int,
         limit: int = 100,
-    ) -> Tuple[List[dict], int]:
+    ) -> Tuple[List[JsonDict], int]:
         """
+        Retrieve to-device messages for a single user device.
+
+        Only to-device messages with stream ids between the given boundaries
+        (from < X <= to) are returned.
+
         Args:
-            user_id: The recipient user_id.
-            device_id: The recipient device_id.
-            last_stream_id: The last stream ID checked.
-            current_stream_id: The current position of the to device
-                message stream.
-            limit: The maximum number of messages to retrieve.
+            user_id: The ID of the user to retrieve messages for.
+            device_id: The ID of the device to retrieve to-device messages for.
+            from_stream_id: The lower boundary of stream id to filter with (exclusive).
+            to_stream_id: The upper boundary of stream id to filter with (inclusive).
+            limit: A limit on the number of to-device messages returned.
 
         Returns:
             A tuple containing:
-                * A list of messages for the device.
-                * The max stream token of these messages. There may be more to retrieve
-                  if the given limit was reached.
+                * A list of to-device messages within the given stream id range intended for
+                  the given user / device combo.
+                * The last-processed stream ID. Subsequent calls of this function with the
+                  same device should pass this value as 'from_stream_id'.
         """
-        has_changed = self._device_inbox_stream_cache.has_entity_changed(
-            user_id, last_stream_id
+        (
+            user_id_device_id_to_messages,
+            last_processed_stream_id,
+        ) = await self._get_device_messages(
+            user_ids=[user_id],
+            device_id=device_id,
+            from_stream_id=from_stream_id,
+            to_stream_id=to_stream_id,
+            limit=limit,
         )
-        if not has_changed:
-            return [], current_stream_id
 
-        def get_new_messages_for_device_txn(txn):
-            sql = (
-                "SELECT stream_id, message_json FROM device_inbox"
-                " WHERE user_id = ? AND device_id = ?"
-                " AND ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
-                " LIMIT ?"
+        if not user_id_device_id_to_messages:
+            # There were no messages!
+            return [], to_stream_id
+
+        # Extract the messages, no need to return the user and device ID again
+        to_device_messages = user_id_device_id_to_messages.get((user_id, device_id), [])
+
+        return to_device_messages, last_processed_stream_id
+
+    async def _get_device_messages(
+        self,
+        user_ids: Collection[str],
+        from_stream_id: int,
+        to_stream_id: int,
+        device_id: Optional[str] = None,
+        limit: Optional[int] = None,
+    ) -> Tuple[Dict[Tuple[str, str], List[JsonDict]], int]:
+        """
+        Retrieve pending to-device messages for a collection of user devices.
+
+        Only to-device messages with stream ids between the given boundaries
+        (from < X <= to) are returned.
+
+        Note that a stream ID can be shared by multiple copies of the same message with
+        different recipient devices. Stream IDs are only unique in the context of a single
+        user ID / device ID pair. Thus, applying a limit (of messages to return) when working
+        with a sliding window of stream IDs is only possible when querying messages of a
+        single user device.
+
+        Finally, note that device IDs are not unique across users.
+
+        Args:
+            user_ids: The user IDs to filter device messages by.
+            from_stream_id: The lower boundary of stream id to filter with (exclusive).
+            to_stream_id: The upper boundary of stream id to filter with (inclusive).
+            device_id: A device ID to query to-device messages for. If not provided, to-device
+                messages from all device IDs for the given user IDs will be queried. May not be
+                provided if `user_ids` contains more than one entry.
+            limit: The maximum number of to-device messages to return. Can only be used when
+                passing a single user ID / device ID tuple.
+
+        Returns:
+            A tuple containing:
+                * A dict of (user_id, device_id) -> list of to-device messages
+                * The last-processed stream ID. If this is less than `to_stream_id`, then
+                    there may be more messages to retrieve. If `limit` is not set, then this
+                    is always equal to 'to_stream_id'.
+        """
+        if not user_ids:
+            logger.warning("No users provided upon querying for device IDs")
+            return {}, to_stream_id
+
+        # Prevent a query for one user's device also retrieving another user's device with
+        # the same device ID (device IDs are not unique across users).
+        if len(user_ids) > 1 and device_id is not None:
+            raise AssertionError(
+                "Programming error: 'device_id' cannot be supplied to "
+                "_get_device_messages when >1 user_id has been provided"
             )
-            txn.execute(
-                sql, (user_id, device_id, last_stream_id, current_stream_id, limit)
+
+        # A limit can only be applied when querying for a single user ID / device ID tuple.
+        # See the docstring of this function for more details.
+        if limit is not None and device_id is None:
+            raise AssertionError(
+                "Programming error: _get_device_messages was passed 'limit' "
+                "without a specific user_id/device_id"
             )
 
-            messages = []
-            stream_pos = current_stream_id
+        user_ids_to_query: Set[str] = set()
+        device_ids_to_query: Set[str] = set()
+
+        # Note that a device ID could be an empty str
+        if device_id is not None:
+            # If a device ID was passed, use it to filter results.
+            # Otherwise, device IDs will be derived from the given collection of user IDs.
+            device_ids_to_query.add(device_id)
+
+        # Determine which users have devices with pending messages
+        for user_id in user_ids:
+            if self._device_inbox_stream_cache.has_entity_changed(
+                user_id, from_stream_id
+            ):
+                # This user has new messages sent to them. Query messages for them
+                user_ids_to_query.add(user_id)
+
+        def get_device_messages_txn(txn: LoggingTransaction):
+            # Build a query to select messages from any of the given devices that
+            # are between the given stream id bounds.
+
+            # If a list of device IDs was not provided, retrieve all devices IDs
+            # for the given users. We explicitly do not query hidden devices, as
+            # hidden devices should not receive to-device messages.
+            # Note that this is more efficient than just dropping `device_id` from the query,
+            # since device_inbox has an index on `(user_id, device_id, stream_id)`
+            if not device_ids_to_query:
+                user_device_dicts = self.db_pool.simple_select_many_txn(
+                    txn,
+                    table="devices",
+                    column="user_id",
+                    iterable=user_ids_to_query,
+                    keyvalues={"user_id": user_id, "hidden": False},
+                    retcols=("device_id",),
+                )
 
-            for row in txn:
-                stream_pos = row[0]
-                messages.append(db_to_json(row[1]))
+                device_ids_to_query.update(
+                    {row["device_id"] for row in user_device_dicts}
+                )
 
-            # If the limit was not reached we know that there's no more data for this
-            # user/device pair up to current_stream_id.
-            if len(messages) < limit:
-                stream_pos = current_stream_id
+            if not device_ids_to_query:
+                # We've ended up with no devices to query.
+                return {}, to_stream_id
 
-            return messages, stream_pos
+            # We include both user IDs and device IDs in this query, as we have an index
+            # (device_inbox_user_stream_id) for them.
+            user_id_many_clause_sql, user_id_many_clause_args = make_in_list_sql_clause(
+                self.database_engine, "user_id", user_ids_to_query
+            )
+            (
+                device_id_many_clause_sql,
+                device_id_many_clause_args,
+            ) = make_in_list_sql_clause(
+                self.database_engine, "device_id", device_ids_to_query
+            )
+
+            sql = f"""
+                SELECT stream_id, user_id, device_id, message_json FROM device_inbox
+                WHERE {user_id_many_clause_sql}
+                AND {device_id_many_clause_sql}
+                AND ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+            """
+            sql_args = (
+                *user_id_many_clause_args,
+                *device_id_many_clause_args,
+                from_stream_id,
+                to_stream_id,
+            )
+
+            # If a limit was provided, limit the data retrieved from the database
+            if limit is not None:
+                sql += "LIMIT ?"
+                sql_args += (limit,)
+
+            txn.execute(sql, sql_args)
+
+            # Create and fill a dictionary of (user ID, device ID) -> list of messages
+            # intended for each device.
+            last_processed_stream_pos = to_stream_id
+            recipient_device_to_messages: Dict[Tuple[str, str], List[JsonDict]] = {}
+            for row in txn:
+                last_processed_stream_pos = row[0]
+                recipient_user_id = row[1]
+                recipient_device_id = row[2]
+                message_dict = db_to_json(row[3])
+
+                # Store the device details
+                recipient_device_to_messages.setdefault(
+                    (recipient_user_id, recipient_device_id), []
+                ).append(message_dict)
+
+            if limit is not None and txn.rowcount == limit:
+                # We ended up bumping up against the message limit. There may be more messages
+                # to retrieve. Return what we have, as well as the last stream position that
+                # was processed.
+                #
+                # The caller is expected to set this as the lower (exclusive) bound
+                # for the next query of this device.
+                return recipient_device_to_messages, last_processed_stream_pos
+
+            # The limit was not reached, thus we know that recipient_device_to_messages
+            # contains all to-device messages for the given device and stream id range.
+            #
+            # We return to_stream_id, which the caller should then provide as the lower
+            # (exclusive) bound on the next query of this device.
+            return recipient_device_to_messages, to_stream_id
 
         return await self.db_pool.runInteraction(
-            "get_new_messages_for_device", get_new_messages_for_device_txn
+            "get_device_messages", get_device_messages_txn
         )
 
     @trace
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index b2a5cd9a65..8d845fe951 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1496,13 +1496,23 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         )
 
     async def add_device_change_to_streams(
-        self, user_id: str, device_ids: Collection[str], hosts: List[str]
-    ) -> int:
+        self, user_id: str, device_ids: Collection[str], hosts: Collection[str]
+    ) -> Optional[int]:
         """Persist that a user's devices have been updated, and which hosts
         (if any) should be poked.
+
+        Args:
+            user_id: The ID of the user whose device changed.
+            device_ids: The IDs of any changed devices. If empty, this function will
+                return None.
+            hosts: The remote destinations that should be notified of the change.
+
+        Returns:
+            The maximum stream ID of device list updates that were added to the database, or
+            None if no updates were added.
         """
         if not device_ids:
-            return
+            return None
 
         async with self._device_list_id_gen.get_next_mult(
             len(device_ids)
@@ -1573,11 +1583,11 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         self,
         txn: LoggingTransaction,
         user_id: str,
-        device_ids: Collection[str],
-        hosts: List[str],
+        device_ids: Iterable[str],
+        hosts: Collection[str],
         stream_ids: List[str],
         context: Dict[str, str],
-    ):
+    ) -> None:
         for host in hosts:
             txn.call_after(
                 self._device_list_federation_stream_cache.entity_has_changed,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ca71f073fc..22f6474127 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,9 +16,10 @@ import logging
 from queue import Empty, PriorityQueue
 from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
 
+import attr
 from prometheus_client import Counter, Gauge
 
-from synapse.api.constants import MAX_DEPTH
+from synapse.api.constants import MAX_DEPTH, EventTypes
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
@@ -60,6 +61,15 @@ pdus_pruned_from_federation_queue = Counter(
 logger = logging.getLogger(__name__)
 
 
+# All the info we need while iterating the DAG while backfilling
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class BackfillQueueNavigationItem:
+    depth: int
+    stream_ordering: int
+    event_id: str
+    type: str
+
+
 class _NoChainCoverIndex(Exception):
     def __init__(self, room_id: str):
         super().__init__("Unexpectedly no chain cover for events in %s" % (room_id,))
@@ -74,6 +84,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
     ):
         super().__init__(database, db_conn, hs)
 
+        self.hs = hs
+
         if hs.config.worker.run_background_tasks:
             hs.get_clock().looping_call(
                 self._delete_old_forward_extrem_cache, 60 * 60 * 1000
@@ -737,7 +749,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             room_id,
         )
 
-    async def get_insertion_event_backwards_extremities_in_room(
+    async def get_insertion_event_backward_extremities_in_room(
         self, room_id
     ) -> Dict[str, int]:
         """Get the insertion events we know about that we haven't backfilled yet.
@@ -754,7 +766,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             Map from event_id to depth
         """
 
-        def get_insertion_event_backwards_extremities_in_room_txn(txn, room_id):
+        def get_insertion_event_backward_extremities_in_room_txn(txn, room_id):
             sql = """
                 SELECT b.event_id, MAX(e.depth) FROM insertion_events as i
                 /* We only want insertion events that are also marked as backwards extremities */
@@ -770,8 +782,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             return dict(txn)
 
         return await self.db_pool.runInteraction(
-            "get_insertion_event_backwards_extremities_in_room",
-            get_insertion_event_backwards_extremities_in_room_txn,
+            "get_insertion_event_backward_extremities_in_room",
+            get_insertion_event_backward_extremities_in_room_txn,
             room_id,
         )
 
@@ -997,143 +1009,242 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             "get_forward_extremeties_for_room", get_forward_extremeties_for_room_txn
         )
 
-    async def get_backfill_events(self, room_id: str, event_list: list, limit: int):
-        """Get a list of Events for a given topic that occurred before (and
-        including) the events in event_list. Return a list of max size `limit`
+    def _get_connected_batch_event_backfill_results_txn(
+        self, txn: LoggingTransaction, insertion_event_id: str, limit: int
+    ) -> List[BackfillQueueNavigationItem]:
+        """
+        Find any batch connections of a given insertion event.
+        A batch event points at a insertion event via:
+        batch_event.content[MSC2716_BATCH_ID] -> insertion_event.content[MSC2716_NEXT_BATCH_ID]
 
         Args:
-            room_id
-            event_list
-            limit
+            txn: The database transaction to use
+            insertion_event_id: The event ID to navigate from. We will find
+                batch events that point back at this insertion event.
+            limit: Max number of event ID's to query for and return
+
+        Returns:
+            List of batch events that the backfill queue can process
+        """
+        batch_connection_query = """
+            SELECT e.depth, e.stream_ordering, c.event_id, e.type FROM insertion_events AS i
+            /* Find the batch that connects to the given insertion event */
+            INNER JOIN batch_events AS c
+            ON i.next_batch_id = c.batch_id
+            /* Get the depth of the batch start event from the events table */
+            INNER JOIN events AS e USING (event_id)
+            /* Find an insertion event which matches the given event_id */
+            WHERE i.event_id = ?
+            LIMIT ?
         """
-        event_ids = await self.db_pool.runInteraction(
-            "get_backfill_events",
-            self._get_backfill_events,
-            room_id,
-            event_list,
-            limit,
-        )
-        events = await self.get_events_as_list(event_ids)
-        return sorted(events, key=lambda e: -e.depth)
 
-    def _get_backfill_events(self, txn, room_id, event_list, limit):
-        logger.debug("_get_backfill_events: %s, %r, %s", room_id, event_list, limit)
+        # Find any batch connections for the given insertion event
+        txn.execute(
+            batch_connection_query,
+            (insertion_event_id, limit),
+        )
+        return [
+            BackfillQueueNavigationItem(
+                depth=row[0],
+                stream_ordering=row[1],
+                event_id=row[2],
+                type=row[3],
+            )
+            for row in txn
+        ]
 
-        event_results = set()
+    def _get_connected_prev_event_backfill_results_txn(
+        self, txn: LoggingTransaction, event_id: str, limit: int
+    ) -> List[BackfillQueueNavigationItem]:
+        """
+        Find any events connected by prev_event the specified event_id.
 
-        # We want to make sure that we do a breadth-first, "depth" ordered
-        # search.
+        Args:
+            txn: The database transaction to use
+            event_id: The event ID to navigate from
+            limit: Max number of event ID's to query for and return
 
+        Returns:
+            List of prev events that the backfill queue can process
+        """
         # Look for the prev_event_id connected to the given event_id
-        query = """
-            SELECT depth, prev_event_id FROM event_edges
-            /* Get the depth of the prev_event_id from the events table */
+        connected_prev_event_query = """
+            SELECT depth, stream_ordering, prev_event_id, events.type FROM event_edges
+            /* Get the depth and stream_ordering of the prev_event_id from the events table */
             INNER JOIN events
             ON prev_event_id = events.event_id
-            /* Find an event which matches the given event_id */
+            /* Look for an edge which matches the given event_id */
             WHERE event_edges.event_id = ?
             AND event_edges.is_state = ?
+            /* Because we can have many events at the same depth,
+            * we want to also tie-break and sort on stream_ordering */
+            ORDER BY depth DESC, stream_ordering DESC
             LIMIT ?
         """
 
-        # Look for the "insertion" events connected to the given event_id
-        connected_insertion_event_query = """
-            SELECT e.depth, i.event_id FROM insertion_event_edges AS i
-            /* Get the depth of the insertion event from the events table */
-            INNER JOIN events AS e USING (event_id)
-            /* Find an insertion event which points via prev_events to the given event_id */
-            WHERE i.insertion_prev_event_id = ?
-            LIMIT ?
+        txn.execute(
+            connected_prev_event_query,
+            (event_id, False, limit),
+        )
+        return [
+            BackfillQueueNavigationItem(
+                depth=row[0],
+                stream_ordering=row[1],
+                event_id=row[2],
+                type=row[3],
+            )
+            for row in txn
+        ]
+
+    async def get_backfill_events(
+        self, room_id: str, seed_event_id_list: list, limit: int
+    ):
+        """Get a list of Events for a given topic that occurred before (and
+        including) the events in seed_event_id_list. Return a list of max size `limit`
+
+        Args:
+            room_id
+            seed_event_id_list
+            limit
         """
+        event_ids = await self.db_pool.runInteraction(
+            "get_backfill_events",
+            self._get_backfill_events,
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
+        events = await self.get_events_as_list(event_ids)
+        return sorted(
+            events, key=lambda e: (-e.depth, -e.internal_metadata.stream_ordering)
+        )
 
-        # Find any batch connections of a given insertion event
-        batch_connection_query = """
-            SELECT e.depth, c.event_id FROM insertion_events AS i
-            /* Find the batch that connects to the given insertion event */
-            INNER JOIN batch_events AS c
-            ON i.next_batch_id = c.batch_id
-            /* Get the depth of the batch start event from the events table */
-            INNER JOIN events AS e USING (event_id)
-            /* Find an insertion event which matches the given event_id */
-            WHERE i.event_id = ?
-            LIMIT ?
+    def _get_backfill_events(self, txn, room_id, seed_event_id_list, limit):
+        """
+        We want to make sure that we do a breadth-first, "depth" ordered search.
+        We also handle navigating historical branches of history connected by
+        insertion and batch events.
         """
+        logger.debug(
+            "_get_backfill_events(room_id=%s): seeding backfill with seed_event_id_list=%s limit=%s",
+            room_id,
+            seed_event_id_list,
+            limit,
+        )
+
+        event_id_results = set()
 
         # In a PriorityQueue, the lowest valued entries are retrieved first.
-        # We're using depth as the priority in the queue.
-        # Depth is lowest at the oldest-in-time message and highest and
-        # newest-in-time message. We add events to the queue with a negative depth so that
-        # we process the newest-in-time messages first going backwards in time.
+        # We're using depth as the priority in the queue and tie-break based on
+        # stream_ordering. Depth is lowest at the oldest-in-time message and
+        # highest and newest-in-time message. We add events to the queue with a
+        # negative depth so that we process the newest-in-time messages first
+        # going backwards in time. stream_ordering follows the same pattern.
         queue = PriorityQueue()
 
-        for event_id in event_list:
-            depth = self.db_pool.simple_select_one_onecol_txn(
+        for seed_event_id in seed_event_id_list:
+            event_lookup_result = self.db_pool.simple_select_one_txn(
                 txn,
                 table="events",
-                keyvalues={"event_id": event_id, "room_id": room_id},
-                retcol="depth",
+                keyvalues={"event_id": seed_event_id, "room_id": room_id},
+                retcols=(
+                    "type",
+                    "depth",
+                    "stream_ordering",
+                ),
                 allow_none=True,
             )
 
-            if depth:
-                queue.put((-depth, event_id))
+            if event_lookup_result is not None:
+                logger.debug(
+                    "_get_backfill_events(room_id=%s): seed_event_id=%s depth=%s stream_ordering=%s type=%s",
+                    room_id,
+                    seed_event_id,
+                    event_lookup_result["depth"],
+                    event_lookup_result["stream_ordering"],
+                    event_lookup_result["type"],
+                )
 
-        while not queue.empty() and len(event_results) < limit:
+                if event_lookup_result["depth"]:
+                    queue.put(
+                        (
+                            -event_lookup_result["depth"],
+                            -event_lookup_result["stream_ordering"],
+                            seed_event_id,
+                            event_lookup_result["type"],
+                        )
+                    )
+
+        while not queue.empty() and len(event_id_results) < limit:
             try:
-                _, event_id = queue.get_nowait()
+                _, _, event_id, event_type = queue.get_nowait()
             except Empty:
                 break
 
-            if event_id in event_results:
+            if event_id in event_id_results:
                 continue
 
-            event_results.add(event_id)
+            event_id_results.add(event_id)
 
             # Try and find any potential historical batches of message history.
-            #
-            # First we look for an insertion event connected to the current
-            # event (by prev_event). If we find any, we need to go and try to
-            # find any batch events connected to the insertion event (by
-            # batch_id). If we find any, we'll add them to the queue and
-            # navigate up the DAG like normal in the next iteration of the loop.
-            txn.execute(
-                connected_insertion_event_query, (event_id, limit - len(event_results))
-            )
-            connected_insertion_event_id_results = txn.fetchall()
-            logger.debug(
-                "_get_backfill_events: connected_insertion_event_query %s",
-                connected_insertion_event_id_results,
-            )
-            for row in connected_insertion_event_id_results:
-                connected_insertion_event_depth = row[0]
-                connected_insertion_event = row[1]
-                queue.put((-connected_insertion_event_depth, connected_insertion_event))
+            if self.hs.config.experimental.msc2716_enabled:
+                # We need to go and try to find any batch events connected
+                # to a given insertion event (by batch_id). If we find any, we'll
+                # add them to the queue and navigate up the DAG like normal in the
+                # next iteration of the loop.
+                if event_type == EventTypes.MSC2716_INSERTION:
+                    # Find any batch connections for the given insertion event
+                    connected_batch_event_backfill_results = (
+                        self._get_connected_batch_event_backfill_results_txn(
+                            txn, event_id, limit - len(event_id_results)
+                        )
+                    )
+                    logger.debug(
+                        "_get_backfill_events(room_id=%s): connected_batch_event_backfill_results=%s",
+                        room_id,
+                        connected_batch_event_backfill_results,
+                    )
+                    for (
+                        connected_batch_event_backfill_item
+                    ) in connected_batch_event_backfill_results:
+                        if (
+                            connected_batch_event_backfill_item.event_id
+                            not in event_id_results
+                        ):
+                            queue.put(
+                                (
+                                    -connected_batch_event_backfill_item.depth,
+                                    -connected_batch_event_backfill_item.stream_ordering,
+                                    connected_batch_event_backfill_item.event_id,
+                                    connected_batch_event_backfill_item.type,
+                                )
+                            )
 
-                # Find any batch connections for the given insertion event
-                txn.execute(
-                    batch_connection_query,
-                    (connected_insertion_event, limit - len(event_results)),
-                )
-                batch_start_event_id_results = txn.fetchall()
-                logger.debug(
-                    "_get_backfill_events: batch_start_event_id_results %s",
-                    batch_start_event_id_results,
+            # Now we just look up the DAG by prev_events as normal
+            connected_prev_event_backfill_results = (
+                self._get_connected_prev_event_backfill_results_txn(
+                    txn, event_id, limit - len(event_id_results)
                 )
-                for row in batch_start_event_id_results:
-                    if row[1] not in event_results:
-                        queue.put((-row[0], row[1]))
-
-            txn.execute(query, (event_id, False, limit - len(event_results)))
-            prev_event_id_results = txn.fetchall()
+            )
             logger.debug(
-                "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+                "_get_backfill_events(room_id=%s): connected_prev_event_backfill_results=%s",
+                room_id,
+                connected_prev_event_backfill_results,
             )
+            for (
+                connected_prev_event_backfill_item
+            ) in connected_prev_event_backfill_results:
+                if connected_prev_event_backfill_item.event_id not in event_id_results:
+                    queue.put(
+                        (
+                            -connected_prev_event_backfill_item.depth,
+                            -connected_prev_event_backfill_item.stream_ordering,
+                            connected_prev_event_backfill_item.event_id,
+                            connected_prev_event_backfill_item.type,
+                        )
+                    )
 
-            for row in prev_event_id_results:
-                if row[1] not in event_results:
-                    queue.put((-row[0], row[1]))
-
-        return event_results
+        return event_id_results
 
     async def get_missing_events(self, room_id, earliest_events, latest_events, limit):
         ids = await self.db_pool.runInteraction(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index b7554154ac..2e44c77715 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1801,9 +1801,7 @@ class PersistEventsStore:
         )
 
         if rel_type == RelationTypes.REPLACE:
-            txn.call_after(
-                self.store.get_applicable_edit.invalidate, (parent_id, event.room_id)
-            )
+            txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
 
         if rel_type == RelationTypes.THREAD:
             txn.call_after(
@@ -2215,9 +2213,14 @@ class PersistEventsStore:
             "   SELECT 1 FROM event_backward_extremities"
             "   WHERE event_id = ? AND room_id = ?"
             " )"
+            # 1. Don't add an event as a extremity again if we already persisted it
+            # as a non-outlier.
+            # 2. Don't add an outlier as an extremity if it has no prev_events
             " AND NOT EXISTS ("
-            "   SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
-            "   AND outlier = ?"
+            "   SELECT 1 FROM events"
+            "   LEFT JOIN event_edges edge"
+            "   ON edge.event_id = events.event_id"
+            "   WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)"
             " )"
         )
 
@@ -2243,6 +2246,10 @@ class PersistEventsStore:
                 (ev.event_id, ev.room_id)
                 for ev in events
                 if not ev.internal_metadata.is_outlier()
+                # If we encountered an event with no prev_events, then we might
+                # as well remove it now because it won't ever have anything else
+                # to backfill from.
+                or len(ev.prev_event_ids()) == 0
             ],
         )
 
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index e01c94930a..92539f5d41 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -42,7 +42,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-def _load_rules(rawrules, enabled_map, use_new_defaults=False):
+def _load_rules(rawrules, enabled_map):
     ruleslist = []
     for rawrule in rawrules:
         rule = dict(rawrule)
@@ -52,7 +52,7 @@ def _load_rules(rawrules, enabled_map, use_new_defaults=False):
         ruleslist.append(rule)
 
     # We're going to be mutating this a lot, so do a deep copy
-    rules = list(list_with_base_rules(ruleslist, use_new_defaults))
+    rules = list(list_with_base_rules(ruleslist))
 
     for i, rule in enumerate(rules):
         rule_id = rule["rule_id"]
@@ -112,10 +112,6 @@ class PushRulesWorkerStore(
             prefilled_cache=push_rules_prefill,
         )
 
-        self._users_new_default_push_rules = (
-            hs.config.server.users_new_default_push_rules
-        )
-
     @abc.abstractmethod
     def get_max_push_rules_stream_id(self):
         """Get the position of the push rules stream.
@@ -145,9 +141,7 @@ class PushRulesWorkerStore(
 
         enabled_map = await self.get_push_rules_enabled_for_user(user_id)
 
-        use_new_defaults = user_id in self._users_new_default_push_rules
-
-        return _load_rules(rows, enabled_map, use_new_defaults)
+        return _load_rules(rows, enabled_map)
 
     @cached(max_entries=5000)
     async def get_push_rules_enabled_for_user(self, user_id) -> Dict[str, bool]:
@@ -206,13 +200,7 @@ class PushRulesWorkerStore(
         enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids)
 
         for user_id, rules in results.items():
-            use_new_defaults = user_id in self._users_new_default_push_rules
-
-            results[user_id] = _load_rules(
-                rules,
-                enabled_map_by_user.get(user_id, {}),
-                use_new_defaults,
-            )
+            results[user_id] = _load_rules(rules, enabled_map_by_user.get(user_id, {}))
 
         return results
 
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 37468a5183..6180b17296 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -13,12 +13,22 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union, cast
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Tuple,
+    Union,
+    cast,
+)
 
 import attr
 from frozendict import frozendict
 
-from synapse.api.constants import EventTypes, RelationTypes
+from synapse.api.constants import RelationTypes
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
@@ -28,13 +38,14 @@ from synapse.storage.database import (
     make_in_list_sql_clause,
 )
 from synapse.storage.databases.main.stream import generate_pagination_where_clause
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.relations import (
     AggregationPaginationToken,
     PaginationChunk,
     RelationPaginationToken,
 )
 from synapse.types import JsonDict
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -340,20 +351,24 @@ class RelationsWorkerStore(SQLBaseStore):
         )
 
     @cached()
-    async def get_applicable_edit(
-        self, event_id: str, room_id: str
-    ) -> Optional[EventBase]:
+    def get_applicable_edit(self, event_id: str) -> Optional[EventBase]:
+        raise NotImplementedError()
+
+    @cachedList(cached_method_name="get_applicable_edit", list_name="event_ids")
+    async def _get_applicable_edits(
+        self, event_ids: Collection[str]
+    ) -> Dict[str, Optional[EventBase]]:
         """Get the most recent edit (if any) that has happened for the given
-        event.
+        events.
 
         Correctly handles checking whether edits were allowed to happen.
 
         Args:
-            event_id: The original event ID
-            room_id: The original event's room ID
+            event_ids: The original event IDs
 
         Returns:
-            The most recent edit, if any.
+            A map of the most recent edit for each event. If there are no edits,
+            the event will map to None.
         """
 
         # We only allow edits for `m.room.message` events that have the same sender
@@ -362,37 +377,67 @@ class RelationsWorkerStore(SQLBaseStore):
 
         # Fetches latest edit that has the same type and sender as the
         # original, and is an `m.room.message`.
-        sql = """
-            SELECT edit.event_id FROM events AS edit
-            INNER JOIN event_relations USING (event_id)
-            INNER JOIN events AS original ON
-                original.event_id = relates_to_id
-                AND edit.type = original.type
-                AND edit.sender = original.sender
-            WHERE
-                relates_to_id = ?
-                AND relation_type = ?
-                AND edit.room_id = ?
-                AND edit.type = 'm.room.message'
-            ORDER by edit.origin_server_ts DESC, edit.event_id DESC
-            LIMIT 1
-        """
+        if isinstance(self.database_engine, PostgresEngine):
+            # The `DISTINCT ON` clause will pick the *first* row it encounters,
+            # so ordering by origin server ts + event ID desc will ensure we get
+            # the latest edit.
+            sql = """
+                SELECT DISTINCT ON (original.event_id) original.event_id, edit.event_id FROM events AS edit
+                INNER JOIN event_relations USING (event_id)
+                INNER JOIN events AS original ON
+                    original.event_id = relates_to_id
+                    AND edit.type = original.type
+                    AND edit.sender = original.sender
+                    AND edit.room_id = original.room_id
+                WHERE
+                    %s
+                    AND relation_type = ?
+                    AND edit.type = 'm.room.message'
+                ORDER by original.event_id DESC, edit.origin_server_ts DESC, edit.event_id DESC
+            """
+        else:
+            # SQLite uses a simplified query which returns all edits for an
+            # original event. The results are then de-duplicated when turned into
+            # a dict. Due to the chosen ordering, the latest edit stomps on
+            # earlier edits.
+            sql = """
+                SELECT original.event_id, edit.event_id FROM events AS edit
+                INNER JOIN event_relations USING (event_id)
+                INNER JOIN events AS original ON
+                    original.event_id = relates_to_id
+                    AND edit.type = original.type
+                    AND edit.sender = original.sender
+                    AND edit.room_id = original.room_id
+                WHERE
+                    %s
+                    AND relation_type = ?
+                    AND edit.type = 'm.room.message'
+                ORDER by edit.origin_server_ts, edit.event_id
+            """
 
-        def _get_applicable_edit_txn(txn: LoggingTransaction) -> Optional[str]:
-            txn.execute(sql, (event_id, RelationTypes.REPLACE, room_id))
-            row = txn.fetchone()
-            if row:
-                return row[0]
-            return None
+        def _get_applicable_edits_txn(txn: LoggingTransaction) -> Dict[str, str]:
+            clause, args = make_in_list_sql_clause(
+                txn.database_engine, "relates_to_id", event_ids
+            )
+            args.append(RelationTypes.REPLACE)
 
-        edit_id = await self.db_pool.runInteraction(
-            "get_applicable_edit", _get_applicable_edit_txn
+            txn.execute(sql % (clause,), args)
+            return dict(cast(Iterable[Tuple[str, str]], txn.fetchall()))
+
+        edit_ids = await self.db_pool.runInteraction(
+            "get_applicable_edits", _get_applicable_edits_txn
         )
 
-        if not edit_id:
-            return None
+        edits = await self.get_events(edit_ids.values())  # type: ignore[attr-defined]
 
-        return await self.get_event(edit_id, allow_none=True)  # type: ignore[attr-defined]
+        # Map to the original event IDs to the edit events.
+        #
+        # There might not be an edit event due to there being no edits or
+        # due to the event not being known, either case is treated the same.
+        return {
+            original_event_id: edits.get(edit_ids.get(original_event_id))
+            for original_event_id in event_ids
+        }
 
     @cached()
     async def get_thread_summary(
@@ -612,9 +657,6 @@ class RelationsWorkerStore(SQLBaseStore):
             The bundled aggregations for an event, if bundled aggregations are
             enabled and the event can have bundled aggregations.
         """
-        # State events and redacted events do not get bundled aggregations.
-        if event.is_state() or event.internal_metadata.is_redacted():
-            return None
 
         # Do not bundle aggregations for an event which represents an edit or an
         # annotation. It does not make sense for them to have related events.
@@ -642,13 +684,6 @@ class RelationsWorkerStore(SQLBaseStore):
         if references.chunk:
             aggregations.references = references.to_dict()
 
-        edit = None
-        if event.type == EventTypes.Message:
-            edit = await self.get_applicable_edit(event_id, room_id)
-
-        if edit:
-            aggregations.replace = edit
-
         # If this event is the start of a thread, include a summary of the replies.
         if self._msc3440_enabled:
             thread_count, latest_thread_event = await self.get_thread_summary(
@@ -668,9 +703,7 @@ class RelationsWorkerStore(SQLBaseStore):
         return aggregations
 
     async def get_bundled_aggregations(
-        self,
-        events: Iterable[EventBase],
-        user_id: str,
+        self, events: Iterable[EventBase], user_id: str
     ) -> Dict[str, BundledAggregations]:
         """Generate bundled aggregations for events.
 
@@ -683,13 +716,28 @@ class RelationsWorkerStore(SQLBaseStore):
             events may have bundled aggregations in the results.
         """
 
-        # TODO Parallelize.
-        results = {}
+        # State events and redacted events do not get bundled aggregations.
+        events = [
+            event
+            for event in events
+            if not event.is_state() and not event.internal_metadata.is_redacted()
+        ]
+
+        # event ID -> bundled aggregation in non-serialized form.
+        results: Dict[str, BundledAggregations] = {}
+
+        # Fetch other relations per event.
         for event in events:
             event_result = await self._get_bundled_aggregation_for_event(event, user_id)
             if event_result:
                 results[event.event_id] = event_result
 
+        # Fetch any edits.
+        event_ids = [event.event_id for event in events]
+        edits = await self._get_applicable_edits(event_ids)
+        for event_id, edit in edits.items():
+            results.setdefault(event_id, BundledAggregations()).replace = edit
+
         return results
 
 
diff --git a/synapse/storage/schema/main/delta/68/02_msc2409_add_device_id_appservice_stream_type.sql b/synapse/storage/schema/main/delta/68/02_msc2409_add_device_id_appservice_stream_type.sql
new file mode 100644
index 0000000000..bbf0af5311
--- /dev/null
+++ b/synapse/storage/schema/main/delta/68/02_msc2409_add_device_id_appservice_stream_type.sql
@@ -0,0 +1,21 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a column to track what to_device stream id that this application
+-- service has been caught up to.
+
+-- NULL indicates that this appservice has never received any to_device messages. This
+-- can be used, for example, to avoid sending a huge dump of messages at startup.
+ALTER TABLE application_services_state ADD COLUMN to_device_stream_id BIGINT;
\ No newline at end of file
diff --git a/synapse/storage/schema/main/delta/68/03_delete_account_data_for_deactivated_accounts.sql b/synapse/storage/schema/main/delta/68/03_delete_account_data_for_deactivated_accounts.sql
new file mode 100644
index 0000000000..e124933843
--- /dev/null
+++ b/synapse/storage/schema/main/delta/68/03_delete_account_data_for_deactivated_accounts.sql
@@ -0,0 +1,20 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- We want to retroactively delete account data for users that were already
+-- deactivated.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (6803, 'delete_account_data_for_deactivated_users', '{}');
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index 377c9a282a..1d6ec22191 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -81,13 +81,14 @@ class DeferredCache(Generic[KT, VT]):
         Args:
             name: The name of the cache
             max_entries: Maximum amount of entries that the cache will hold
-            keylen: The length of the tuple used as the cache key. Ignored unless
-               `tree` is True.
             tree: Use a TreeCache instead of a dict as the underlying cache type
             iterable: If True, count each item in the cached object as an entry,
                 rather than each cached object
             apply_cache_factor_from_config: Whether cache factors specified in the
                 config file affect `max_entries`
+            prune_unread_entries: If True, cache entries that haven't been read recently
+                will be evicted from the cache in the background. Set to False to
+                opt-out of this behaviour.
         """
         cache_type = TreeCache if tree else dict
 
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 375cd443f1..df4fb156c2 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -254,9 +254,17 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
             return r1 + r2
 
     Args:
+        orig:
+        max_entries:
         num_args: number of positional arguments (excluding ``self`` and
             ``cache_context``) to use as cache keys. Defaults to all named
             args of the function.
+        tree:
+        cache_context:
+        iterable:
+        prune_unread_entries: If True, cache entries that haven't been read recently
+            will be evicted from the cache in the background. Set to False to opt-out
+            of this behaviour.
     """
 
     def __init__(
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 3f11a2f9dd..7548b38548 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -340,6 +340,12 @@ class LruCache(Generic[KT, VT]):
 
             apply_cache_factor_from_config (bool): If true, `max_size` will be
                 multiplied by a cache factor derived from the homeserver config
+
+            clock:
+
+            prune_unread_entries: If True, cache entries that haven't been read recently
+                will be evicted from the cache in the background. Set to False to
+                opt-out of this behaviour.
         """
         # Default `clock` to something sensible. Note that we rename it to
         # `real_clock` so that mypy doesn't think its still `Optional`.
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
index 389adf00f6..1e9c2faa64 100644
--- a/synapse/util/threepids.py
+++ b/synapse/util/threepids.py
@@ -32,7 +32,12 @@ logger = logging.getLogger(__name__)
 MAX_EMAIL_ADDRESS_LENGTH = 500
 
 
-def check_3pid_allowed(hs: "HomeServer", medium: str, address: str) -> bool:
+async def check_3pid_allowed(
+    hs: "HomeServer",
+    medium: str,
+    address: str,
+    registration: bool = False,
+) -> bool:
     """Checks whether a given format of 3PID is allowed to be used on this HS
 
     Args:
@@ -40,9 +45,15 @@ def check_3pid_allowed(hs: "HomeServer", medium: str, address: str) -> bool:
         medium: 3pid medium - e.g. email, msisdn
         address: address within that medium (e.g. "wotan@matrix.org")
             msisdns need to first have been canonicalised
+        registration: whether we want to bind the 3PID as part of registering a new user.
+
     Returns:
         bool: whether the 3PID medium/address is allowed to be added to this HS
     """
+    if not await hs.get_password_auth_provider().is_3pid_allowed(
+        medium, address, registration
+    ):
+        return False
 
     if hs.config.registration.allowed_local_3pids:
         for constraint in hs.config.registration.allowed_local_3pids: