summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2022-08-18 16:33:22 -0500
committerEric Eastwood <erice@element.io>2022-08-18 16:33:22 -0500
commit8def7e4b4b6d0ecfb7776bf987f621e38946b50c (patch)
tree92fb7ce5f7008e7b053a36342409e1958fbdbb46 /synapse
parentMerge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry (diff)
parentAdd metrics to track `/messages` response time by room size (#13545) (diff)
downloadsynapse-8def7e4b4b6d0ecfb7776bf987f621e38946b50c.tar.xz
Merge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry
Conflicts:
	poetry.lock
	synapse/federation/federation_client.py
	synapse/federation/federation_server.py
	synapse/handlers/federation.py
	synapse/handlers/federation_event.py
	synapse/logging/opentracing.py
	synapse/rest/client/room.py
	synapse/storage/controllers/persist_events.py
	synapse/storage/controllers/state.py
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/generic_worker.py7
-rw-r--r--synapse/app/homeserver.py5
-rw-r--r--synapse/config/account_validity.py2
-rw-r--r--synapse/config/emailconfig.py2
-rw-r--r--synapse/config/sso.py2
-rw-r--r--synapse/federation/federation_client.py27
-rw-r--r--synapse/federation/federation_server.py6
-rw-r--r--synapse/handlers/federation.py68
-rw-r--r--synapse/handlers/federation_event.py161
-rw-r--r--synapse/handlers/message.py6
-rw-r--r--synapse/handlers/room_summary.py1
-rw-r--r--synapse/handlers/sync.py361
-rw-r--r--synapse/http/servlet.py25
-rw-r--r--synapse/logging/tracing.py21
-rw-r--r--synapse/push/baserules.py529
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py37
-rw-r--r--synapse/push/clientformat.py68
-rw-r--r--synapse/push/push_rule_evaluator.py27
-rw-r--r--synapse/res/templates/account_previously_renewed.html13
-rw-r--r--synapse/res/templates/account_renewed.html13
-rw-r--r--synapse/res/templates/add_threepid.html11
-rw-r--r--synapse/res/templates/add_threepid_failure.html15
-rw-r--r--synapse/res/templates/add_threepid_success.html14
-rw-r--r--synapse/res/templates/auth_success.html4
-rw-r--r--synapse/res/templates/invalid_token.html13
-rw-r--r--synapse/res/templates/notice_expiry.html2
-rw-r--r--synapse/res/templates/notif_mail.html2
-rw-r--r--synapse/res/templates/password_reset.html7
-rw-r--r--synapse/res/templates/password_reset_confirmation.html8
-rw-r--r--synapse/res/templates/password_reset_failure.html8
-rw-r--r--synapse/res/templates/password_reset_success.html7
-rw-r--r--synapse/res/templates/recaptcha.html4
-rw-r--r--synapse/res/templates/registration.html7
-rw-r--r--synapse/res/templates/registration_failure.html7
-rw-r--r--synapse/res/templates/registration_success.html8
-rw-r--r--synapse/res/templates/registration_token.html6
-rw-r--r--synapse/res/templates/sso_account_deactivated.html4
-rw-r--r--synapse/res/templates/sso_auth_account_details.html3
-rw-r--r--synapse/res/templates/sso_auth_bad_user.html3
-rw-r--r--synapse/res/templates/sso_auth_confirm.html3
-rw-r--r--synapse/res/templates/sso_auth_success.html3
-rw-r--r--synapse/res/templates/sso_error.html3
-rw-r--r--synapse/res/templates/sso_login_idp_picker.html2
-rw-r--r--synapse/res/templates/sso_new_user_consent.html3
-rw-r--r--synapse/res/templates/sso_redirect_confirm.html3
-rw-r--r--synapse/res/templates/terms.html4
-rw-r--r--synapse/rest/admin/rooms.py1
-rw-r--r--synapse/rest/client/account.py148
-rw-r--r--synapse/rest/client/models.py69
-rw-r--r--synapse/rest/client/room.py83
-rw-r--r--synapse/rest/models.py23
-rw-r--r--synapse/static/client/login/index.html3
-rw-r--r--synapse/static/client/register/index.html3
-rw-r--r--synapse/storage/controllers/persist_events.py21
-rw-r--r--synapse/storage/controllers/state.py42
-rw-r--r--synapse/storage/databases/main/event_federation.py9
-rw-r--r--synapse/storage/databases/main/event_push_actions.py75
-rw-r--r--synapse/storage/databases/main/events.py2
-rw-r--r--synapse/storage/databases/main/events_worker.py98
-rw-r--r--synapse/storage/databases/main/push_rule.py121
-rw-r--r--synapse/storage/databases/main/receipts.py2
-rw-r--r--synapse/storage/databases/main/room.py6
-rw-r--r--synapse/storage/databases/main/roommember.py27
-rw-r--r--synapse/storage/databases/main/state.py5
-rw-r--r--synapse/storage/state.py9
-rw-r--r--synapse/storage/util/partial_state_events_tracker.py3
-rw-r--r--synapse/util/ratelimitutils.py111
-rw-r--r--synapse/visibility.py4
68 files changed, 1787 insertions, 613 deletions
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 42d1f6d219..30e21d9707 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -441,6 +441,13 @@ def start(config_options: List[str]) -> None:
         "synapse.app.user_dir",
     )
 
+    if config.experimental.faster_joins_enabled:
+        raise ConfigError(
+            "You have enabled the experimental `faster_joins` config option, but it is "
+            "not compatible with worker deployments yet. Please disable `faster_joins` "
+            "or run Synapse as a single process deployment instead."
+        )
+
     synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts
     synapse.util.caches.TRACK_MEMORY_USAGE = config.caches.track_memory_usage
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 745e704141..d98012adeb 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -220,7 +220,10 @@ class SynapseHomeServer(HomeServer):
             resources.update({"/_matrix/consent": consent_resource})
 
         if name == "federation":
-            resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
+            federation_resource: Resource = TransportLayerServer(self)
+            if compress:
+                federation_resource = gz_wrap(federation_resource)
+            resources.update({FEDERATION_PREFIX: federation_resource})
 
         if name == "openid":
             resources.update(
diff --git a/synapse/config/account_validity.py b/synapse/config/account_validity.py
index d1335e77cd..b3972ede96 100644
--- a/synapse/config/account_validity.py
+++ b/synapse/config/account_validity.py
@@ -23,7 +23,7 @@ LEGACY_TEMPLATE_DIR_WARNING = """
 This server's configuration file is using the deprecated 'template_dir' setting in the
 'account_validity' section. Support for this setting has been deprecated and will be
 removed in a future version of Synapse. Server admins should instead use the new
-'custom_templates_directory' setting documented here:
+'custom_template_directory' setting documented here:
 https://matrix-org.github.io/synapse/latest/templates.html
 ---------------------------------------------------------------------------------------"""
 
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index 7765c5b454..66a6dbf1fe 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -53,7 +53,7 @@ LEGACY_TEMPLATE_DIR_WARNING = """
 This server's configuration file is using the deprecated 'template_dir' setting in the
 'email' section. Support for this setting has been deprecated and will be removed in a
 future version of Synapse. Server admins should instead use the new
-'custom_templates_directory' setting documented here:
+'custom_template_directory' setting documented here:
 https://matrix-org.github.io/synapse/latest/templates.html
 ---------------------------------------------------------------------------------------"""
 
diff --git a/synapse/config/sso.py b/synapse/config/sso.py
index 2178cbf983..a452cc3a49 100644
--- a/synapse/config/sso.py
+++ b/synapse/config/sso.py
@@ -26,7 +26,7 @@ LEGACY_TEMPLATE_DIR_WARNING = """
 This server's configuration file is using the deprecated 'template_dir' setting in the
 'sso' section. Support for this setting has been deprecated and will be removed in a
 future version of Synapse. Server admins should instead use the new
-'custom_templates_directory' setting documented here:
+'custom_template_directory' setting documented here:
 https://matrix-org.github.io/synapse/latest/templates.html
 ---------------------------------------------------------------------------------------"""
 
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index dfd0bc414e..dee5461270 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -61,7 +61,7 @@ from synapse.federation.federation_base import (
 )
 from synapse.federation.transport.client import SendJoinResponse
 from synapse.http.types import QueryParams
-from synapse.logging.tracing import trace
+from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace
 from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
 from synapse.util.caches.expiringcache import ExpiringCache
@@ -235,6 +235,7 @@ class FederationClient(FederationBase):
         )
 
     @trace
+    @tag_args
     async def backfill(
         self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> Optional[List[EventBase]]:
@@ -337,6 +338,8 @@ class FederationClient(FederationBase):
 
         return None
 
+    @trace
+    @tag_args
     async def get_pdu(
         self,
         destinations: Iterable[str],
@@ -448,6 +451,8 @@ class FederationClient(FederationBase):
 
         return event_copy
 
+    @trace
+    @tag_args
     async def get_room_state_ids(
         self, destination: str, room_id: str, event_id: str
     ) -> Tuple[List[str], List[str]]:
@@ -467,6 +472,23 @@ class FederationClient(FederationBase):
         state_event_ids = result["pdu_ids"]
         auth_event_ids = result.get("auth_chain_ids", [])
 
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "state_event_ids",
+            str(state_event_ids),
+        )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "state_event_ids.length",
+            str(len(state_event_ids)),
+        )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "auth_event_ids",
+            str(auth_event_ids),
+        )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "auth_event_ids.length",
+            str(len(auth_event_ids)),
+        )
+
         if not isinstance(state_event_ids, list) or not isinstance(
             auth_event_ids, list
         ):
@@ -474,6 +496,8 @@ class FederationClient(FederationBase):
 
         return state_event_ids, auth_event_ids
 
+    @trace
+    @tag_args
     async def get_room_state(
         self,
         destination: str,
@@ -533,6 +557,7 @@ class FederationClient(FederationBase):
 
         return valid_state_events, valid_auth_events
 
+    @trace
     async def _check_sigs_and_hash_and_fetch(
         self,
         origin: str,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index c5b3b3cedf..dd9aeba9c8 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -61,7 +61,7 @@ from synapse.logging.context import (
     nested_logging_context,
     run_in_background,
 )
-from synapse.logging.tracing import log_kv, start_active_span_from_edu, trace
+from synapse.logging.tracing import log_kv, start_active_span_from_edu, tag_args, trace
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -547,6 +547,8 @@ class FederationServer(FederationBase):
 
         return 200, resp
 
+    @trace
+    @tag_args
     async def on_state_ids_request(
         self, origin: str, room_id: str, event_id: str
     ) -> Tuple[int, JsonDict]:
@@ -569,6 +571,8 @@ class FederationServer(FederationBase):
 
         return 200, resp
 
+    @trace
+    @tag_args
     async def _on_state_ids_request_compute(
         self, room_id: str, event_id: str
     ) -> JsonDict:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 5e2b1c0456..8dfc424807 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,6 +32,7 @@ from typing import (
 )
 
 import attr
+from prometheus_client import Histogram
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -59,7 +60,7 @@ from synapse.events.validator import EventValidator
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
-from synapse.logging.tracing import trace
+from synapse.logging.tracing import SynapseTags, set_attribute, tag_args, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.replication.http.federation import (
@@ -79,6 +80,24 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# Added to debug performance and track progress on optimizations
+backfill_processing_before_timer = Histogram(
+    "synapse_federation_backfill_processing_before_time_seconds",
+    "sec",
+    [],
+    buckets=(
+        1.0,
+        5.0,
+        10.0,
+        20.0,
+        30.0,
+        40.0,
+        60.0,
+        80.0,
+        "+Inf",
+    ),
+)
+
 
 def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
     """Get joined domains from state
@@ -138,6 +157,7 @@ class FederationHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
 
+        self.clock = hs.get_clock()
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
@@ -197,12 +217,39 @@ class FederationHandler:
                 return. This is used as part of the heuristic to decide if we
                 should back paginate.
         """
+        # Starting the processing time here so we can include the room backfill
+        # linearizer lock queue in the timing
+        processing_start_time = self.clock.time_msec()
+
         async with self._room_backfill.queue(room_id):
-            return await self._maybe_backfill_inner(room_id, current_depth, limit)
+            return await self._maybe_backfill_inner(
+                room_id,
+                current_depth,
+                limit,
+                processing_start_time=processing_start_time,
+            )
 
     async def _maybe_backfill_inner(
-        self, room_id: str, current_depth: int, limit: int
+        self,
+        room_id: str,
+        current_depth: int,
+        limit: int,
+        *,
+        processing_start_time: int,
     ) -> bool:
+        """
+        Checks whether the `current_depth` is at or approaching any backfill
+        points in the room and if so, will backfill. We only care about
+        checking backfill points that happened before the `current_depth`
+        (meaning less than or equal to the `current_depth`).
+
+        Args:
+            room_id: The room to backfill in.
+            current_depth: The depth to check at for any upcoming backfill points.
+            limit: The max number of events to request from the remote federated server.
+            processing_start_time: The time when `maybe_backfill` started
+                processing. Only used for timing.
+        """
         backwards_extremities = [
             _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
             for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
@@ -370,6 +417,14 @@ class FederationHandler:
         logger.debug(
             "_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
         )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "extremities_to_request",
+            str(extremities_to_request),
+        )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "extremities_to_request.length",
+            str(len(extremities_to_request)),
+        )
 
         # Now we need to decide which hosts to hit first.
 
@@ -425,6 +480,11 @@ class FederationHandler:
 
             return False
 
+        processing_end_time = self.clock.time_msec()
+        backfill_processing_before_timer.observe(
+            (processing_start_time - processing_end_time) / 1000
+        )
+
         success = await try_backfill(likely_domains)
         if success:
             return True
@@ -1081,6 +1141,8 @@ class FederationHandler:
 
         return event
 
+    @trace
+    @tag_args
     async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
         """Returns the state at the event. i.e. not including said event."""
         event = await self.store.get_event(event_id, check_room_id=room_id)
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 740a04aad6..a3bd04b712 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -29,7 +29,7 @@ from typing import (
     Tuple,
 )
 
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from synapse import event_auth
 from synapse.api.constants import (
@@ -59,7 +59,13 @@ from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.logging.context import nested_logging_context
-from synapse.logging.tracing import trace
+from synapse.logging.tracing import (
+    SynapseTags,
+    set_attribute,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
@@ -92,6 +98,26 @@ soft_failed_event_counter = Counter(
     "Events received over federation that we marked as soft_failed",
 )
 
+# Added to debug performance and track progress on optimizations
+backfill_processing_after_timer = Histogram(
+    "synapse_federation_backfill_processing_after_time_seconds",
+    "sec",
+    [],
+    buckets=(
+        1.0,
+        5.0,
+        10.0,
+        20.0,
+        30.0,
+        40.0,
+        60.0,
+        80.0,
+        120.0,
+        180.0,
+        "+Inf",
+    ),
+)
+
 
 class FederationEventHandler:
     """Handles events that originated from federation.
@@ -410,6 +436,7 @@ class FederationEventHandler:
             prev_member_event,
         )
 
+    @trace
     async def process_remote_join(
         self,
         origin: str,
@@ -597,20 +624,21 @@ class FederationEventHandler:
         if not events:
             return
 
-        # if there are any events in the wrong room, the remote server is buggy and
-        # should not be trusted.
-        for ev in events:
-            if ev.room_id != room_id:
-                raise InvalidResponseError(
-                    f"Remote server {dest} returned event {ev.event_id} which is in "
-                    f"room {ev.room_id}, when we were backfilling in {room_id}"
-                )
+        with backfill_processing_after_timer.time():
+            # if there are any events in the wrong room, the remote server is buggy and
+            # should not be trusted.
+            for ev in events:
+                if ev.room_id != room_id:
+                    raise InvalidResponseError(
+                        f"Remote server {dest} returned event {ev.event_id} which is in "
+                        f"room {ev.room_id}, when we were backfilling in {room_id}"
+                    )
 
-        await self._process_pulled_events(
-            dest,
-            events,
-            backfilled=True,
-        )
+            await self._process_pulled_events(
+                dest,
+                events,
+                backfilled=True,
+            )
 
     @trace
     async def _get_missing_events_for_pdu(
@@ -715,7 +743,7 @@ class FederationEventHandler:
 
     @trace
     async def _process_pulled_events(
-        self, origin: str, events: Iterable[EventBase], backfilled: bool
+        self, origin: str, events: Collection[EventBase], backfilled: bool
     ) -> None:
         """Process a batch of events we have pulled from a remote server
 
@@ -730,6 +758,15 @@ class FederationEventHandler:
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
         """
+        set_attribute(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str([event.event_id for event in events]),
+        )
+        set_attribute(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(events)),
+        )
+        set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
         logger.debug(
             "processing pulled backfilled=%s events=%s",
             backfilled,
@@ -753,6 +790,7 @@ class FederationEventHandler:
                 await self._process_pulled_event(origin, ev, backfilled=backfilled)
 
     @trace
+    @tag_args
     async def _process_pulled_event(
         self, origin: str, event: EventBase, backfilled: bool
     ) -> None:
@@ -854,6 +892,7 @@ class FederationEventHandler:
             else:
                 raise
 
+    @trace
     async def _compute_event_context_with_maybe_missing_prevs(
         self, dest: str, event: EventBase
     ) -> EventContext:
@@ -970,6 +1009,8 @@ class FederationEventHandler:
             event, state_ids_before_event=state_map, partial_state=partial_state
         )
 
+    @trace
+    @tag_args
     async def _get_state_ids_after_missing_prev_event(
         self,
         destination: str,
@@ -1009,10 +1050,10 @@ class FederationEventHandler:
         logger.debug("Fetching %i events from cache/store", len(desired_events))
         have_events = await self._store.have_seen_events(room_id, desired_events)
 
-        missing_desired_events = desired_events - have_events
+        missing_desired_event_ids = desired_events - have_events
         logger.debug(
             "We are missing %i events (got %i)",
-            len(missing_desired_events),
+            len(missing_desired_event_ids),
             len(have_events),
         )
 
@@ -1024,13 +1065,30 @@ class FederationEventHandler:
         #   already have a bunch of the state events. It would be nice if the
         #   federation api gave us a way of finding out which we actually need.
 
-        missing_auth_events = set(auth_event_ids) - have_events
-        missing_auth_events.difference_update(
-            await self._store.have_seen_events(room_id, missing_auth_events)
+        missing_auth_event_ids = set(auth_event_ids) - have_events
+        missing_auth_event_ids.difference_update(
+            await self._store.have_seen_events(room_id, missing_auth_event_ids)
         )
-        logger.debug("We are also missing %i auth events", len(missing_auth_events))
+        logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
 
-        missing_events = missing_desired_events | missing_auth_events
+        missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
+
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
+            str(missing_auth_event_ids),
+        )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
+            str(len(missing_auth_event_ids)),
+        )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
+            str(missing_desired_event_ids),
+        )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
+            str(len(missing_desired_event_ids)),
+        )
 
         # Making an individual request for each of 1000s of events has a lot of
         # overhead. On the other hand, we don't really want to fetch all of the events
@@ -1041,13 +1099,13 @@ class FederationEventHandler:
         #
         # TODO: might it be better to have an API which lets us do an aggregate event
         #   request
-        if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
+        if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
             logger.debug("Requesting complete state from remote")
             await self._get_state_and_persist(destination, room_id, event_id)
         else:
-            logger.debug("Fetching %i events from remote", len(missing_events))
+            logger.debug("Fetching %i events from remote", len(missing_event_ids))
             await self._get_events_and_persist(
-                destination=destination, room_id=room_id, event_ids=missing_events
+                destination=destination, room_id=room_id, event_ids=missing_event_ids
             )
 
         # We now need to fill out the state map, which involves fetching the
@@ -1104,6 +1162,14 @@ class FederationEventHandler:
                 event_id,
                 failed_to_fetch,
             )
+            set_attribute(
+                SynapseTags.RESULT_PREFIX + "failed_to_fetch",
+                str(failed_to_fetch),
+            )
+            set_attribute(
+                SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
+                str(len(failed_to_fetch)),
+            )
 
         if remote_event.is_state() and remote_event.rejected_reason is None:
             state_map[
@@ -1112,6 +1178,8 @@ class FederationEventHandler:
 
         return state_map
 
+    @trace
+    @tag_args
     async def _get_state_and_persist(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1133,6 +1201,7 @@ class FederationEventHandler:
                 destination=destination, room_id=room_id, event_ids=(event_id,)
             )
 
+    @trace
     async def _process_received_pdu(
         self,
         origin: str,
@@ -1283,6 +1352,7 @@ class FederationEventHandler:
         except Exception:
             logger.exception("Failed to resync device for %s", sender)
 
+    @trace
     async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
         """Handles backfilling the insertion event when we receive a marker
         event that points to one.
@@ -1414,6 +1484,8 @@ class FederationEventHandler:
 
         return event_from_response
 
+    @trace
+    @tag_args
     async def _get_events_and_persist(
         self, destination: str, room_id: str, event_ids: Collection[str]
     ) -> None:
@@ -1459,6 +1531,7 @@ class FederationEventHandler:
         logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
         await self._auth_and_persist_outliers(room_id, events)
 
+    @trace
     async def _auth_and_persist_outliers(
         self, room_id: str, events: Iterable[EventBase]
     ) -> None:
@@ -1477,6 +1550,16 @@ class FederationEventHandler:
         """
         event_map = {event.event_id: event for event in events}
 
+        event_ids = event_map.keys()
+        set_attribute(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str(event_ids),
+        )
+        set_attribute(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(event_ids)),
+        )
+
         # filter out any events we have already seen. This might happen because
         # the events were eagerly pushed to us (eg, during a room join), or because
         # another thread has raced against us since we decided to request the event.
@@ -1593,6 +1676,7 @@ class FederationEventHandler:
             backfilled=True,
         )
 
+    @trace
     async def _check_event_auth(
         self, origin: Optional[str], event: EventBase, context: EventContext
     ) -> None:
@@ -1631,6 +1715,14 @@ class FederationEventHandler:
         claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
             origin, event
         )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events",
+            str([ev.event_id for ev in claimed_auth_events]),
+        )
+        set_attribute(
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
+            str(len(claimed_auth_events)),
+        )
 
         # ... and check that the event passes auth at those auth events.
         # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1820,7 @@ class FederationEventHandler:
             )
             context.rejected = RejectedReason.AUTH_ERROR
 
+    @trace
     async def _maybe_kick_guest_users(self, event: EventBase) -> None:
         if event.type != EventTypes.GuestAccess:
             return
@@ -1935,6 +2028,8 @@ class FederationEventHandler:
         # instead we raise an AuthError, which will make the caller ignore it.
         raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
 
+    @trace
+    @tag_args
     async def _get_remote_auth_chain_for_event(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1963,6 +2058,7 @@ class FederationEventHandler:
 
         await self._auth_and_persist_outliers(room_id, remote_auth_events)
 
+    @trace
     async def _run_push_actions_and_persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
     ) -> None:
@@ -2071,8 +2167,17 @@ class FederationEventHandler:
                     self._message_handler.maybe_schedule_expiry(event)
 
             if not backfilled:  # Never notify for backfilled events
-                for event in events:
-                    await self._notify_persisted_event(event, max_stream_token)
+                with start_active_span("notify_persisted_events"):
+                    set_attribute(
+                        SynapseTags.RESULT_PREFIX + "event_ids",
+                        str([ev.event_id for ev in events]),
+                    )
+                    set_attribute(
+                        SynapseTags.RESULT_PREFIX + "event_ids.length",
+                        str(len(events)),
+                    )
+                    for event in events:
+                        await self._notify_persisted_event(event, max_stream_token)
 
             return max_stream_token.stream
 
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8d9754f306..a7af04f6f3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -331,7 +331,11 @@ class MessageHandler:
                     msg="Getting joined members while not being a current member of the room is forbidden.",
                 )
 
-        users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
+        users_with_profile = (
+            await self._state_storage_controller.get_users_in_room_with_profiles(
+                room_id
+            )
+        )
 
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index ebd445adca..732b0310bc 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -453,6 +453,7 @@ class RoomSummaryHandler:
                 "type": e.type,
                 "state_key": e.state_key,
                 "content": e.content,
+                "room_id": e.room_id,
                 "sender": e.sender,
                 "origin_server_ts": e.origin_server_ts,
             }
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index cddfb4cec7..2991967226 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,7 +13,19 @@
 # limitations under the License.
 import itertools
 import logging
-from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Collection,
+    Dict,
+    FrozenSet,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    Set,
+    Tuple,
+)
 
 import attr
 from prometheus_client import Counter
@@ -94,7 +106,7 @@ class SyncConfig:
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class TimelineBatch:
     prev_batch: StreamToken
-    events: List[EventBase]
+    events: Sequence[EventBase]
     limited: bool
     # A mapping of event ID to the bundled aggregations for the above events.
     # This is only calculated if limited is true.
@@ -512,10 +524,17 @@ class SyncHandler:
                 # ensure that we always include current state in the timeline
                 current_state_ids: FrozenSet[str] = frozenset()
                 if any(e.is_state() for e in recents):
+                    # FIXME(faster_joins): We use the partial state here as
+                    # we don't want to block `/sync` on finishing a lazy join.
+                    # Which should be fine once
+                    # https://github.com/matrix-org/synapse/issues/12989 is resolved,
+                    # since we shouldn't reach here anymore?
+                    # Note that we use the current state as a whitelist for filtering
+                    # `recents`, so partial state is only a problem when a membership
+                    # event turns up in `recents` but has not made it into the current
+                    # state.
                     current_state_ids_map = (
-                        await self._state_storage_controller.get_current_state_ids(
-                            room_id
-                        )
+                        await self.store.get_partial_current_state_ids(room_id)
                     )
                     current_state_ids = frozenset(current_state_ids_map.values())
 
@@ -584,7 +603,13 @@ class SyncHandler:
                 if any(e.is_state() for e in loaded_recents):
                     # FIXME(faster_joins): We use the partial state here as
                     # we don't want to block `/sync` on finishing a lazy join.
-                    # Is this the correct way of doing it?
+                    # Which should be fine once
+                    # https://github.com/matrix-org/synapse/issues/12989 is resolved,
+                    # since we shouldn't reach here anymore?
+                    # Note that we use the current state as a whitelist for filtering
+                    # `loaded_recents`, so partial state is only a problem when a
+                    # membership event turns up in `loaded_recents` but has not made it
+                    # into the current state.
                     current_state_ids_map = (
                         await self.store.get_partial_current_state_ids(room_id)
                     )
@@ -632,7 +657,10 @@ class SyncHandler:
         )
 
     async def get_state_after_event(
-        self, event_id: str, state_filter: Optional[StateFilter] = None
+        self,
+        event_id: str,
+        state_filter: Optional[StateFilter] = None,
+        await_full_state: bool = True,
     ) -> StateMap[str]:
         """
         Get the room state after the given event
@@ -640,9 +668,14 @@ class SyncHandler:
         Args:
             event_id: event of interest
             state_filter: The state filter used to fetch state from the database.
+            await_full_state: if `True`, will block if we do not yet have complete state
+                at the event and `state_filter` is not satisfied by partial state.
+                Defaults to `True`.
         """
         state_ids = await self._state_storage_controller.get_state_ids_for_event(
-            event_id, state_filter=state_filter or StateFilter.all()
+            event_id,
+            state_filter=state_filter or StateFilter.all(),
+            await_full_state=await_full_state,
         )
 
         # using get_metadata_for_events here (instead of get_event) sidesteps an issue
@@ -665,6 +698,7 @@ class SyncHandler:
         room_id: str,
         stream_position: StreamToken,
         state_filter: Optional[StateFilter] = None,
+        await_full_state: bool = True,
     ) -> StateMap[str]:
         """Get the room state at a particular stream position
 
@@ -672,6 +706,9 @@ class SyncHandler:
             room_id: room for which to get state
             stream_position: point at which to get state
             state_filter: The state filter used to fetch state from the database.
+            await_full_state: if `True`, will block if we do not yet have complete state
+                at the last event in the room before `stream_position` and
+                `state_filter` is not satisfied by partial state. Defaults to `True`.
         """
         # FIXME: This gets the state at the latest event before the stream ordering,
         # which might not be the same as the "current state" of the room at the time
@@ -683,7 +720,9 @@ class SyncHandler:
 
         if last_event_id:
             state = await self.get_state_after_event(
-                last_event_id, state_filter=state_filter or StateFilter.all()
+                last_event_id,
+                state_filter=state_filter or StateFilter.all(),
+                await_full_state=await_full_state,
             )
 
         else:
@@ -857,16 +896,26 @@ class SyncHandler:
         now_token: StreamToken,
         full_state: bool,
     ) -> MutableStateMap[EventBase]:
-        """Works out the difference in state between the start of the timeline
-        and the previous sync.
+        """Works out the difference in state between the end of the previous sync and
+        the start of the timeline.
 
         Args:
             room_id:
             batch: The timeline batch for the room that will be sent to the user.
             sync_config:
-            since_token: Token of the end of the previous batch. May be None.
+            since_token: Token of the end of the previous batch. May be `None`.
             now_token: Token of the end of the current batch.
             full_state: Whether to force returning the full state.
+                `lazy_load_members` still applies when `full_state` is `True`.
+
+        Returns:
+            The state to return in the sync response for the room.
+
+            Clients will overlay this onto the state at the end of the previous sync to
+            arrive at the state at the start of the timeline.
+
+            Clients will then overlay state events in the timeline to arrive at the
+            state at the end of the timeline, in preparation for the next sync.
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
@@ -874,8 +923,17 @@ class SyncHandler:
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
+            # The memberships needed for events in the timeline.
+            # Only calculated when `lazy_load_members` is on.
+            members_to_fetch: Optional[Set[str]] = None
+
+            # A dictionary mapping user IDs to the first event in the timeline sent by
+            # them. Only calculated when `lazy_load_members` is on.
+            first_event_by_sender_map: Optional[Dict[str, EventBase]] = None
 
-            members_to_fetch = None
+            # The contribution to the room state from state events in the timeline.
+            # Only contains the last event for any given state key.
+            timeline_state: StateMap[str]
 
             lazy_load_members = sync_config.filter_collection.lazy_load_members()
             include_redundant_members = (
@@ -886,10 +944,23 @@ class SyncHandler:
                 # We only request state for the members needed to display the
                 # timeline:
 
-                members_to_fetch = {
-                    event.sender  # FIXME: we also care about invite targets etc.
-                    for event in batch.events
-                }
+                timeline_state = {}
+
+                members_to_fetch = set()
+                first_event_by_sender_map = {}
+                for event in batch.events:
+                    # Build the map from user IDs to the first timeline event they sent.
+                    if event.sender not in first_event_by_sender_map:
+                        first_event_by_sender_map[event.sender] = event
+
+                    # We need the event's sender, unless their membership was in a
+                    # previous timeline event.
+                    if (EventTypes.Member, event.sender) not in timeline_state:
+                        members_to_fetch.add(event.sender)
+                    # FIXME: we also care about invite targets etc.
+
+                    if event.is_state():
+                        timeline_state[(event.type, event.state_key)] = event.event_id
 
                 if full_state:
                     # always make sure we LL ourselves so we know we're in the room
@@ -899,55 +970,80 @@ class SyncHandler:
                     members_to_fetch.add(sync_config.user.to_string())
 
                 state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+
+                # We are happy to use partial state to compute the `/sync` response.
+                # Since partial state may not include the lazy-loaded memberships we
+                # require, we fix up the state response afterwards with memberships from
+                # auth events.
+                await_full_state = False
             else:
+                timeline_state = {
+                    (event.type, event.state_key): event.event_id
+                    for event in batch.events
+                    if event.is_state()
+                }
+
                 state_filter = StateFilter.all()
+                await_full_state = True
 
-            timeline_state = {
-                (event.type, event.state_key): event.event_id
-                for event in batch.events
-                if event.is_state()
-            }
+            # Now calculate the state to return in the sync response for the room.
+            # This is more or less the change in state between the end of the previous
+            # sync's timeline and the start of the current sync's timeline.
+            # See the docstring above for details.
+            state_ids: StateMap[str]
 
             if full_state:
                 if batch:
-                    current_state_ids = (
+                    state_at_timeline_end = (
                         await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[-1].event_id, state_filter=state_filter
+                            batch.events[-1].event_id,
+                            state_filter=state_filter,
+                            await_full_state=await_full_state,
                         )
                     )
 
-                    state_ids = (
+                    state_at_timeline_start = (
                         await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[0].event_id, state_filter=state_filter
+                            batch.events[0].event_id,
+                            state_filter=state_filter,
+                            await_full_state=await_full_state,
                         )
                     )
 
                 else:
-                    current_state_ids = await self.get_state_at(
-                        room_id, stream_position=now_token, state_filter=state_filter
+                    state_at_timeline_end = await self.get_state_at(
+                        room_id,
+                        stream_position=now_token,
+                        state_filter=state_filter,
+                        await_full_state=await_full_state,
                     )
 
-                    state_ids = current_state_ids
+                    state_at_timeline_start = state_at_timeline_end
 
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
-                    timeline_start=state_ids,
-                    previous={},
-                    current=current_state_ids,
+                    timeline_start=state_at_timeline_start,
+                    timeline_end=state_at_timeline_end,
+                    previous_timeline_end={},
                     lazy_load_members=lazy_load_members,
                 )
             elif batch.limited:
                 if batch:
                     state_at_timeline_start = (
                         await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[0].event_id, state_filter=state_filter
+                            batch.events[0].event_id,
+                            state_filter=state_filter,
+                            await_full_state=await_full_state,
                         )
                     )
                 else:
                     # We can get here if the user has ignored the senders of all
                     # the recent events.
                     state_at_timeline_start = await self.get_state_at(
-                        room_id, stream_position=now_token, state_filter=state_filter
+                        room_id,
+                        stream_position=now_token,
+                        state_filter=state_filter,
+                        await_full_state=await_full_state,
                     )
 
                 # for now, we disable LL for gappy syncs - see
@@ -969,28 +1065,35 @@ class SyncHandler:
                 # is indeed the case.
                 assert since_token is not None
                 state_at_previous_sync = await self.get_state_at(
-                    room_id, stream_position=since_token, state_filter=state_filter
+                    room_id,
+                    stream_position=since_token,
+                    state_filter=state_filter,
+                    await_full_state=await_full_state,
                 )
 
                 if batch:
-                    current_state_ids = (
+                    state_at_timeline_end = (
                         await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[-1].event_id, state_filter=state_filter
+                            batch.events[-1].event_id,
+                            state_filter=state_filter,
+                            await_full_state=await_full_state,
                         )
                     )
                 else:
-                    # Its not clear how we get here, but empirically we do
-                    # (#5407). Logging has been added elsewhere to try and
-                    # figure out where this state comes from.
-                    current_state_ids = await self.get_state_at(
-                        room_id, stream_position=now_token, state_filter=state_filter
+                    # We can get here if the user has ignored the senders of all
+                    # the recent events.
+                    state_at_timeline_end = await self.get_state_at(
+                        room_id,
+                        stream_position=now_token,
+                        state_filter=state_filter,
+                        await_full_state=await_full_state,
                     )
 
                 state_ids = _calculate_state(
                     timeline_contains=timeline_state,
                     timeline_start=state_at_timeline_start,
-                    previous=state_at_previous_sync,
-                    current=current_state_ids,
+                    timeline_end=state_at_timeline_end,
+                    previous_timeline_end=state_at_previous_sync,
                     # we have to include LL members in case LL initial sync missed them
                     lazy_load_members=lazy_load_members,
                 )
@@ -1013,8 +1116,30 @@ class SyncHandler:
                                 (EventTypes.Member, member)
                                 for member in members_to_fetch
                             ),
+                            await_full_state=False,
                         )
 
+            # If we only have partial state for the room, `state_ids` may be missing the
+            # memberships we wanted. We attempt to find some by digging through the auth
+            # events of timeline events.
+            if lazy_load_members and await self.store.is_partial_state_room(room_id):
+                assert members_to_fetch is not None
+                assert first_event_by_sender_map is not None
+
+                additional_state_ids = (
+                    await self._find_missing_partial_state_memberships(
+                        room_id, members_to_fetch, first_event_by_sender_map, state_ids
+                    )
+                )
+                state_ids = {**state_ids, **additional_state_ids}
+
+            # At this point, if `lazy_load_members` is enabled, `state_ids` includes
+            # the memberships of all event senders in the timeline. This is because we
+            # may not have sent the memberships in a previous sync.
+
+            # When `include_redundant_members` is on, we send all the lazy-loaded
+            # memberships of event senders. Otherwise we make an effort to limit the set
+            # of memberships we send to those that we have not already sent to this client.
             if lazy_load_members and not include_redundant_members:
                 cache_key = (sync_config.user.to_string(), sync_config.device_id)
                 cache = self.get_lazy_loaded_members_cache(cache_key)
@@ -1056,6 +1181,99 @@ class SyncHandler:
             if e.type != EventTypes.Aliases  # until MSC2261 or alternative solution
         }
 
+    async def _find_missing_partial_state_memberships(
+        self,
+        room_id: str,
+        members_to_fetch: Collection[str],
+        events_with_membership_auth: Mapping[str, EventBase],
+        found_state_ids: StateMap[str],
+    ) -> StateMap[str]:
+        """Finds missing memberships from a set of auth events and returns them as a
+        state map.
+
+        Args:
+            room_id: The partial state room to find the remaining memberships for.
+            members_to_fetch: The memberships to find.
+            events_with_membership_auth: A mapping from user IDs to events whose auth
+                events are known to contain their membership.
+            found_state_ids: A dict from (type, state_key) -> state_event_id, containing
+                memberships that have been previously found. Entries in
+                `members_to_fetch` that have a membership in `found_state_ids` are
+                ignored.
+
+        Returns:
+            A dict from ("m.room.member", state_key) -> state_event_id, containing the
+            memberships missing from `found_state_ids`.
+
+        Raises:
+            KeyError: if `events_with_membership_auth` does not have an entry for a
+                missing membership. Memberships in `found_state_ids` do not need an
+                entry in `events_with_membership_auth`.
+        """
+        additional_state_ids: MutableStateMap[str] = {}
+
+        # Tracks the missing members for logging purposes.
+        missing_members = set()
+
+        # Identify memberships missing from `found_state_ids` and pick out the auth
+        # events in which to look for them.
+        auth_event_ids: Set[str] = set()
+        for member in members_to_fetch:
+            if (EventTypes.Member, member) in found_state_ids:
+                continue
+
+            missing_members.add(member)
+            event_with_membership_auth = events_with_membership_auth[member]
+            auth_event_ids.update(event_with_membership_auth.auth_event_ids())
+
+        auth_events = await self.store.get_events(auth_event_ids)
+
+        # Run through the missing memberships once more, picking out the memberships
+        # from the pile of auth events we have just fetched.
+        for member in members_to_fetch:
+            if (EventTypes.Member, member) in found_state_ids:
+                continue
+
+            event_with_membership_auth = events_with_membership_auth[member]
+
+            # Dig through the auth events to find the desired membership.
+            for auth_event_id in event_with_membership_auth.auth_event_ids():
+                # We only store events once we have all their auth events,
+                # so the auth event must be in the pile we have just
+                # fetched.
+                auth_event = auth_events[auth_event_id]
+
+                if (
+                    auth_event.type == EventTypes.Member
+                    and auth_event.state_key == member
+                ):
+                    missing_members.remove(member)
+                    additional_state_ids[
+                        (EventTypes.Member, member)
+                    ] = auth_event.event_id
+                    break
+
+        if missing_members:
+            # There really shouldn't be any missing memberships now. Either:
+            #  * we couldn't find an auth event, which shouldn't happen because we do
+            #    not persist events with persisting their auth events first, or
+            #  * the set of auth events did not contain a membership we wanted, which
+            #    means our caller didn't compute the events in `members_to_fetch`
+            #    correctly, or we somehow accepted an event whose auth events were
+            #    dodgy.
+            logger.error(
+                "Failed to find memberships for %s in partial state room "
+                "%s in the auth events of %s.",
+                missing_members,
+                room_id,
+                [
+                    events_with_membership_auth[member].event_id
+                    for member in missing_members
+                ],
+            )
+
+        return additional_state_ids
+
     async def unread_notifs_for_room_id(
         self, room_id: str, sync_config: SyncConfig
     ) -> NotifCounts:
@@ -1700,7 +1918,11 @@ class SyncHandler:
                 continue
 
             if room_id in sync_result_builder.joined_room_ids or has_join:
-                old_state_ids = await self.get_state_at(room_id, since_token)
+                old_state_ids = await self.get_state_at(
+                    room_id,
+                    since_token,
+                    state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
+                )
                 old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
                 old_mem_ev = None
                 if old_mem_ev_id:
@@ -1726,7 +1948,13 @@ class SyncHandler:
                     newly_left_rooms.append(room_id)
                 else:
                     if not old_state_ids:
-                        old_state_ids = await self.get_state_at(room_id, since_token)
+                        old_state_ids = await self.get_state_at(
+                            room_id,
+                            since_token,
+                            state_filter=StateFilter.from_types(
+                                [(EventTypes.Member, user_id)]
+                            ),
+                        )
                         old_mem_ev_id = old_state_ids.get(
                             (EventTypes.Member, user_id), None
                         )
@@ -2221,8 +2449,8 @@ def _action_has_highlight(actions: List[JsonDict]) -> bool:
 def _calculate_state(
     timeline_contains: StateMap[str],
     timeline_start: StateMap[str],
-    previous: StateMap[str],
-    current: StateMap[str],
+    timeline_end: StateMap[str],
+    previous_timeline_end: StateMap[str],
     lazy_load_members: bool,
 ) -> StateMap[str]:
     """Works out what state to include in a sync response.
@@ -2230,45 +2458,50 @@ def _calculate_state(
     Args:
         timeline_contains: state in the timeline
         timeline_start: state at the start of the timeline
-        previous: state at the end of the previous sync (or empty dict
+        timeline_end: state at the end of the timeline
+        previous_timeline_end: state at the end of the previous sync (or empty dict
             if this is an initial sync)
-        current: state at the end of the timeline
         lazy_load_members: whether to return members from timeline_start
             or not.  assumes that timeline_start has already been filtered to
             include only the members the client needs to know about.
     """
-    event_id_to_key = {
-        e: key
-        for key, e in itertools.chain(
+    event_id_to_state_key = {
+        event_id: state_key
+        for state_key, event_id in itertools.chain(
             timeline_contains.items(),
-            previous.items(),
             timeline_start.items(),
-            current.items(),
+            timeline_end.items(),
+            previous_timeline_end.items(),
         )
     }
 
-    c_ids = set(current.values())
-    ts_ids = set(timeline_start.values())
-    p_ids = set(previous.values())
-    tc_ids = set(timeline_contains.values())
+    timeline_end_ids = set(timeline_end.values())
+    timeline_start_ids = set(timeline_start.values())
+    previous_timeline_end_ids = set(previous_timeline_end.values())
+    timeline_contains_ids = set(timeline_contains.values())
 
     # If we are lazyloading room members, we explicitly add the membership events
     # for the senders in the timeline into the state block returned by /sync,
     # as we may not have sent them to the client before.  We find these membership
     # events by filtering them out of timeline_start, which has already been filtered
     # to only include membership events for the senders in the timeline.
-    # In practice, we can do this by removing them from the p_ids list,
-    # which is the list of relevant state we know we have already sent to the client.
+    # In practice, we can do this by removing them from the previous_timeline_end_ids
+    # list, which is the list of relevant state we know we have already sent to the
+    # client.
     # see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
 
     if lazy_load_members:
-        p_ids.difference_update(
+        previous_timeline_end_ids.difference_update(
             e for t, e in timeline_start.items() if t[0] == EventTypes.Member
         )
 
-    state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
+    state_ids = (
+        (timeline_end_ids | timeline_start_ids)
+        - previous_timeline_end_ids
+        - timeline_contains_ids
+    )
 
-    return {event_id_to_key[e]: e for e in state_ids}
+    return {event_id_to_state_key[e]: e for e in state_ids}
 
 
 @attr.s(slots=True, auto_attribs=True)
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 4ff840ca0e..26aaabfb34 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -23,9 +23,12 @@ from typing import (
     Optional,
     Sequence,
     Tuple,
+    Type,
+    TypeVar,
     overload,
 )
 
+from pydantic import BaseModel, ValidationError
 from typing_extensions import Literal
 
 from twisted.web.server import Request
@@ -694,6 +697,28 @@ def parse_json_object_from_request(
     return content
 
 
+Model = TypeVar("Model", bound=BaseModel)
+
+
+def parse_and_validate_json_object_from_request(
+    request: Request, model_type: Type[Model]
+) -> Model:
+    """Parse a JSON object from the body of a twisted HTTP request, then deserialise and
+    validate using the given pydantic model.
+
+    Raises:
+        SynapseError if the request body couldn't be decoded as JSON or
+            if it wasn't a JSON object.
+    """
+    content = parse_json_object_from_request(request, allow_empty_body=False)
+    try:
+        instance = model_type.parse_obj(content)
+    except ValidationError as e:
+        raise SynapseError(HTTPStatus.BAD_REQUEST, str(e), errcode=Codes.BAD_JSON)
+
+    return instance
+
+
 def assert_params_in_dict(body: JsonDict, required: Iterable[str]) -> None:
     absent = []
     for k in required:
diff --git a/synapse/logging/tracing.py b/synapse/logging/tracing.py
index 1b509ffdcd..a250bbb204 100644
--- a/synapse/logging/tracing.py
+++ b/synapse/logging/tracing.py
@@ -280,6 +280,19 @@ class SynapseTags:
     # The name of the external cache
     CACHE_NAME = "cache.name"
 
+    # Used to tag function arguments
+    #
+    # Tag a named arg. The name of the argument should be appended to this prefix.
+    FUNC_ARG_PREFIX = "ARG."
+    # Tag extra variadic number of positional arguments (`def foo(first, second, *extras)`)
+    FUNC_ARGS = "args"
+    # Tag keyword args
+    FUNC_KWARGS = "kwargs"
+
+    # Some intermediate result that's interesting to the function. The label for
+    # the result should be appended to this prefix.
+    RESULT_PREFIX = "RESULT."
+
 
 class SynapseBaggage:
     FORCE_TRACING = "synapse-force-tracing"
@@ -796,7 +809,6 @@ def _custom_sync_async_decorator(
     """
     Decorates a function that is sync or async (coroutines), or that returns a Twisted
     `Deferred`. The custom business logic of the decorator goes in `wrapping_logic`.
-
     Example usage:
     ```py
     # Decorator to time the function and log it out
@@ -812,7 +824,6 @@ def _custom_sync_async_decorator(
                 logger.info("%s took %s seconds", func.__name__, duration)
         return _custom_sync_async_decorator(func, _wrapping_logic)
     ```
-
     Args:
         func: The function to be decorated
         wrapping_logic: The business logic of your custom decorator.
@@ -928,9 +939,9 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]:
         #   first argument only if it's named `self` or `cls`. This isn't fool-proof
         #   but handles the idiomatic cases.
         for i, arg in enumerate(args[1:], start=1):  # type: ignore[index]
-            set_attribute("ARG_" + argspec.args[i], str(arg))
-        set_attribute("args", str(args[len(argspec.args) :]))  # type: ignore[index]
-        set_attribute("kwargs", str(kwargs))
+            set_attribute(SynapseTags.FUNC_ARG_PREFIX + argspec.args[i], str(arg))
+        set_attribute(SynapseTags.FUNC_ARGS, str(args[len(argspec.args) :]))  # type: ignore[index]
+        set_attribute(SynapseTags.FUNC_KWARGS, str(kwargs))
         yield
 
     return _custom_sync_async_decorator(func, _wrapping_logic)
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 6c0cc5a6ce..440205e80c 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -14,128 +14,235 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import copy
-from typing import Any, Dict, List
-
-from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+"""
+Push rules is the system used to determine which events trigger a push (and a
+bump in notification counts).
+
+This consists of a list of "push rules" for each user, where a push rule is a
+pair of "conditions" and "actions". When a user receives an event Synapse
+iterates over the list of push rules until it finds one where all the conditions
+match the event, at which point "actions" describe the outcome (e.g. notify,
+highlight, etc).
+
+Push rules are split up into 5 different "kinds" (aka "priority classes"), which
+are run in order:
+    1. Override — highest priority rules, e.g. always ignore notices
+    2. Content — content specific rules, e.g. @ notifications
+    3. Room — per room rules, e.g. enable/disable notifications for all messages
+       in a room
+    4. Sender — per sender rules, e.g. never notify for messages from a given
+       user
+    5. Underride — the lowest priority "default" rules, e.g. notify for every
+       message.
+
+The set of "base rules" are the list of rules that every user has by default. A
+user can modify their copy of the push rules in one of three ways:
+
+    1. Adding a new push rule of a certain kind
+    2. Changing the actions of a base rule
+    3. Enabling/disabling a base rule.
+
+The base rules are split into whether they come before or after a particular
+kind, so the order of push rule evaluation would be: base rules for before
+"override" kind, user defined "override" rules, base rules after "override"
+kind, etc, etc.
+"""
+
+import itertools
+import logging
+from typing import Dict, Iterator, List, Mapping, Sequence, Tuple, Union
+
+import attr
+
+from synapse.config.experimental import ExperimentalConfig
+from synapse.push.rulekinds import PRIORITY_CLASS_MAP
+
+logger = logging.getLogger(__name__)
+
+
+@attr.s(auto_attribs=True, slots=True, frozen=True)
+class PushRule:
+    """A push rule
+
+    Attributes:
+        rule_id: a unique ID for this rule
+        priority_class: what "kind" of push rule this is (see
+            `PRIORITY_CLASS_MAP` for mapping between int and kind)
+        conditions: the sequence of conditions that all need to match
+        actions: the actions to apply if all conditions are met
+        default: is this a base rule?
+        default_enabled: is this enabled by default?
+    """
 
+    rule_id: str
+    priority_class: int
+    conditions: Sequence[Mapping[str, str]]
+    actions: Sequence[Union[str, Mapping]]
+    default: bool = False
+    default_enabled: bool = True
 
-def list_with_base_rules(rawrules: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
-    """Combine the list of rules set by the user with the default push rules
 
-    Args:
-        rawrules: The rules the user has modified or set.
+@attr.s(auto_attribs=True, slots=True, frozen=True, weakref_slot=False)
+class PushRules:
+    """A collection of push rules for an account.
 
-    Returns:
-        A new list with the rules set by the user combined with the defaults.
+    Can be iterated over, producing push rules in priority order.
     """
-    ruleslist = []
 
-    # Grab the base rules that the user has modified.
-    # The modified base rules have a priority_class of -1.
-    modified_base_rules = {r["rule_id"]: r for r in rawrules if r["priority_class"] < 0}
+    # A mapping from rule ID to push rule that overrides a base rule. These will
+    # be returned instead of the base rule.
+    overriden_base_rules: Dict[str, PushRule] = attr.Factory(dict)
+
+    # The following stores the custom push rules at each priority class.
+    #
+    # We keep these separate (rather than combining into one big list) to avoid
+    # copying the base rules around all the time.
+    override: List[PushRule] = attr.Factory(list)
+    content: List[PushRule] = attr.Factory(list)
+    room: List[PushRule] = attr.Factory(list)
+    sender: List[PushRule] = attr.Factory(list)
+    underride: List[PushRule] = attr.Factory(list)
+
+    def __iter__(self) -> Iterator[PushRule]:
+        # When iterating over the push rules we need to return the base rules
+        # interspersed at the correct spots.
+        for rule in itertools.chain(
+            BASE_PREPEND_OVERRIDE_RULES,
+            self.override,
+            BASE_APPEND_OVERRIDE_RULES,
+            self.content,
+            BASE_APPEND_CONTENT_RULES,
+            self.room,
+            self.sender,
+            self.underride,
+            BASE_APPEND_UNDERRIDE_RULES,
+        ):
+            # Check if a base rule has been overriden by a custom rule. If so
+            # return that instead.
+            override_rule = self.overriden_base_rules.get(rule.rule_id)
+            if override_rule:
+                yield override_rule
+            else:
+                yield rule
+
+    def __len__(self) -> int:
+        # The length is mostly used by caches to get a sense of "size" / amount
+        # of memory this object is using, so we only count the number of custom
+        # rules.
+        return (
+            len(self.overriden_base_rules)
+            + len(self.override)
+            + len(self.content)
+            + len(self.room)
+            + len(self.sender)
+            + len(self.underride)
+        )
 
-    # Remove the modified base rules from the list, They'll be added back
-    # in the default positions in the list.
-    rawrules = [r for r in rawrules if r["priority_class"] >= 0]
 
-    # shove the server default rules for each kind onto the end of each
-    current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1]
+@attr.s(auto_attribs=True, slots=True, frozen=True, weakref_slot=False)
+class FilteredPushRules:
+    """A wrapper around `PushRules` that filters out disabled experimental push
+    rules, and includes the "enabled" state for each rule when iterated over.
+    """
 
-    ruleslist.extend(
-        make_base_prepend_rules(
-            PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
-        )
-    )
+    push_rules: PushRules
+    enabled_map: Dict[str, bool]
+    experimental_config: ExperimentalConfig
 
-    for r in rawrules:
-        if r["priority_class"] < current_prio_class:
-            while r["priority_class"] < current_prio_class:
-                ruleslist.extend(
-                    make_base_append_rules(
-                        PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
-                        modified_base_rules,
-                    )
-                )
-                current_prio_class -= 1
-                if current_prio_class > 0:
-                    ruleslist.extend(
-                        make_base_prepend_rules(
-                            PRIORITY_CLASS_INVERSE_MAP[current_prio_class],
-                            modified_base_rules,
-                        )
-                    )
-
-        ruleslist.append(r)
-
-    while current_prio_class > 0:
-        ruleslist.extend(
-            make_base_append_rules(
-                PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
-            )
-        )
-        current_prio_class -= 1
-        if current_prio_class > 0:
-            ruleslist.extend(
-                make_base_prepend_rules(
-                    PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
-                )
-            )
+    def __iter__(self) -> Iterator[Tuple[PushRule, bool]]:
+        for rule in self.push_rules:
+            if not _is_experimental_rule_enabled(
+                rule.rule_id, self.experimental_config
+            ):
+                continue
 
-    return ruleslist
+            enabled = self.enabled_map.get(rule.rule_id, rule.default_enabled)
 
+            yield rule, enabled
 
-def make_base_append_rules(
-    kind: str, modified_base_rules: Dict[str, Dict[str, Any]]
-) -> List[Dict[str, Any]]:
-    rules = []
+    def __len__(self) -> int:
+        return len(self.push_rules)
 
-    if kind == "override":
-        rules = BASE_APPEND_OVERRIDE_RULES
-    elif kind == "underride":
-        rules = BASE_APPEND_UNDERRIDE_RULES
-    elif kind == "content":
-        rules = BASE_APPEND_CONTENT_RULES
 
-    # Copy the rules before modifying them
-    rules = copy.deepcopy(rules)
-    for r in rules:
-        # Only modify the actions, keep the conditions the same.
-        assert isinstance(r["rule_id"], str)
-        modified = modified_base_rules.get(r["rule_id"])
-        if modified:
-            r["actions"] = modified["actions"]
+DEFAULT_EMPTY_PUSH_RULES = PushRules()
 
-    return rules
 
+def compile_push_rules(rawrules: List[PushRule]) -> PushRules:
+    """Given a set of custom push rules return a `PushRules` instance (which
+    includes the base rules).
+    """
+
+    if not rawrules:
+        # Fast path to avoid allocating empty lists when there are no custom
+        # rules for the user.
+        return DEFAULT_EMPTY_PUSH_RULES
+
+    rules = PushRules()
 
-def make_base_prepend_rules(
-    kind: str,
-    modified_base_rules: Dict[str, Dict[str, Any]],
-) -> List[Dict[str, Any]]:
-    rules = []
+    for rule in rawrules:
+        # We need to decide which bucket each custom push rule goes into.
 
-    if kind == "override":
-        rules = BASE_PREPEND_OVERRIDE_RULES
+        # If it has the same ID as a base rule then it overrides that...
+        overriden_base_rule = BASE_RULES_BY_ID.get(rule.rule_id)
+        if overriden_base_rule:
+            rules.overriden_base_rules[rule.rule_id] = attr.evolve(
+                overriden_base_rule, actions=rule.actions
+            )
+            continue
+
+        # ... otherwise it gets added to the appropriate priority class bucket
+        collection: List[PushRule]
+        if rule.priority_class == 5:
+            collection = rules.override
+        elif rule.priority_class == 4:
+            collection = rules.content
+        elif rule.priority_class == 3:
+            collection = rules.room
+        elif rule.priority_class == 2:
+            collection = rules.sender
+        elif rule.priority_class == 1:
+            collection = rules.underride
+        elif rule.priority_class <= 0:
+            logger.info(
+                "Got rule with priority class less than zero, but doesn't override a base rule: %s",
+                rule,
+            )
+            continue
+        else:
+            # We log and continue here so as not to break event sending
+            logger.error("Unknown priority class: %", rule.priority_class)
+            continue
 
-    # Copy the rules before modifying them
-    rules = copy.deepcopy(rules)
-    for r in rules:
-        # Only modify the actions, keep the conditions the same.
-        assert isinstance(r["rule_id"], str)
-        modified = modified_base_rules.get(r["rule_id"])
-        if modified:
-            r["actions"] = modified["actions"]
+        collection.append(rule)
 
     return rules
 
 
-# We have to annotate these types, otherwise mypy infers them as
-# `List[Dict[str, Sequence[Collection[str]]]]`.
-BASE_APPEND_CONTENT_RULES: List[Dict[str, Any]] = [
-    {
-        "rule_id": "global/content/.m.rule.contains_user_name",
-        "conditions": [
+def _is_experimental_rule_enabled(
+    rule_id: str, experimental_config: ExperimentalConfig
+) -> bool:
+    """Used by `FilteredPushRules` to filter out experimental rules when they
+    have not been enabled.
+    """
+    if (
+        rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl"
+        and not experimental_config.msc3786_enabled
+    ):
+        return False
+    if (
+        rule_id == "global/underride/.org.matrix.msc3772.thread_reply"
+        and not experimental_config.msc3772_enabled
+    ):
+        return False
+    return True
+
+
+BASE_APPEND_CONTENT_RULES = [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["content"],
+        rule_id="global/content/.m.rule.contains_user_name",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "content.body",
@@ -143,29 +250,33 @@ BASE_APPEND_CONTENT_RULES: List[Dict[str, Any]] = [
                 "pattern_type": "user_localpart",
             }
         ],
-        "actions": [
+        actions=[
             "notify",
             {"set_tweak": "sound", "value": "default"},
             {"set_tweak": "highlight"},
         ],
-    }
+    )
 ]
 
 
-BASE_PREPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [
-    {
-        "rule_id": "global/override/.m.rule.master",
-        "enabled": False,
-        "conditions": [],
-        "actions": ["dont_notify"],
-    }
+BASE_PREPEND_OVERRIDE_RULES = [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["override"],
+        rule_id="global/override/.m.rule.master",
+        default_enabled=False,
+        conditions=[],
+        actions=["dont_notify"],
+    )
 ]
 
 
-BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [
-    {
-        "rule_id": "global/override/.m.rule.suppress_notices",
-        "conditions": [
+BASE_APPEND_OVERRIDE_RULES = [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["override"],
+        rule_id="global/override/.m.rule.suppress_notices",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "content.msgtype",
@@ -173,13 +284,15 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_suppress_notices",
             }
         ],
-        "actions": ["dont_notify"],
-    },
+        actions=["dont_notify"],
+    ),
     # NB. .m.rule.invite_for_me must be higher prio than .m.rule.member_event
     # otherwise invites will be matched by .m.rule.member_event
-    {
-        "rule_id": "global/override/.m.rule.invite_for_me",
-        "conditions": [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["override"],
+        rule_id="global/override/.m.rule.invite_for_me",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "type",
@@ -195,21 +308,23 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [
             # Match the requester's MXID.
             {"kind": "event_match", "key": "state_key", "pattern_type": "user_id"},
         ],
-        "actions": [
+        actions=[
             "notify",
             {"set_tweak": "sound", "value": "default"},
             {"set_tweak": "highlight", "value": False},
         ],
-    },
+    ),
     # Will we sometimes want to know about people joining and leaving?
     # Perhaps: if so, this could be expanded upon. Seems the most usual case
     # is that we don't though. We add this override rule so that even if
     # the room rule is set to notify, we don't get notifications about
     # join/leave/avatar/displayname events.
     # See also: https://matrix.org/jira/browse/SYN-607
-    {
-        "rule_id": "global/override/.m.rule.member_event",
-        "conditions": [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["override"],
+        rule_id="global/override/.m.rule.member_event",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "type",
@@ -217,24 +332,28 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_member",
             }
         ],
-        "actions": ["dont_notify"],
-    },
+        actions=["dont_notify"],
+    ),
     # This was changed from underride to override so it's closer in priority
     # to the content rules where the user name highlight rule lives. This
     # way a room rule is lower priority than both but a custom override rule
     # is higher priority than both.
-    {
-        "rule_id": "global/override/.m.rule.contains_display_name",
-        "conditions": [{"kind": "contains_display_name"}],
-        "actions": [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["override"],
+        rule_id="global/override/.m.rule.contains_display_name",
+        conditions=[{"kind": "contains_display_name"}],
+        actions=[
             "notify",
             {"set_tweak": "sound", "value": "default"},
             {"set_tweak": "highlight"},
         ],
-    },
-    {
-        "rule_id": "global/override/.m.rule.roomnotif",
-        "conditions": [
+    ),
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["override"],
+        rule_id="global/override/.m.rule.roomnotif",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "content.body",
@@ -247,11 +366,13 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_roomnotif_pl",
             },
         ],
-        "actions": ["notify", {"set_tweak": "highlight", "value": True}],
-    },
-    {
-        "rule_id": "global/override/.m.rule.tombstone",
-        "conditions": [
+        actions=["notify", {"set_tweak": "highlight", "value": True}],
+    ),
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["override"],
+        rule_id="global/override/.m.rule.tombstone",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "type",
@@ -265,11 +386,13 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_tombstone_statekey",
             },
         ],
-        "actions": ["notify", {"set_tweak": "highlight", "value": True}],
-    },
-    {
-        "rule_id": "global/override/.m.rule.reaction",
-        "conditions": [
+        actions=["notify", {"set_tweak": "highlight", "value": True}],
+    ),
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["override"],
+        rule_id="global/override/.m.rule.reaction",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "type",
@@ -277,14 +400,16 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_reaction",
             }
         ],
-        "actions": ["dont_notify"],
-    },
+        actions=["dont_notify"],
+    ),
     # XXX: This is an experimental rule that is only enabled if msc3786_enabled
     # is enabled, if it is not the rule gets filtered out in _load_rules() in
     # PushRulesWorkerStore
-    {
-        "rule_id": "global/override/.org.matrix.msc3786.rule.room.server_acl",
-        "conditions": [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["override"],
+        rule_id="global/override/.org.matrix.msc3786.rule.room.server_acl",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "type",
@@ -298,15 +423,17 @@ BASE_APPEND_OVERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_room_server_acl_state_key",
             },
         ],
-        "actions": [],
-    },
+        actions=[],
+    ),
 ]
 
 
-BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [
-    {
-        "rule_id": "global/underride/.m.rule.call",
-        "conditions": [
+BASE_APPEND_UNDERRIDE_RULES = [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["underride"],
+        rule_id="global/underride/.m.rule.call",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "type",
@@ -314,17 +441,19 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_call",
             }
         ],
-        "actions": [
+        actions=[
             "notify",
             {"set_tweak": "sound", "value": "ring"},
             {"set_tweak": "highlight", "value": False},
         ],
-    },
+    ),
     # XXX: once m.direct is standardised everywhere, we should use it to detect
     # a DM from the user's perspective rather than this heuristic.
-    {
-        "rule_id": "global/underride/.m.rule.room_one_to_one",
-        "conditions": [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["underride"],
+        rule_id="global/underride/.m.rule.room_one_to_one",
+        conditions=[
             {"kind": "room_member_count", "is": "2", "_cache_key": "member_count"},
             {
                 "kind": "event_match",
@@ -333,17 +462,19 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_message",
             },
         ],
-        "actions": [
+        actions=[
             "notify",
             {"set_tweak": "sound", "value": "default"},
             {"set_tweak": "highlight", "value": False},
         ],
-    },
+    ),
     # XXX: this is going to fire for events which aren't m.room.messages
     # but are encrypted (e.g. m.call.*)...
-    {
-        "rule_id": "global/underride/.m.rule.encrypted_room_one_to_one",
-        "conditions": [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["underride"],
+        rule_id="global/underride/.m.rule.encrypted_room_one_to_one",
+        conditions=[
             {"kind": "room_member_count", "is": "2", "_cache_key": "member_count"},
             {
                 "kind": "event_match",
@@ -352,15 +483,17 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_encrypted",
             },
         ],
-        "actions": [
+        actions=[
             "notify",
             {"set_tweak": "sound", "value": "default"},
             {"set_tweak": "highlight", "value": False},
         ],
-    },
-    {
-        "rule_id": "global/underride/.org.matrix.msc3772.thread_reply",
-        "conditions": [
+    ),
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["underride"],
+        rule_id="global/underride/.org.matrix.msc3772.thread_reply",
+        conditions=[
             {
                 "kind": "org.matrix.msc3772.relation_match",
                 "rel_type": "m.thread",
@@ -368,11 +501,13 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [
                 "sender_type": "user_id",
             }
         ],
-        "actions": ["notify", {"set_tweak": "highlight", "value": False}],
-    },
-    {
-        "rule_id": "global/underride/.m.rule.message",
-        "conditions": [
+        actions=["notify", {"set_tweak": "highlight", "value": False}],
+    ),
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["underride"],
+        rule_id="global/underride/.m.rule.message",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "type",
@@ -380,13 +515,15 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_message",
             }
         ],
-        "actions": ["notify", {"set_tweak": "highlight", "value": False}],
-    },
+        actions=["notify", {"set_tweak": "highlight", "value": False}],
+    ),
     # XXX: this is going to fire for events which aren't m.room.messages
     # but are encrypted (e.g. m.call.*)...
-    {
-        "rule_id": "global/underride/.m.rule.encrypted",
-        "conditions": [
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["underride"],
+        rule_id="global/underride/.m.rule.encrypted",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "type",
@@ -394,11 +531,13 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_encrypted",
             }
         ],
-        "actions": ["notify", {"set_tweak": "highlight", "value": False}],
-    },
-    {
-        "rule_id": "global/underride/.im.vector.jitsi",
-        "conditions": [
+        actions=["notify", {"set_tweak": "highlight", "value": False}],
+    ),
+    PushRule(
+        default=True,
+        priority_class=PRIORITY_CLASS_MAP["underride"],
+        rule_id="global/underride/.im.vector.jitsi",
+        conditions=[
             {
                 "kind": "event_match",
                 "key": "type",
@@ -418,29 +557,27 @@ BASE_APPEND_UNDERRIDE_RULES: List[Dict[str, Any]] = [
                 "_cache_key": "_is_state_event",
             },
         ],
-        "actions": ["notify", {"set_tweak": "highlight", "value": False}],
-    },
+        actions=["notify", {"set_tweak": "highlight", "value": False}],
+    ),
 ]
 
 
 BASE_RULE_IDS = set()
 
+BASE_RULES_BY_ID: Dict[str, PushRule] = {}
+
 for r in BASE_APPEND_CONTENT_RULES:
-    r["priority_class"] = PRIORITY_CLASS_MAP["content"]
-    r["default"] = True
-    BASE_RULE_IDS.add(r["rule_id"])
+    BASE_RULE_IDS.add(r.rule_id)
+    BASE_RULES_BY_ID[r.rule_id] = r
 
 for r in BASE_PREPEND_OVERRIDE_RULES:
-    r["priority_class"] = PRIORITY_CLASS_MAP["override"]
-    r["default"] = True
-    BASE_RULE_IDS.add(r["rule_id"])
+    BASE_RULE_IDS.add(r.rule_id)
+    BASE_RULES_BY_ID[r.rule_id] = r
 
 for r in BASE_APPEND_OVERRIDE_RULES:
-    r["priority_class"] = PRIORITY_CLASS_MAP["override"]
-    r["default"] = True
-    BASE_RULE_IDS.add(r["rule_id"])
+    BASE_RULE_IDS.add(r.rule_id)
+    BASE_RULES_BY_ID[r.rule_id] = r
 
 for r in BASE_APPEND_UNDERRIDE_RULES:
-    r["priority_class"] = PRIORITY_CLASS_MAP["underride"]
-    r["default"] = True
-    BASE_RULE_IDS.add(r["rule_id"])
+    BASE_RULE_IDS.add(r.rule_id)
+    BASE_RULES_BY_ID[r.rule_id] = r
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 713dcf6950..ccd512be54 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -15,7 +15,18 @@
 
 import itertools
 import logging
-from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple, Union
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Dict,
+    Iterable,
+    List,
+    Mapping,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
 
 from prometheus_client import Counter
 
@@ -30,6 +41,7 @@ from synapse.util.caches import register_cache
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_event_for_clients_with_state
 
+from .baserules import FilteredPushRules, PushRule
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 if TYPE_CHECKING:
@@ -112,7 +124,7 @@ class BulkPushRuleEvaluator:
     async def _get_rules_for_event(
         self,
         event: EventBase,
-    ) -> Dict[str, List[Dict[str, Any]]]:
+    ) -> Dict[str, FilteredPushRules]:
         """Get the push rules for all users who may need to be notified about
         the event.
 
@@ -186,7 +198,7 @@ class BulkPushRuleEvaluator:
         return pl_event.content if pl_event else {}, sender_level
 
     async def _get_mutual_relations(
-        self, event: EventBase, rules: Iterable[Dict[str, Any]]
+        self, event: EventBase, rules: Iterable[Tuple[PushRule, bool]]
     ) -> Dict[str, Set[Tuple[str, str]]]:
         """
         Fetch event metadata for events which related to the same event as the given event.
@@ -216,12 +228,11 @@ class BulkPushRuleEvaluator:
 
         # Pre-filter to figure out which relation types are interesting.
         rel_types = set()
-        for rule in rules:
-            # Skip disabled rules.
-            if "enabled" in rule and not rule["enabled"]:
+        for rule, enabled in rules:
+            if not enabled:
                 continue
 
-            for condition in rule["conditions"]:
+            for condition in rule.conditions:
                 if condition["kind"] != "org.matrix.msc3772.relation_match":
                     continue
 
@@ -254,7 +265,7 @@ class BulkPushRuleEvaluator:
         count_as_unread = _should_count_as_unread(event, context)
 
         rules_by_user = await self._get_rules_for_event(event)
-        actions_by_user: Dict[str, List[Union[dict, str]]] = {}
+        actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}
 
         room_member_count = await self.store.get_number_joined_users_in_room(
             event.room_id
@@ -317,15 +328,13 @@ class BulkPushRuleEvaluator:
                 # current user, it'll be added to the dict later.
                 actions_by_user[uid] = []
 
-            for rule in rules:
-                if "enabled" in rule and not rule["enabled"]:
+            for rule, enabled in rules:
+                if not enabled:
                     continue
 
-                matches = evaluator.check_conditions(
-                    rule["conditions"], uid, display_name
-                )
+                matches = evaluator.check_conditions(rule.conditions, uid, display_name)
                 if matches:
-                    actions = [x for x in rule["actions"] if x != "dont_notify"]
+                    actions = [x for x in rule.actions if x != "dont_notify"]
                     if actions and "notify" in actions:
                         # Push rules say we should notify the user of this event
                         actions_by_user[uid] = actions
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index 5117ef6854..73618d9234 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -18,16 +18,15 @@ from typing import Any, Dict, List, Optional
 from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
 from synapse.types import UserID
 
+from .baserules import FilteredPushRules, PushRule
+
 
 def format_push_rules_for_user(
-    user: UserID, ruleslist: List
+    user: UserID, ruleslist: FilteredPushRules
 ) -> Dict[str, Dict[str, list]]:
     """Converts a list of rawrules and a enabled map into nested dictionaries
     to match the Matrix client-server format for push rules"""
 
-    # We're going to be mutating this a lot, so do a deep copy
-    ruleslist = copy.deepcopy(ruleslist)
-
     rules: Dict[str, Dict[str, List[Dict[str, Any]]]] = {
         "global": {},
         "device": {},
@@ -35,11 +34,30 @@ def format_push_rules_for_user(
 
     rules["global"] = _add_empty_priority_class_arrays(rules["global"])
 
-    for r in ruleslist:
-        template_name = _priority_class_to_template_name(r["priority_class"])
+    for r, enabled in ruleslist:
+        template_name = _priority_class_to_template_name(r.priority_class)
+
+        rulearray = rules["global"][template_name]
+
+        template_rule = _rule_to_template(r)
+        if not template_rule:
+            continue
+
+        rulearray.append(template_rule)
+
+        template_rule["enabled"] = enabled
+
+        if "conditions" not in template_rule:
+            # Not all formatted rules have explicit conditions, e.g. "room"
+            # rules omit them as they can be derived from the kind and rule ID.
+            #
+            # If the formatted rule has no conditions then we can skip the
+            # formatting of conditions.
+            continue
 
         # Remove internal stuff.
-        for c in r["conditions"]:
+        template_rule["conditions"] = copy.deepcopy(template_rule["conditions"])
+        for c in template_rule["conditions"]:
             c.pop("_cache_key", None)
 
             pattern_type = c.pop("pattern_type", None)
@@ -52,16 +70,6 @@ def format_push_rules_for_user(
             if sender_type == "user_id":
                 c["sender"] = user.to_string()
 
-        rulearray = rules["global"][template_name]
-
-        template_rule = _rule_to_template(r)
-        if template_rule:
-            if "enabled" in r:
-                template_rule["enabled"] = r["enabled"]
-            else:
-                template_rule["enabled"] = True
-            rulearray.append(template_rule)
-
     return rules
 
 
@@ -71,24 +79,24 @@ def _add_empty_priority_class_arrays(d: Dict[str, list]) -> Dict[str, list]:
     return d
 
 
-def _rule_to_template(rule: Dict[str, Any]) -> Optional[Dict[str, Any]]:
-    unscoped_rule_id = None
-    if "rule_id" in rule:
-        unscoped_rule_id = _rule_id_from_namespaced(rule["rule_id"])
+def _rule_to_template(rule: PushRule) -> Optional[Dict[str, Any]]:
+    templaterule: Dict[str, Any]
+
+    unscoped_rule_id = _rule_id_from_namespaced(rule.rule_id)
 
-    template_name = _priority_class_to_template_name(rule["priority_class"])
+    template_name = _priority_class_to_template_name(rule.priority_class)
     if template_name in ["override", "underride"]:
-        templaterule = {k: rule[k] for k in ["conditions", "actions"]}
+        templaterule = {"conditions": rule.conditions, "actions": rule.actions}
     elif template_name in ["sender", "room"]:
-        templaterule = {"actions": rule["actions"]}
-        unscoped_rule_id = rule["conditions"][0]["pattern"]
+        templaterule = {"actions": rule.actions}
+        unscoped_rule_id = rule.conditions[0]["pattern"]
     elif template_name == "content":
-        if len(rule["conditions"]) != 1:
+        if len(rule.conditions) != 1:
             return None
-        thecond = rule["conditions"][0]
+        thecond = rule.conditions[0]
         if "pattern" not in thecond:
             return None
-        templaterule = {"actions": rule["actions"]}
+        templaterule = {"actions": rule.actions}
         templaterule["pattern"] = thecond["pattern"]
     else:
         # This should not be reached unless this function is not kept in sync
@@ -97,8 +105,8 @@ def _rule_to_template(rule: Dict[str, Any]) -> Optional[Dict[str, Any]]:
 
     if unscoped_rule_id:
         templaterule["rule_id"] = unscoped_rule_id
-    if "default" in rule:
-        templaterule["default"] = rule["default"]
+    if rule.default:
+        templaterule["default"] = True
     return templaterule
 
 
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 2e8a017add..3c5632cd91 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -15,7 +15,18 @@
 
 import logging
 import re
-from typing import Any, Dict, List, Mapping, Optional, Pattern, Set, Tuple, Union
+from typing import (
+    Any,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Pattern,
+    Sequence,
+    Set,
+    Tuple,
+    Union,
+)
 
 from matrix_common.regex import glob_to_regex, to_word_pattern
 
@@ -32,14 +43,14 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
 
 
 def _room_member_count(
-    ev: EventBase, condition: Dict[str, Any], room_member_count: int
+    ev: EventBase, condition: Mapping[str, Any], room_member_count: int
 ) -> bool:
     return _test_ineq_condition(condition, room_member_count)
 
 
 def _sender_notification_permission(
     ev: EventBase,
-    condition: Dict[str, Any],
+    condition: Mapping[str, Any],
     sender_power_level: int,
     power_levels: Dict[str, Union[int, Dict[str, int]]],
 ) -> bool:
@@ -54,7 +65,7 @@ def _sender_notification_permission(
     return sender_power_level >= room_notif_level
 
 
-def _test_ineq_condition(condition: Dict[str, Any], number: int) -> bool:
+def _test_ineq_condition(condition: Mapping[str, Any], number: int) -> bool:
     if "is" not in condition:
         return False
     m = INEQUALITY_EXPR.match(condition["is"])
@@ -137,7 +148,7 @@ class PushRuleEvaluatorForEvent:
         self._condition_cache: Dict[str, bool] = {}
 
     def check_conditions(
-        self, conditions: List[dict], uid: str, display_name: Optional[str]
+        self, conditions: Sequence[Mapping], uid: str, display_name: Optional[str]
     ) -> bool:
         """
         Returns true if a user's conditions/user ID/display name match the event.
@@ -169,7 +180,7 @@ class PushRuleEvaluatorForEvent:
         return True
 
     def matches(
-        self, condition: Dict[str, Any], user_id: str, display_name: Optional[str]
+        self, condition: Mapping[str, Any], user_id: str, display_name: Optional[str]
     ) -> bool:
         """
         Returns true if a user's condition/user ID/display name match the event.
@@ -204,7 +215,7 @@ class PushRuleEvaluatorForEvent:
             #     endpoint with an unknown kind, see _rule_tuple_from_request_object.
             return True
 
-    def _event_match(self, condition: dict, user_id: str) -> bool:
+    def _event_match(self, condition: Mapping, user_id: str) -> bool:
         """
         Check an "event_match" push rule condition.
 
@@ -269,7 +280,7 @@ class PushRuleEvaluatorForEvent:
 
         return bool(r.search(body))
 
-    def _relation_match(self, condition: dict, user_id: str) -> bool:
+    def _relation_match(self, condition: Mapping, user_id: str) -> bool:
         """
         Check an "relation_match" push rule condition.
 
diff --git a/synapse/res/templates/account_previously_renewed.html b/synapse/res/templates/account_previously_renewed.html
index b751359bdf..bd4f7cea97 100644
--- a/synapse/res/templates/account_previously_renewed.html
+++ b/synapse/res/templates/account_previously_renewed.html
@@ -1 +1,12 @@
-<html><body>Your account is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</body><html>
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Your account is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</title>
+</head>
+<body>
+    Your account is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.
+</body>
+</html>
\ No newline at end of file
diff --git a/synapse/res/templates/account_renewed.html b/synapse/res/templates/account_renewed.html
index e8c0f52f05..57b319f375 100644
--- a/synapse/res/templates/account_renewed.html
+++ b/synapse/res/templates/account_renewed.html
@@ -1 +1,12 @@
-<html><body>Your account has been successfully renewed and is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</body><html>
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Your account has been successfully renewed and is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</title>
+</head>
+<body>
+    Your account has been successfully renewed and is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.
+</body>
+</html>
\ No newline at end of file
diff --git a/synapse/res/templates/add_threepid.html b/synapse/res/templates/add_threepid.html
index cc4ab07e09..71f2215b7a 100644
--- a/synapse/res/templates/add_threepid.html
+++ b/synapse/res/templates/add_threepid.html
@@ -1,9 +1,14 @@
-<html>
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Request to add an email address to your Matrix account</title>
+</head>
 <body>
     <p>A request to add an email address to your Matrix account has been received. If this was you, please click the link below to confirm adding this email:</p>
-
     <a href="{{ link }}">{{ link }}</a>
-
     <p>If this was not you, you can safely ignore this email. Thank you.</p>
 </body>
 </html>
diff --git a/synapse/res/templates/add_threepid_failure.html b/synapse/res/templates/add_threepid_failure.html
index 441d11c846..bd627ee9ce 100644
--- a/synapse/res/templates/add_threepid_failure.html
+++ b/synapse/res/templates/add_threepid_failure.html
@@ -1,8 +1,13 @@
-<html>
-<head></head>
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Request failed</title>
+</head>
 <body>
-<p>The request failed for the following reason: {{ failure_reason }}.</p>
-
-<p>No changes have been made to your account.</p>
+    <p>The request failed for the following reason: {{ failure_reason }}.</p>
+    <p>No changes have been made to your account.</p>
 </body>
 </html>
diff --git a/synapse/res/templates/add_threepid_success.html b/synapse/res/templates/add_threepid_success.html
index fbd6e4018f..49170c138e 100644
--- a/synapse/res/templates/add_threepid_success.html
+++ b/synapse/res/templates/add_threepid_success.html
@@ -1,6 +1,12 @@
-<html>
-<head></head>
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Your email has now been validated</title>
+</head>
 <body>
-<p>Your email has now been validated, please return to your client. You may now close this window.</p>
+    <p>Your email has now been validated, please return to your client. You may now close this window.</p>
 </body>
-</html>
+</html>
\ No newline at end of file
diff --git a/synapse/res/templates/auth_success.html b/synapse/res/templates/auth_success.html
index baf4633142..2d6ac44a0e 100644
--- a/synapse/res/templates/auth_success.html
+++ b/synapse/res/templates/auth_success.html
@@ -1,8 +1,8 @@
 <html>
 <head>
 <title>Success!</title>
-<meta name='viewport' content='width=device-width, initial-scale=1,
-    user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
 <link rel="stylesheet" href="/_matrix/static/client/register/style.css">
 <script>
 if (window.onAuthDone) {
diff --git a/synapse/res/templates/invalid_token.html b/synapse/res/templates/invalid_token.html
index 6bd2b98364..2c7c384fe3 100644
--- a/synapse/res/templates/invalid_token.html
+++ b/synapse/res/templates/invalid_token.html
@@ -1 +1,12 @@
-<html><body>Invalid renewal token.</body><html>
+<!DOCTYPE html>
+<html lang="en">
+<head>
+    <meta charset="UTF-8">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    <title>Invalid renewal token.</title>
+</head>
+<body>
+    Invalid renewal token.
+</body>
+</html>
diff --git a/synapse/res/templates/notice_expiry.html b/synapse/res/templates/notice_expiry.html
index d87311f659..865f9f7ada 100644
--- a/synapse/res/templates/notice_expiry.html
+++ b/synapse/res/templates/notice_expiry.html
@@ -1,6 +1,8 @@
 <!doctype html>
 <html lang="en">
     <head>
+        <meta http-equiv="X-UA-Compatible" content="IE=edge">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
         <style type="text/css">
             {% include 'mail.css' without context %}
             {% include "mail-%s.css" % app_name ignore missing without context %}
diff --git a/synapse/res/templates/notif_mail.html b/synapse/res/templates/notif_mail.html
index 27d4182790..9dba0c0253 100644
--- a/synapse/res/templates/notif_mail.html
+++ b/synapse/res/templates/notif_mail.html
@@ -1,6 +1,8 @@
 <!doctype html>
 <html lang="en">
     <head>
+        <meta http-equiv="X-UA-Compatible" content="IE=edge">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
         <style type="text/css">
             {%- include 'mail.css' without context %}
             {%- include "mail-%s.css" % app_name ignore missing without context %}
diff --git a/synapse/res/templates/password_reset.html b/synapse/res/templates/password_reset.html
index a197bf872c..a8bdce357b 100644
--- a/synapse/res/templates/password_reset.html
+++ b/synapse/res/templates/password_reset.html
@@ -1,4 +1,9 @@
-<html>
+<html lang="en">
+    <head>
+        <title>Password reset</title>
+        <meta http-equiv="X-UA-Compatible" content="IE=edge">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
+    </head>
 <body>
     <p>A password reset request has been received for your Matrix account. If this was you, please click the link below to confirm resetting your password:</p>
 
diff --git a/synapse/res/templates/password_reset_confirmation.html b/synapse/res/templates/password_reset_confirmation.html
index def4b5162b..2e3fd2ec1e 100644
--- a/synapse/res/templates/password_reset_confirmation.html
+++ b/synapse/res/templates/password_reset_confirmation.html
@@ -1,5 +1,9 @@
-<html>
-<head></head>
+<html lang="en">
+<head>
+    <title>Password reset confirmation</title>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+</head>
 <body>
 <!--Use a hidden form to resubmit the information necessary to reset the password-->
 <form method="post">
diff --git a/synapse/res/templates/password_reset_failure.html b/synapse/res/templates/password_reset_failure.html
index 9e3c4446e3..2d59c463f0 100644
--- a/synapse/res/templates/password_reset_failure.html
+++ b/synapse/res/templates/password_reset_failure.html
@@ -1,5 +1,9 @@
-<html>
-<head></head>
+<html lang="en">
+<head>
+    <title>Password reset failure</title>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+</head>
 <body>
 <p>The request failed for the following reason: {{ failure_reason }}.</p>
 
diff --git a/synapse/res/templates/password_reset_success.html b/synapse/res/templates/password_reset_success.html
index 7324d66d1e..5165bd1fa2 100644
--- a/synapse/res/templates/password_reset_success.html
+++ b/synapse/res/templates/password_reset_success.html
@@ -1,5 +1,8 @@
-<html>
-<head></head>
+<html lang="en">
+<head>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+</head>
 <body>
 <p>Your email has now been validated, please return to your client to reset your password. You may now close this window.</p>
 </body>
diff --git a/synapse/res/templates/recaptcha.html b/synapse/res/templates/recaptcha.html
index b3db06ef97..615d3239c6 100644
--- a/synapse/res/templates/recaptcha.html
+++ b/synapse/res/templates/recaptcha.html
@@ -1,8 +1,8 @@
 <html>
 <head>
 <title>Authentication</title>
-<meta name='viewport' content='width=device-width, initial-scale=1,
-    user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
 <script src="https://www.recaptcha.net/recaptcha/api.js"
     async defer></script>
 <script src="//code.jquery.com/jquery-1.11.2.min.js"></script>
diff --git a/synapse/res/templates/registration.html b/synapse/res/templates/registration.html
index 16730a527f..20e831ff4a 100644
--- a/synapse/res/templates/registration.html
+++ b/synapse/res/templates/registration.html
@@ -1,4 +1,9 @@
-<html>
+<html lang="en">
+<head>
+    <title>Registration</title>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+</head>
 <body>
     <p>You have asked us to register this email with a new Matrix account. If this was you, please click the link below to confirm your email address:</p>
 
diff --git a/synapse/res/templates/registration_failure.html b/synapse/res/templates/registration_failure.html
index 2833d79c37..a6ed22bc90 100644
--- a/synapse/res/templates/registration_failure.html
+++ b/synapse/res/templates/registration_failure.html
@@ -1,5 +1,8 @@
-<html>
-<head></head>
+<html lang="en">
+<head>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+</head>
 <body>
 <p>Validation failed for the following reason: {{ failure_reason }}.</p>
 </body>
diff --git a/synapse/res/templates/registration_success.html b/synapse/res/templates/registration_success.html
index fbd6e4018f..d51d5549d8 100644
--- a/synapse/res/templates/registration_success.html
+++ b/synapse/res/templates/registration_success.html
@@ -1,5 +1,9 @@
-<html>
-<head></head>
+<html lang="en">
+<head>
+    <title>Your email has now been validated</title>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
+</head>
 <body>
 <p>Your email has now been validated, please return to your client. You may now close this window.</p>
 </body>
diff --git a/synapse/res/templates/registration_token.html b/synapse/res/templates/registration_token.html
index 4577ce1702..59a98f564c 100644
--- a/synapse/res/templates/registration_token.html
+++ b/synapse/res/templates/registration_token.html
@@ -1,8 +1,8 @@
-<html>
+<html lang="en">
 <head>
 <title>Authentication</title>
-<meta name='viewport' content='width=device-width, initial-scale=1,
-    user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
 <link rel="stylesheet" href="/_matrix/static/client/register/style.css">
 </head>
 <body>
diff --git a/synapse/res/templates/sso_account_deactivated.html b/synapse/res/templates/sso_account_deactivated.html
index c3e4deed93..075f801cec 100644
--- a/synapse/res/templates/sso_account_deactivated.html
+++ b/synapse/res/templates/sso_account_deactivated.html
@@ -3,8 +3,8 @@
     <head>
         <meta charset="UTF-8">
         <title>SSO account deactivated</title>
-        <meta name="viewport" content="width=device-width, user-scalable=no">
-        <style type="text/css">
+        <meta http-equiv="X-UA-Compatible" content="IE=edge">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">        <style type="text/css">
             {% include "sso.css" without context %}
         </style>
     </head>
diff --git a/synapse/res/templates/sso_auth_account_details.html b/synapse/res/templates/sso_auth_account_details.html
index cf72df0a2a..2d1db386e1 100644
--- a/synapse/res/templates/sso_auth_account_details.html
+++ b/synapse/res/templates/sso_auth_account_details.html
@@ -3,7 +3,8 @@
   <head>
     <title>Create your account</title>
     <meta charset="utf-8">
-    <meta name="viewport" content="width=device-width, user-scalable=no">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
     <script type="text/javascript">
       let wasKeyboard = false;
       document.addEventListener("mousedown", function() { wasKeyboard = false; });
diff --git a/synapse/res/templates/sso_auth_bad_user.html b/synapse/res/templates/sso_auth_bad_user.html
index da579ffe69..94403fc3ce 100644
--- a/synapse/res/templates/sso_auth_bad_user.html
+++ b/synapse/res/templates/sso_auth_bad_user.html
@@ -3,7 +3,8 @@
     <head>
         <meta charset="UTF-8">
         <title>Authentication failed</title>
-        <meta name="viewport" content="width=device-width, user-scalable=no">
+        <meta http-equiv="X-UA-Compatible" content="IE=edge">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
         <style type="text/css">
             {% include "sso.css" without context %}
         </style>
diff --git a/synapse/res/templates/sso_auth_confirm.html b/synapse/res/templates/sso_auth_confirm.html
index f9d0456f0a..aa1c974a6b 100644
--- a/synapse/res/templates/sso_auth_confirm.html
+++ b/synapse/res/templates/sso_auth_confirm.html
@@ -3,7 +3,8 @@
     <head>
         <meta charset="UTF-8">
         <title>Confirm it's you</title>
-        <meta name="viewport" content="width=device-width, user-scalable=no">
+        <meta http-equiv="X-UA-Compatible" content="IE=edge">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
         <style type="text/css">
             {% include "sso.css" without context %}
         </style>
diff --git a/synapse/res/templates/sso_auth_success.html b/synapse/res/templates/sso_auth_success.html
index 1ed3967e87..4898af6011 100644
--- a/synapse/res/templates/sso_auth_success.html
+++ b/synapse/res/templates/sso_auth_success.html
@@ -3,7 +3,8 @@
     <head>
         <meta charset="UTF-8">
         <title>Authentication successful</title>
-        <meta name="viewport" content="width=device-width, user-scalable=no">
+        <meta http-equiv="X-UA-Compatible" content="IE=edge">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
         <style type="text/css">
             {% include "sso.css" without context %}
         </style>
diff --git a/synapse/res/templates/sso_error.html b/synapse/res/templates/sso_error.html
index 472309c350..19992ff2ad 100644
--- a/synapse/res/templates/sso_error.html
+++ b/synapse/res/templates/sso_error.html
@@ -3,7 +3,8 @@
     <head>
         <meta charset="UTF-8">
         <title>Authentication failed</title>
-        <meta name="viewport" content="width=device-width, user-scalable=no">
+        <meta http-equiv="X-UA-Compatible" content="IE=edge">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
         <style type="text/css">
             {% include "sso.css" without context %}
 
diff --git a/synapse/res/templates/sso_login_idp_picker.html b/synapse/res/templates/sso_login_idp_picker.html
index 53b82db84e..56fabfa3d2 100644
--- a/synapse/res/templates/sso_login_idp_picker.html
+++ b/synapse/res/templates/sso_login_idp_picker.html
@@ -1,6 +1,8 @@
 <!DOCTYPE html>
 <html lang="en">
     <head>
+        <meta http-equiv="X-UA-Compatible" content="IE=edge">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
         <meta charset="UTF-8">
         <title>Choose identity provider</title>
         <style type="text/css">
diff --git a/synapse/res/templates/sso_new_user_consent.html b/synapse/res/templates/sso_new_user_consent.html
index 68c8b9f33a..523f64c4fc 100644
--- a/synapse/res/templates/sso_new_user_consent.html
+++ b/synapse/res/templates/sso_new_user_consent.html
@@ -3,7 +3,8 @@
 <head>
     <meta charset="UTF-8">
     <title>Agree to terms and conditions</title>
-    <meta name="viewport" content="width=device-width, user-scalable=no">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
     <style type="text/css">
       {% include "sso.css" without context %}
 
diff --git a/synapse/res/templates/sso_redirect_confirm.html b/synapse/res/templates/sso_redirect_confirm.html
index 1b01471ac8..1049a9bd92 100644
--- a/synapse/res/templates/sso_redirect_confirm.html
+++ b/synapse/res/templates/sso_redirect_confirm.html
@@ -3,7 +3,8 @@
 <head>
     <meta charset="UTF-8">
     <title>Continue to your account</title>
-    <meta name="viewport" content="width=device-width, user-scalable=no">
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
     <style type="text/css">
       {% include "sso.css" without context %}
 
diff --git a/synapse/res/templates/terms.html b/synapse/res/templates/terms.html
index 369ff446d2..2081d990ab 100644
--- a/synapse/res/templates/terms.html
+++ b/synapse/res/templates/terms.html
@@ -1,8 +1,8 @@
 <html>
 <head>
 <title>Authentication</title>
-<meta name='viewport' content='width=device-width, initial-scale=1,
-    user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+<meta http-equiv="X-UA-Compatible" content="IE=edge">
+<meta name="viewport" content="width=device-width, initial-scale=1.0">
 <link rel="stylesheet" href="/_matrix/static/client/register/style.css">
 </head>
 <body>
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 9d953d58de..68054ffc28 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -303,6 +303,7 @@ class RoomRestServlet(RestServlet):
 
         members = await self.store.get_users_in_room(room_id)
         ret["joined_local_devices"] = await self.store.count_devices_by_users(members)
+        ret["forgotten"] = await self.store.is_locally_forgotten_room(room_id)
 
         return HTTPStatus.OK, ret
 
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 50edc6b7d3..e5ee63133b 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -15,10 +15,11 @@
 # limitations under the License.
 import logging
 import random
-from http import HTTPStatus
 from typing import TYPE_CHECKING, Optional, Tuple
 from urllib.parse import urlparse
 
+from pydantic import StrictBool, StrictStr, constr
+
 from twisted.web.server import Request
 
 from synapse.api.constants import LoginType
@@ -34,12 +35,15 @@ from synapse.http.server import HttpServer, finish_request, respond_with_html
 from synapse.http.servlet import (
     RestServlet,
     assert_params_in_dict,
+    parse_and_validate_json_object_from_request,
     parse_json_object_from_request,
     parse_string,
 )
 from synapse.http.site import SynapseRequest
 from synapse.metrics import threepid_send_requests
 from synapse.push.mailer import Mailer
+from synapse.rest.client.models import AuthenticationData, EmailRequestTokenBody
+from synapse.rest.models import RequestBodyModel
 from synapse.types import JsonDict
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.stringutils import assert_valid_client_secret, random_string
@@ -82,32 +86,16 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
                 400, "Email-based password resets have been disabled on this server"
             )
 
-        body = parse_json_object_from_request(request)
-
-        assert_params_in_dict(body, ["client_secret", "email", "send_attempt"])
-
-        # Extract params from body
-        client_secret = body["client_secret"]
-        assert_valid_client_secret(client_secret)
-
-        # Canonicalise the email address. The addresses are all stored canonicalised
-        # in the database. This allows the user to reset his password without having to
-        # know the exact spelling (eg. upper and lower case) of address in the database.
-        # Stored in the database "foo@bar.com"
-        # User requests with "FOO@bar.com" would raise a Not Found error
-        try:
-            email = validate_email(body["email"])
-        except ValueError as e:
-            raise SynapseError(400, str(e))
-        send_attempt = body["send_attempt"]
-        next_link = body.get("next_link")  # Optional param
+        body = parse_and_validate_json_object_from_request(
+            request, EmailRequestTokenBody
+        )
 
-        if next_link:
+        if body.next_link:
             # Raise if the provided next_link value isn't valid
-            assert_valid_next_link(self.hs, next_link)
+            assert_valid_next_link(self.hs, body.next_link)
 
         await self.identity_handler.ratelimit_request_token_requests(
-            request, "email", email
+            request, "email", body.email
         )
 
         # The email will be sent to the stored address.
@@ -115,7 +103,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
         # an email address which is controlled by the attacker but which, after
         # canonicalisation, matches the one in our database.
         existing_user_id = await self.hs.get_datastores().main.get_user_id_by_threepid(
-            "email", email
+            "email", body.email
         )
 
         if existing_user_id is None:
@@ -135,26 +123,26 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
             # Have the configured identity server handle the request
             ret = await self.identity_handler.request_email_token(
                 self.hs.config.registration.account_threepid_delegate_email,
-                email,
-                client_secret,
-                send_attempt,
-                next_link,
+                body.email,
+                body.client_secret,
+                body.send_attempt,
+                body.next_link,
             )
         else:
             # Send password reset emails from Synapse
             sid = await self.identity_handler.send_threepid_validation(
-                email,
-                client_secret,
-                send_attempt,
+                body.email,
+                body.client_secret,
+                body.send_attempt,
                 self.mailer.send_password_reset_mail,
-                next_link,
+                body.next_link,
             )
 
             # Wrap the session id in a JSON object
             ret = {"sid": sid}
 
         threepid_send_requests.labels(type="email", reason="password_reset").observe(
-            send_attempt
+            body.send_attempt
         )
 
         return 200, ret
@@ -172,16 +160,23 @@ class PasswordRestServlet(RestServlet):
         self.password_policy_handler = hs.get_password_policy_handler()
         self._set_password_handler = hs.get_set_password_handler()
 
+    class PostBody(RequestBodyModel):
+        auth: Optional[AuthenticationData] = None
+        logout_devices: StrictBool = True
+        if TYPE_CHECKING:
+            # workaround for https://github.com/samuelcolvin/pydantic/issues/156
+            new_password: Optional[StrictStr] = None
+        else:
+            new_password: Optional[constr(max_length=512, strict=True)] = None
+
     @interactive_auth_handler
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
-        body = parse_json_object_from_request(request)
+        body = parse_and_validate_json_object_from_request(request, self.PostBody)
 
         # we do basic sanity checks here because the auth layer will store these
         # in sessions. Pull out the new password provided to us.
-        new_password = body.pop("new_password", None)
+        new_password = body.new_password
         if new_password is not None:
-            if not isinstance(new_password, str) or len(new_password) > 512:
-                raise SynapseError(400, "Invalid password")
             self.password_policy_handler.validate_password(new_password)
 
         # there are two possibilities here. Either the user does not have an
@@ -201,7 +196,7 @@ class PasswordRestServlet(RestServlet):
                 params, session_id = await self.auth_handler.validate_user_via_ui_auth(
                     requester,
                     request,
-                    body,
+                    body.dict(),
                     "modify your account password",
                 )
             except InteractiveAuthIncompleteError as e:
@@ -224,7 +219,7 @@ class PasswordRestServlet(RestServlet):
                 result, params, session_id = await self.auth_handler.check_ui_auth(
                     [[LoginType.EMAIL_IDENTITY]],
                     request,
-                    body,
+                    body.dict(),
                     "modify your account password",
                 )
             except InteractiveAuthIncompleteError as e:
@@ -299,37 +294,33 @@ class DeactivateAccountRestServlet(RestServlet):
         self.auth_handler = hs.get_auth_handler()
         self._deactivate_account_handler = hs.get_deactivate_account_handler()
 
+    class PostBody(RequestBodyModel):
+        auth: Optional[AuthenticationData] = None
+        id_server: Optional[StrictStr] = None
+        # Not specced, see https://github.com/matrix-org/matrix-spec/issues/297
+        erase: StrictBool = False
+
     @interactive_auth_handler
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
-        body = parse_json_object_from_request(request)
-        erase = body.get("erase", False)
-        if not isinstance(erase, bool):
-            raise SynapseError(
-                HTTPStatus.BAD_REQUEST,
-                "Param 'erase' must be a boolean, if given",
-                Codes.BAD_JSON,
-            )
+        body = parse_and_validate_json_object_from_request(request, self.PostBody)
 
         requester = await self.auth.get_user_by_req(request)
 
         # allow ASes to deactivate their own users
         if requester.app_service:
             await self._deactivate_account_handler.deactivate_account(
-                requester.user.to_string(), erase, requester
+                requester.user.to_string(), body.erase, requester
             )
             return 200, {}
 
         await self.auth_handler.validate_user_via_ui_auth(
             requester,
             request,
-            body,
+            body.dict(),
             "deactivate your account",
         )
         result = await self._deactivate_account_handler.deactivate_account(
-            requester.user.to_string(),
-            erase,
-            requester,
-            id_server=body.get("id_server"),
+            requester.user.to_string(), body.erase, requester, id_server=body.id_server
         )
         if result:
             id_server_unbind_result = "success"
@@ -364,28 +355,15 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
                     "Adding emails have been disabled due to lack of an email config"
                 )
             raise SynapseError(
-                400, "Adding an email to your account is disabled on this server"
+                400,
+                "Adding an email to your account is disabled on this server",
             )
 
-        body = parse_json_object_from_request(request)
-        assert_params_in_dict(body, ["client_secret", "email", "send_attempt"])
-        client_secret = body["client_secret"]
-        assert_valid_client_secret(client_secret)
-
-        # Canonicalise the email address. The addresses are all stored canonicalised
-        # in the database.
-        # This ensures that the validation email is sent to the canonicalised address
-        # as it will later be entered into the database.
-        # Otherwise the email will be sent to "FOO@bar.com" and stored as
-        # "foo@bar.com" in database.
-        try:
-            email = validate_email(body["email"])
-        except ValueError as e:
-            raise SynapseError(400, str(e))
-        send_attempt = body["send_attempt"]
-        next_link = body.get("next_link")  # Optional param
+        body = parse_and_validate_json_object_from_request(
+            request, EmailRequestTokenBody
+        )
 
-        if not await check_3pid_allowed(self.hs, "email", email):
+        if not await check_3pid_allowed(self.hs, "email", body.email):
             raise SynapseError(
                 403,
                 "Your email domain is not authorized on this server",
@@ -393,14 +371,14 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
             )
 
         await self.identity_handler.ratelimit_request_token_requests(
-            request, "email", email
+            request, "email", body.email
         )
 
-        if next_link:
+        if body.next_link:
             # Raise if the provided next_link value isn't valid
-            assert_valid_next_link(self.hs, next_link)
+            assert_valid_next_link(self.hs, body.next_link)
 
-        existing_user_id = await self.store.get_user_id_by_threepid("email", email)
+        existing_user_id = await self.store.get_user_id_by_threepid("email", body.email)
 
         if existing_user_id is not None:
             if self.config.server.request_token_inhibit_3pid_errors:
@@ -419,26 +397,26 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
             # Have the configured identity server handle the request
             ret = await self.identity_handler.request_email_token(
                 self.hs.config.registration.account_threepid_delegate_email,
-                email,
-                client_secret,
-                send_attempt,
-                next_link,
+                body.email,
+                body.client_secret,
+                body.send_attempt,
+                body.next_link,
             )
         else:
             # Send threepid validation emails from Synapse
             sid = await self.identity_handler.send_threepid_validation(
-                email,
-                client_secret,
-                send_attempt,
+                body.email,
+                body.client_secret,
+                body.send_attempt,
                 self.mailer.send_add_threepid_mail,
-                next_link,
+                body.next_link,
             )
 
             # Wrap the session id in a JSON object
             ret = {"sid": sid}
 
         threepid_send_requests.labels(type="email", reason="add_threepid").observe(
-            send_attempt
+            body.send_attempt
         )
 
         return 200, ret
diff --git a/synapse/rest/client/models.py b/synapse/rest/client/models.py
new file mode 100644
index 0000000000..3150602997
--- /dev/null
+++ b/synapse/rest/client/models.py
@@ -0,0 +1,69 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import TYPE_CHECKING, Dict, Optional
+
+from pydantic import Extra, StrictInt, StrictStr, constr, validator
+
+from synapse.rest.models import RequestBodyModel
+from synapse.util.threepids import validate_email
+
+
+class AuthenticationData(RequestBodyModel):
+    """
+    Data used during user-interactive authentication.
+
+    (The name "Authentication Data" is taken directly from the spec.)
+
+    Additional keys will be present, depending on the `type` field. Use `.dict()` to
+    access them.
+    """
+
+    class Config:
+        extra = Extra.allow
+
+    session: Optional[StrictStr] = None
+    type: Optional[StrictStr] = None
+
+
+class EmailRequestTokenBody(RequestBodyModel):
+    if TYPE_CHECKING:
+        client_secret: StrictStr
+    else:
+        # See also assert_valid_client_secret()
+        client_secret: constr(
+            regex="[0-9a-zA-Z.=_-]",  # noqa: F722
+            min_length=0,
+            max_length=255,
+            strict=True,
+        )
+    email: StrictStr
+    id_server: Optional[StrictStr]
+    id_access_token: Optional[StrictStr]
+    next_link: Optional[StrictStr]
+    send_attempt: StrictInt
+
+    @validator("id_access_token", always=True)
+    def token_required_for_identity_server(
+        cls, token: Optional[str], values: Dict[str, object]
+    ) -> Optional[str]:
+        if values.get("id_server") is not None and token is None:
+            raise ValueError("id_access_token is required if an id_server is supplied.")
+        return token
+
+    # Canonicalise the email address. The addresses are all stored canonicalised
+    # in the database. This allows the user to reset his password without having to
+    # know the exact spelling (eg. upper and lower case) of address in the database.
+    # Without this, an email stored in the database as "foo@bar.com" would cause
+    # user requests for "FOO@bar.com" to raise a Not Found error.
+    _email_validator = validator("email", allow_reuse=True)(validate_email)
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index 3880846e9a..a8adf00176 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -16,9 +16,12 @@
 """ This module contains REST servlets to do with rooms: /rooms/<paths> """
 import logging
 import re
+from enum import Enum
 from typing import TYPE_CHECKING, Awaitable, Dict, List, Optional, Tuple
 from urllib import parse as urlparse
 
+from prometheus_client.core import Histogram
+
 from twisted.web.server import Request
 
 from synapse import event_auth
@@ -46,6 +49,7 @@ from synapse.http.servlet import (
     parse_strings_from_args,
 )
 from synapse.http.site import SynapseRequest
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.tracing import set_attribute
 from synapse.rest.client._base import client_patterns
 from synapse.rest.client.transactions import HttpTransactionCache
@@ -61,6 +65,66 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+class _RoomSize(Enum):
+    """
+    Enum to differentiate sizes of rooms. This is a pretty good approximation
+    about how hard it will be to get events in the room. We could also look at
+    room "complexity".
+    """
+
+    # This doesn't necessarily mean the room is a DM, just that there is a DM
+    # amount of people there.
+    DM_SIZE = "direct_message_size"
+    SMALL = "small"
+    SUBSTANTIAL = "substantial"
+    LARGE = "large"
+
+    @staticmethod
+    def from_member_count(member_count: int) -> "_RoomSize":
+        if member_count <= 2:
+            return _RoomSize.DM_SIZE
+        elif member_count < 100:
+            return _RoomSize.SMALL
+        elif member_count < 1000:
+            return _RoomSize.SUBSTANTIAL
+        else:
+            return _RoomSize.LARGE
+
+
+# This is an extra metric on top of `synapse_http_server_response_time_seconds`
+# which times the same sort of thing but this one allows us to see values
+# greater than 10s. We use a separate dedicated histogram with its own buckets
+# so that we don't increase the cardinality of the general one because it's
+# multiplied across hundreds of servlets.
+messsages_response_timer = Histogram(
+    "synapse_room_message_list_rest_servlet_response_time_seconds",
+    "sec",
+    # We have a label for room size so we can try to see a more realistic
+    # picture of /messages response time for bigger rooms. We don't want the
+    # tiny rooms that can always respond fast skewing our results when we're trying
+    # to optimize the bigger cases.
+    ["room_size"],
+    buckets=(
+        0.005,
+        0.01,
+        0.025,
+        0.05,
+        0.1,
+        0.25,
+        0.5,
+        1.0,
+        2.5,
+        5.0,
+        10.0,
+        30.0,
+        60.0,
+        120.0,
+        180.0,
+        "+Inf",
+    ),
+)
+
+
 class TransactionRestServlet(RestServlet):
     def __init__(self, hs: "HomeServer"):
         super().__init__()
@@ -556,6 +620,7 @@ class RoomMessageListRestServlet(RestServlet):
     def __init__(self, hs: "HomeServer"):
         super().__init__()
         self._hs = hs
+        self.clock = hs.get_clock()
         self.pagination_handler = hs.get_pagination_handler()
         self.auth = hs.get_auth()
         self.store = hs.get_datastores().main
@@ -563,6 +628,18 @@ class RoomMessageListRestServlet(RestServlet):
     async def on_GET(
         self, request: SynapseRequest, room_id: str
     ) -> Tuple[int, JsonDict]:
+        processing_start_time = self.clock.time_msec()
+        # Fire off and hope that we get a result by the end.
+        #
+        # We're using the mypy type ignore comment because the `@cached`
+        # decorator on `get_number_joined_users_in_room` doesn't play well with
+        # the type system. Maybe in the future, it can use some ParamSpec
+        # wizardry to fix it up.
+        room_member_count_deferred = run_in_background(  # type: ignore[call-arg]
+            self.store.get_number_joined_users_in_room,
+            room_id,  # type: ignore[arg-type]
+        )
+
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
         pagination_config = await PaginationConfig.from_request(
             self.store, request, default_limit=10
@@ -593,6 +670,12 @@ class RoomMessageListRestServlet(RestServlet):
             event_filter=event_filter,
         )
 
+        processing_end_time = self.clock.time_msec()
+        room_member_count = await make_deferred_yieldable(room_member_count_deferred)
+        messsages_response_timer.labels(
+            room_size=_RoomSize.from_member_count(room_member_count)
+        ).observe((processing_start_time - processing_end_time) / 1000)
+
         return 200, msgs
 
 
diff --git a/synapse/rest/models.py b/synapse/rest/models.py
new file mode 100644
index 0000000000..ac39cda8e5
--- /dev/null
+++ b/synapse/rest/models.py
@@ -0,0 +1,23 @@
+from pydantic import BaseModel, Extra
+
+
+class RequestBodyModel(BaseModel):
+    """A custom version of Pydantic's BaseModel which
+
+     - ignores unknown fields and
+     - does not allow fields to be overwritten after construction,
+
+    but otherwise uses Pydantic's default behaviour.
+
+    Ignoring unknown fields is a useful default. It means that clients can provide
+    unstable field not known to the server without the request being refused outright.
+
+    Subclassing in this way is recommended by
+    https://pydantic-docs.helpmanual.io/usage/model_config/#change-behaviour-globally
+    """
+
+    class Config:
+        # By default, ignore fields that we don't recognise.
+        extra = Extra.ignore
+        # By default, don't allow fields to be reassigned after parsing.
+        allow_mutation = False
diff --git a/synapse/static/client/login/index.html b/synapse/static/client/login/index.html
index 9e6daf38ac..40510889ac 100644
--- a/synapse/static/client/login/index.html
+++ b/synapse/static/client/login/index.html
@@ -3,7 +3,8 @@
 <head>
     <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
     <title> Login </title>
-    <meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+    <meta http-equiv="X-UA-Compatible" content="IE=edge">
+    <meta name="viewport" content="width=device-width, initial-scale=1.0">
     <link rel="stylesheet" href="style.css">
     <script src="js/jquery-3.4.1.min.js"></script>
     <script src="js/login.js"></script>
diff --git a/synapse/static/client/register/index.html b/synapse/static/client/register/index.html
index 140653574d..27bbd76f51 100644
--- a/synapse/static/client/register/index.html
+++ b/synapse/static/client/register/index.html
@@ -2,7 +2,8 @@
 <html>
 <head>
 <title> Registration </title>
-<meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'> 
+<meta http-equiv="X-UA-Compatible" content="IE=edge">
+<meta name="viewport" content="width=device-width, initial-scale=1.0">
 <link rel="stylesheet" href="style.css">
 <script src="js/jquery-3.4.1.min.js"></script>
 <script src="https://www.recaptcha.net/recaptcha/api/js/recaptcha_ajax.js"></script>
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f34c067515..bb14729a9d 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -46,7 +46,14 @@ from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
-from synapse.logging.tracing import Link, get_active_span, start_active_span, trace
+from synapse.logging.tracing import (
+    Link,
+    SynapseTags,
+    get_active_span,
+    set_attribute,
+    start_active_span,
+    trace,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.controllers.state import StateStorageController
 from synapse.storage.databases import Databases
@@ -383,9 +390,21 @@ class EventsPersistenceStorageController:
             PartialStateConflictError: if attempting to persist a partial state event in
                 a room that has been un-partial stated.
         """
+        event_ids: List[str] = []
         partitioned: Dict[str, List[Tuple[EventBase, EventContext]]] = {}
         for event, ctx in events_and_contexts:
             partitioned.setdefault(event.room_id, []).append((event, ctx))
+            event_ids.append(event.event_id)
+
+        set_attribute(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str(event_ids),
+        )
+        set_attribute(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(event_ids)),
+        )
+        set_attribute(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
 
         async def enqueue(
             item: Tuple[str, List[Tuple[EventBase, EventContext]]]
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 7b2084664c..5964835ea3 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -29,7 +29,8 @@ from typing import (
 
 from synapse.api.constants import EventTypes
 from synapse.events import EventBase
-from synapse.logging.tracing import trace
+from synapse.logging.tracing import tag_args, trace
+from synapse.storage.roommember import ProfileInfo
 from synapse.storage.state import StateFilter
 from synapse.storage.util.partial_state_events_tracker import (
     PartialCurrentStateTracker,
@@ -228,10 +229,12 @@ class StateStorageController:
         return {event: event_to_state[event] for event in event_ids}
 
     @trace
+    @tag_args
     async def get_state_ids_for_events(
         self,
         event_ids: Collection[str],
         state_filter: Optional[StateFilter] = None,
+        await_full_state: bool = True,
     ) -> Dict[str, StateMap[str]]:
         """
         Get the state dicts corresponding to a list of events, containing the event_ids
@@ -240,6 +243,9 @@ class StateStorageController:
         Args:
             event_ids: events whose state should be returned
             state_filter: The state filter used to fetch state from the database.
+            await_full_state: if `True`, will block if we do not yet have complete state
+                at these events and `state_filter` is not satisfied by partial state.
+                Defaults to `True`.
 
         Returns:
             A dict from event_id -> (type, state_key) -> event_id
@@ -248,8 +254,12 @@ class StateStorageController:
             RuntimeError if we don't have a state group for one or more of the events
                 (ie they are outliers or unknown)
         """
-        await_full_state = True
-        if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
+        if (
+            await_full_state
+            and state_filter
+            and not state_filter.must_await_full_state(self._is_mine_id)
+        ):
+            # Full state is not required if the state filter is restrictive enough.
             await_full_state = False
 
         event_to_groups = await self.get_state_group_for_events(
@@ -292,7 +302,10 @@ class StateStorageController:
 
     @trace
     async def get_state_ids_for_event(
-        self, event_id: str, state_filter: Optional[StateFilter] = None
+        self,
+        event_id: str,
+        state_filter: Optional[StateFilter] = None,
+        await_full_state: bool = True,
     ) -> StateMap[str]:
         """
         Get the state dict corresponding to a particular event
@@ -300,6 +313,9 @@ class StateStorageController:
         Args:
             event_id: event whose state should be returned
             state_filter: The state filter used to fetch state from the database.
+            await_full_state: if `True`, will block if we do not yet have complete state
+                at the event and `state_filter` is not satisfied by partial state.
+                Defaults to `True`.
 
         Returns:
             A dict from (type, state_key) -> state_event_id
@@ -309,7 +325,9 @@ class StateStorageController:
                 outlier or is unknown)
         """
         state_map = await self.get_state_ids_for_events(
-            [event_id], state_filter or StateFilter.all()
+            [event_id],
+            state_filter or StateFilter.all(),
+            await_full_state=await_full_state,
         )
         return state_map[event_id]
 
@@ -332,6 +350,7 @@ class StateStorageController:
         )
 
     @trace
+    @tag_args
     async def get_state_group_for_events(
         self,
         event_ids: Collection[str],
@@ -473,6 +492,7 @@ class StateStorageController:
             prev_stream_id, max_stream_id
         )
 
+    @trace
     async def get_current_state(
         self, room_id: str, state_filter: Optional[StateFilter] = None
     ) -> StateMap[EventBase]:
@@ -506,3 +526,15 @@ class StateStorageController:
         await self._partial_state_room_tracker.await_full_state(room_id)
 
         return await self.stores.main.get_current_hosts_in_room(room_id)
+
+    async def get_users_in_room_with_profiles(
+        self, room_id: str
+    ) -> Dict[str, ProfileInfo]:
+        """
+        Get the current users in the room with their profiles.
+        If the room is currently partial-stated, this will block until the room has
+        full state.
+        """
+        await self._partial_state_room_tracker.await_full_state(room_id)
+
+        return await self.stores.main.get_users_in_room_with_profiles(room_id)
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index eec55b6478..41b015dba1 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -33,6 +33,7 @@ from synapse.api.constants import MAX_DEPTH, EventTypes
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import EventFormatVersions, RoomVersion
 from synapse.events import EventBase, make_event_from_dict
+from synapse.logging.tracing import tag_args, trace
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
@@ -126,6 +127,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         )
         return await self.get_events_as_list(event_ids)
 
+    @trace
+    @tag_args
     async def get_auth_chain_ids(
         self,
         room_id: str,
@@ -709,6 +712,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         # Return all events where not all sets can reach them.
         return {eid for eid, n in event_to_missing_sets.items() if n}
 
+    @trace
+    @tag_args
     async def get_oldest_event_ids_with_depth_in_room(
         self, room_id: str
     ) -> List[Tuple[str, int]]:
@@ -767,6 +772,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             room_id,
         )
 
+    @trace
     async def get_insertion_event_backward_extremities_in_room(
         self, room_id: str
     ) -> List[Tuple[str, int]]:
@@ -1339,6 +1345,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         event_results.reverse()
         return event_results
 
+    @trace
+    @tag_args
     async def get_successor_events(self, event_id: str) -> List[str]:
         """Fetch all events that have the given event as a prev event
 
@@ -1375,6 +1383,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
             _delete_old_forward_extrem_cache_txn,
         )
 
+    @trace
     async def insert_insertion_extremity(self, event_id: str, room_id: str) -> None:
         await self.db_pool.simple_upsert(
             table="insertion_event_extremities",
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 161aad0f89..eabf9c9739 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -74,7 +74,17 @@ receipt.
 """
 
 import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union, cast
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Tuple,
+    Union,
+    cast,
+)
 
 import attr
 
@@ -154,7 +164,9 @@ class NotifCounts:
     highlight_count: int = 0
 
 
-def _serialize_action(actions: List[Union[dict, str]], is_highlight: bool) -> str:
+def _serialize_action(
+    actions: Collection[Union[Mapping, str]], is_highlight: bool
+) -> str:
     """Custom serializer for actions. This allows us to "compress" common actions.
 
     We use the fact that most users have the same actions for notifs (and for
@@ -227,7 +239,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         user_id: str,
     ) -> NotifCounts:
         """Get the notification count, the highlight count and the unread message count
-        for a given user in a given room after the given read receipt.
+        for a given user in a given room after their latest read receipt.
 
         Note that this function assumes the user to be a current member of the room,
         since it's either called by the sync handler to handle joined room entries, or by
@@ -238,9 +250,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             user_id: The user to retrieve the counts for.
 
         Returns
-            A dict containing the counts mentioned earlier in this docstring,
-            respectively under the keys "notify_count", "highlight_count" and
-            "unread_count".
+            A NotifCounts object containing the notification count, the highlight count
+            and the unread message count.
         """
         return await self.db_pool.runInteraction(
             "get_unread_event_push_actions_by_room",
@@ -255,6 +266,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         room_id: str,
         user_id: str,
     ) -> NotifCounts:
+        # Get the stream ordering of the user's latest receipt in the room.
         result = self.get_last_receipt_for_user_txn(
             txn,
             user_id,
@@ -266,13 +278,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             ),
         )
 
-        stream_ordering = None
         if result:
             _, stream_ordering = result
 
-        if stream_ordering is None:
-            # Either last_read_event_id is None, or it's an event we don't have (e.g.
-            # because it's been purged), in which case retrieve the stream ordering for
+        else:
+            # If the user has no receipts in the room, retrieve the stream ordering for
             # the latest membership event from this user in this room (which we assume is
             # a join).
             event_id = self.db_pool.simple_select_one_onecol_txn(
@@ -289,10 +299,26 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         )
 
     def _get_unread_counts_by_pos_txn(
-        self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
+        self,
+        txn: LoggingTransaction,
+        room_id: str,
+        user_id: str,
+        receipt_stream_ordering: int,
     ) -> NotifCounts:
         """Get the number of unread messages for a user/room that have happened
         since the given stream ordering.
+
+        Args:
+            txn: The database transaction.
+            room_id: The room ID to get unread counts for.
+            user_id: The user ID to get unread counts for.
+            receipt_stream_ordering: The stream ordering of the user's latest
+                receipt in the room. If there are no receipts, the stream ordering
+                of the user's join event.
+
+        Returns
+            A NotifCounts object containing the notification count, the highlight count
+            and the unread message count.
         """
 
         counts = NotifCounts()
@@ -320,7 +346,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                     OR last_receipt_stream_ordering = ?
                 )
             """,
-            (room_id, user_id, stream_ordering, stream_ordering),
+            (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
         )
         row = txn.fetchone()
 
@@ -338,17 +364,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 AND stream_ordering > ?
                 AND highlight = 1
         """
-        txn.execute(sql, (user_id, room_id, stream_ordering))
+        txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
         row = txn.fetchone()
         if row:
             counts.highlight_count += row[0]
 
         # Finally we need to count push actions that aren't included in the
-        # summary returned above, e.g. recent events that haven't been
-        # summarised yet, or the summary is empty due to a recent read receipt.
-        stream_ordering = max(stream_ordering, summary_stream_ordering)
+        # summary returned above. This might be due to recent events that haven't
+        # been summarised yet or the summary is out of date due to a recent read
+        # receipt.
+        start_unread_stream_ordering = max(
+            receipt_stream_ordering, summary_stream_ordering
+        )
         notify_count, unread_count = self._get_notif_unread_count_for_user_room(
-            txn, room_id, user_id, stream_ordering
+            txn, room_id, user_id, start_unread_stream_ordering
         )
 
         counts.notify_count += notify_count
@@ -733,7 +762,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
     async def add_push_actions_to_staging(
         self,
         event_id: str,
-        user_id_actions: Dict[str, List[Union[dict, str]]],
+        user_id_actions: Dict[str, Collection[Union[Mapping, str]]],
         count_as_unread: bool,
     ) -> None:
         """Add the push actions for the event to the push action staging area.
@@ -750,7 +779,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         # This is a helper function for generating the necessary tuple that
         # can be used to insert into the `event_push_actions_staging` table.
         def _gen_entry(
-            user_id: str, actions: List[Union[dict, str]]
+            user_id: str, actions: Collection[Union[Mapping, str]]
         ) -> Tuple[str, str, str, int, int, int]:
             is_highlight = 1 if _action_has_highlight(actions) else 0
             notif = 1 if "notify" in actions else 0
@@ -1151,8 +1180,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             txn: The database transaction.
             old_rotate_stream_ordering: The previous maximum event stream ordering.
             rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
-
-        Returns whether the archiving process has caught up or not.
         """
 
         # Calculate the new counts that should be upserted into event_push_summary
@@ -1238,9 +1265,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             (rotate_to_stream_ordering,),
         )
 
-    async def _remove_old_push_actions_that_have_rotated(
-        self,
-    ) -> None:
+    async def _remove_old_push_actions_that_have_rotated(self) -> None:
         """Clear out old push actions that have been summarised."""
 
         # We want to clear out anything that is older than a day that *has* already
@@ -1397,7 +1422,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         ]
 
 
-def _action_has_highlight(actions: List[Union[dict, str]]) -> bool:
+def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool:
     for action in actions:
         if not isinstance(action, dict):
             continue
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 5560b38a48..1c3b804da0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -40,6 +40,7 @@ from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase, relation_from_event
 from synapse.events.snapshot import EventContext
+from synapse.logging.tracing import trace
 from synapse.storage._base import db_to_json, make_in_list_sql_clause
 from synapse.storage.database import (
     DatabasePool,
@@ -145,6 +146,7 @@ class PersistEventsStore:
         self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen
         self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen
 
+    @trace
     async def _persist_events_and_state_updates(
         self,
         events_and_contexts: List[Tuple[EventBase, EventContext]],
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index e9ff6cfb34..90e6d82058 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -54,6 +54,7 @@ from synapse.logging.context import (
     current_context,
     make_deferred_yieldable,
 )
+from synapse.logging.tracing import start_active_span, tag_args, trace
 from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
@@ -430,6 +431,8 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {e.event_id: e for e in events}
 
+    @trace
+    @tag_args
     async def get_events_as_list(
         self,
         event_ids: Collection[str],
@@ -1090,23 +1093,42 @@ class EventsWorkerStore(SQLBaseStore):
         """
         fetched_event_ids: Set[str] = set()
         fetched_events: Dict[str, _EventRow] = {}
-        events_to_fetch = event_ids
 
-        while events_to_fetch:
-            row_map = await self._enqueue_events(events_to_fetch)
+        async def _fetch_event_ids_and_get_outstanding_redactions(
+            event_ids_to_fetch: Collection[str],
+        ) -> Collection[str]:
+            """
+            Fetch all of the given event_ids and return any associated redaction event_ids
+            that we still need to fetch in the next iteration.
+            """
+            row_map = await self._enqueue_events(event_ids_to_fetch)
 
             # we need to recursively fetch any redactions of those events
             redaction_ids: Set[str] = set()
-            for event_id in events_to_fetch:
+            for event_id in event_ids_to_fetch:
                 row = row_map.get(event_id)
                 fetched_event_ids.add(event_id)
                 if row:
                     fetched_events[event_id] = row
                     redaction_ids.update(row.redactions)
 
-            events_to_fetch = redaction_ids.difference(fetched_event_ids)
-            if events_to_fetch:
-                logger.debug("Also fetching redaction events %s", events_to_fetch)
+            event_ids_to_fetch = redaction_ids.difference(fetched_event_ids)
+            return event_ids_to_fetch
+
+        # Grab the initial list of events requested
+        event_ids_to_fetch = await _fetch_event_ids_and_get_outstanding_redactions(
+            event_ids
+        )
+        # Then go and recursively find all of the associated redactions
+        with start_active_span("recursively fetching redactions"):
+            while event_ids_to_fetch:
+                logger.debug("Also fetching redaction events %s", event_ids_to_fetch)
+
+                event_ids_to_fetch = (
+                    await _fetch_event_ids_and_get_outstanding_redactions(
+                        event_ids_to_fetch
+                    )
+                )
 
         # build a map from event_id to EventBase
         event_map: Dict[str, EventBase] = {}
@@ -1424,6 +1446,8 @@ class EventsWorkerStore(SQLBaseStore):
 
         return {r["event_id"] for r in rows}
 
+    @trace
+    @tag_args
     async def have_seen_events(
         self, room_id: str, event_ids: Iterable[str]
     ) -> Set[str]:
@@ -2200,3 +2224,63 @@ class EventsWorkerStore(SQLBaseStore):
             (room_id,),
         )
         return [row[0] for row in txn]
+
+    def mark_event_rejected_txn(
+        self,
+        txn: LoggingTransaction,
+        event_id: str,
+        rejection_reason: Optional[str],
+    ) -> None:
+        """Mark an event that was previously accepted as rejected, or vice versa
+
+        This can happen, for example, when resyncing state during a faster join.
+
+        Args:
+            txn:
+            event_id: ID of event to update
+            rejection_reason: reason it has been rejected, or None if it is now accepted
+        """
+        if rejection_reason is None:
+            logger.info(
+                "Marking previously-processed event %s as accepted",
+                event_id,
+            )
+            self.db_pool.simple_delete_txn(
+                txn,
+                "rejections",
+                keyvalues={"event_id": event_id},
+            )
+        else:
+            logger.info(
+                "Marking previously-processed event %s as rejected(%s)",
+                event_id,
+                rejection_reason,
+            )
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="rejections",
+                keyvalues={"event_id": event_id},
+                values={
+                    "reason": rejection_reason,
+                    "last_check": self._clock.time_msec(),
+                },
+            )
+        self.db_pool.simple_update_txn(
+            txn,
+            table="events",
+            keyvalues={"event_id": event_id},
+            updatevalues={"rejection_reason": rejection_reason},
+        )
+
+        self.invalidate_get_event_cache_after_txn(txn, event_id)
+
+        # TODO(faster_joins): invalidate the cache on workers. Ideally we'd just
+        #   call '_send_invalidation_to_replication', but we actually need the other
+        #   end to call _invalidate_local_get_event_cache() rather than (just)
+        #   _get_event_cache.invalidate().
+        #
+        #   One solution might be to (somehow) get the workers to call
+        #   _invalidate_caches_for_event() (though that will invalidate more than
+        #   strictly necessary).
+        #
+        #   https://github.com/matrix-org/synapse/issues/12994
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 768f95d16c..255620f996 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -14,11 +14,23 @@
 # limitations under the License.
 import abc
 import logging
-from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Tuple, Union, cast
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Collection,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+    cast,
+)
 
 from synapse.api.errors import StoreError
 from synapse.config.homeserver import ExperimentalConfig
-from synapse.push.baserules import list_with_base_rules
+from synapse.push.baserules import FilteredPushRules, PushRule, compile_push_rules
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
@@ -50,60 +62,30 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-def _is_experimental_rule_enabled(
-    rule_id: str, experimental_config: ExperimentalConfig
-) -> bool:
-    """Used by `_load_rules` to filter out experimental rules when they
-    have not been enabled.
-    """
-    if (
-        rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl"
-        and not experimental_config.msc3786_enabled
-    ):
-        return False
-    if (
-        rule_id == "global/underride/.org.matrix.msc3772.thread_reply"
-        and not experimental_config.msc3772_enabled
-    ):
-        return False
-    return True
-
-
 def _load_rules(
     rawrules: List[JsonDict],
     enabled_map: Dict[str, bool],
     experimental_config: ExperimentalConfig,
-) -> List[JsonDict]:
-    ruleslist = []
-    for rawrule in rawrules:
-        rule = dict(rawrule)
-        rule["conditions"] = db_to_json(rawrule["conditions"])
-        rule["actions"] = db_to_json(rawrule["actions"])
-        rule["default"] = False
-        ruleslist.append(rule)
-
-    # We're going to be mutating this a lot, so copy it. We also filter out
-    # any experimental default push rules that aren't enabled.
-    rules = [
-        rule
-        for rule in list_with_base_rules(ruleslist)
-        if _is_experimental_rule_enabled(rule["rule_id"], experimental_config)
-    ]
+) -> FilteredPushRules:
+    """Take the DB rows returned from the DB and convert them into a full
+    `FilteredPushRules` object.
+    """
 
-    for i, rule in enumerate(rules):
-        rule_id = rule["rule_id"]
+    ruleslist = [
+        PushRule(
+            rule_id=rawrule["rule_id"],
+            priority_class=rawrule["priority_class"],
+            conditions=db_to_json(rawrule["conditions"]),
+            actions=db_to_json(rawrule["actions"]),
+        )
+        for rawrule in rawrules
+    ]
 
-        if rule_id not in enabled_map:
-            continue
-        if rule.get("enabled", True) == bool(enabled_map[rule_id]):
-            continue
+    push_rules = compile_push_rules(ruleslist)
 
-        # Rules are cached across users.
-        rule = dict(rule)
-        rule["enabled"] = bool(enabled_map[rule_id])
-        rules[i] = rule
+    filtered_rules = FilteredPushRules(push_rules, enabled_map, experimental_config)
 
-    return rules
+    return filtered_rules
 
 
 # The ABCMeta metaclass ensures that it cannot be instantiated without
@@ -162,7 +144,7 @@ class PushRulesWorkerStore(
         raise NotImplementedError()
 
     @cached(max_entries=5000)
-    async def get_push_rules_for_user(self, user_id: str) -> List[JsonDict]:
+    async def get_push_rules_for_user(self, user_id: str) -> FilteredPushRules:
         rows = await self.db_pool.simple_select_list(
             table="push_rules",
             keyvalues={"user_name": user_id},
@@ -216,11 +198,11 @@ class PushRulesWorkerStore(
     @cachedList(cached_method_name="get_push_rules_for_user", list_name="user_ids")
     async def bulk_get_push_rules(
         self, user_ids: Collection[str]
-    ) -> Dict[str, List[JsonDict]]:
+    ) -> Dict[str, FilteredPushRules]:
         if not user_ids:
             return {}
 
-        results: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids}
+        raw_rules: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids}
 
         rows = await self.db_pool.simple_select_many_batch(
             table="push_rules",
@@ -234,11 +216,13 @@ class PushRulesWorkerStore(
         rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
 
         for row in rows:
-            results.setdefault(row["user_name"], []).append(row)
+            raw_rules.setdefault(row["user_name"], []).append(row)
 
         enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids)
 
-        for user_id, rules in results.items():
+        results: Dict[str, FilteredPushRules] = {}
+
+        for user_id, rules in raw_rules.items():
             results[user_id] = _load_rules(
                 rules, enabled_map_by_user.get(user_id, {}), self.hs.config.experimental
             )
@@ -345,8 +329,8 @@ class PushRuleStore(PushRulesWorkerStore):
         user_id: str,
         rule_id: str,
         priority_class: int,
-        conditions: List[Dict[str, str]],
-        actions: List[Union[JsonDict, str]],
+        conditions: Sequence[Mapping[str, str]],
+        actions: Sequence[Union[Mapping[str, Any], str]],
         before: Optional[str] = None,
         after: Optional[str] = None,
     ) -> None:
@@ -817,7 +801,7 @@ class PushRuleStore(PushRulesWorkerStore):
         return self._push_rules_stream_id_gen.get_current_token()
 
     async def copy_push_rule_from_room_to_room(
-        self, new_room_id: str, user_id: str, rule: dict
+        self, new_room_id: str, user_id: str, rule: PushRule
     ) -> None:
         """Copy a single push rule from one room to another for a specific user.
 
@@ -827,21 +811,27 @@ class PushRuleStore(PushRulesWorkerStore):
             rule: A push rule.
         """
         # Create new rule id
-        rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1])
+        rule_id_scope = "/".join(rule.rule_id.split("/")[:-1])
         new_rule_id = rule_id_scope + "/" + new_room_id
 
+        new_conditions = []
+
         # Change room id in each condition
-        for condition in rule.get("conditions", []):
+        for condition in rule.conditions:
+            new_condition = condition
             if condition.get("key") == "room_id":
-                condition["pattern"] = new_room_id
+                new_condition = dict(condition)
+                new_condition["pattern"] = new_room_id
+
+            new_conditions.append(new_condition)
 
         # Add the rule for the new room
         await self.add_push_rule(
             user_id=user_id,
             rule_id=new_rule_id,
-            priority_class=rule["priority_class"],
-            conditions=rule["conditions"],
-            actions=rule["actions"],
+            priority_class=rule.priority_class,
+            conditions=new_conditions,
+            actions=rule.actions,
         )
 
     async def copy_push_rules_from_room_to_room_for_user(
@@ -859,8 +849,11 @@ class PushRuleStore(PushRulesWorkerStore):
         user_push_rules = await self.get_push_rules_for_user(user_id)
 
         # Get rules relating to the old room and copy them to the new room
-        for rule in user_push_rules:
-            conditions = rule.get("conditions", [])
+        for rule, enabled in user_push_rules:
+            if not enabled:
+                continue
+
+            conditions = rule.conditions
             if any(
                 (c.get("key") == "room_id" and c.get("pattern") == old_room_id)
                 for c in conditions
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 0090c9f225..124c70ad37 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -161,7 +161,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             receipt_type: The receipt types to fetch.
 
         Returns:
-            The latest receipt, if one exists.
+            The event ID and stream ordering of the latest receipt, if one exists.
         """
 
         clause, args = make_in_list_sql_clause(
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 0f1f0d11ea..b7d4baa6bb 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -2001,9 +2001,15 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
 
             where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""
 
+            # We join on room_stats_state despite not using any columns from it
+            # because the join can influence the number of rows returned;
+            # e.g. a room that doesn't have state, maybe because it was deleted.
+            # The query returning the total count should be consistent with
+            # the query returning the results.
             sql = """
                 SELECT COUNT(*) as total_event_reports
                 FROM event_reports AS er
+                JOIN room_stats_state ON room_stats_state.room_id = er.room_id
                 {}
                 """.format(
                 where_clause
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 93ff4816c8..827c1f1efd 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -283,6 +283,9 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         Returns:
             A mapping from user ID to ProfileInfo.
+
+        Preconditions:
+          - There is full state available for the room (it is not partial-stated).
         """
 
         def _get_users_in_room_with_profiles(
@@ -1212,6 +1215,30 @@ class RoomMemberWorkerStore(EventsWorkerStore):
             "get_forgotten_rooms_for_user", _get_forgotten_rooms_for_user_txn
         )
 
+    async def is_locally_forgotten_room(self, room_id: str) -> bool:
+        """Returns whether all local users have forgotten this room_id.
+
+        Args:
+            room_id: The room ID to query.
+
+        Returns:
+            Whether the room is forgotten.
+        """
+
+        sql = """
+            SELECT count(*) > 0 FROM local_current_membership
+            INNER JOIN room_memberships USING (room_id, event_id)
+            WHERE
+                room_id = ?
+                AND forgotten = 0;
+        """
+
+        rows = await self.db_pool.execute("is_forgotten_room", None, sql, room_id)
+
+        # `count(*)` returns always an integer
+        # If any rows still exist it means someone has not forgotten this room yet
+        return not rows[0][0]
+
     async def get_rooms_user_has_been_in(self, user_id: str) -> Set[str]:
         """Get all rooms that the user has ever been in.
 
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index f70705a0af..0b10af0e58 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -430,6 +430,11 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
             updatevalues={"state_group": state_group},
         )
 
+        # the event may now be rejected where it was not before, or vice versa,
+        # in which case we need to update the rejected flags.
+        if bool(context.rejected) != (event.rejected_reason is not None):
+            self.mark_event_rejected_txn(txn, event.event_id, context.rejected)
+
         self.db_pool.simple_delete_one_txn(
             txn,
             table="partial_state_events",
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index af3bab2c15..0004d955b4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -539,15 +539,6 @@ class StateFilter:
             is_mine_id: a callable which confirms if a given state_key matches a mxid
                of a local user
         """
-
-        # TODO(faster_joins): it's not entirely clear that this is safe. In particular,
-        #  there may be circumstances in which we return a piece of state that, once we
-        #  resync the state, we discover is invalid. For example: if it turns out that
-        #  the sender of a piece of state wasn't actually in the room, then clearly that
-        #  state shouldn't have been returned.
-        #  We should at least add some tests around this to see what happens.
-        #  https://github.com/matrix-org/synapse/issues/13006
-
         # if we haven't requested membership events, then it depends on the value of
         # 'include_others'
         if EventTypes.Member not in self.types:
diff --git a/synapse/storage/util/partial_state_events_tracker.py b/synapse/storage/util/partial_state_events_tracker.py
index 466e5137f2..07af89ee31 100644
--- a/synapse/storage/util/partial_state_events_tracker.py
+++ b/synapse/storage/util/partial_state_events_tracker.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
 from twisted.internet.defer import Deferred
 
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
+from synapse.logging.tracing import trace_with_opname
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.room import RoomWorkerStore
 from synapse.util import unwrapFirstError
@@ -58,6 +59,7 @@ class PartialStateEventsTracker:
             for o in observers:
                 o.callback(None)
 
+    @trace_with_opname("PartialStateEventsTracker.await_full_state")
     async def await_full_state(self, event_ids: Collection[str]) -> None:
         """Wait for all the given events to have full state.
 
@@ -151,6 +153,7 @@ class PartialCurrentStateTracker:
             for o in observers:
                 o.callback(None)
 
+    @trace_with_opname("PartialCurrentStateTracker.await_full_state")
     async def await_full_state(self, room_id: str) -> None:
         # We add the deferred immediately so that the DB call to check for
         # partial state doesn't race when we unpartial the room.
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index 6394cc39ac..603570c10f 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -18,6 +18,8 @@ import logging
 import typing
 from typing import Any, DefaultDict, Iterator, List, Set
 
+from prometheus_client.core import Counter
+
 from twisted.internet import defer
 
 from synapse.api.errors import LimitExceededError
@@ -27,6 +29,8 @@ from synapse.logging.context import (
     make_deferred_yieldable,
     run_in_background,
 )
+from synapse.logging.tracing import start_active_span
+from synapse.metrics import Histogram, LaterGauge
 from synapse.util import Clock
 
 if typing.TYPE_CHECKING:
@@ -35,6 +39,32 @@ if typing.TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+# Track how much the ratelimiter is affecting requests
+rate_limit_sleep_counter = Counter("synapse_rate_limit_sleep", "")
+rate_limit_reject_counter = Counter("synapse_rate_limit_reject", "")
+queue_wait_timer = Histogram(
+    "synapse_rate_limit_queue_wait_time_seconds",
+    "sec",
+    [],
+    buckets=(
+        0.005,
+        0.01,
+        0.025,
+        0.05,
+        0.1,
+        0.25,
+        0.5,
+        0.75,
+        1.0,
+        2.5,
+        5.0,
+        10.0,
+        20.0,
+        "+Inf",
+    ),
+)
+
+
 class FederationRateLimiter:
     def __init__(self, clock: Clock, config: FederationRatelimitSettings):
         def new_limiter() -> "_PerHostRatelimiter":
@@ -44,6 +74,27 @@ class FederationRateLimiter:
             str, "_PerHostRatelimiter"
         ] = collections.defaultdict(new_limiter)
 
+        # We track the number of affected hosts per time-period so we can
+        # differentiate one really noisy homeserver from a general
+        # ratelimit tuning problem across the federation.
+        LaterGauge(
+            "synapse_rate_limit_sleep_affected_hosts",
+            "Number of hosts that had requests put to sleep",
+            [],
+            lambda: sum(
+                ratelimiter.should_sleep() for ratelimiter in self.ratelimiters.values()
+            ),
+        )
+        LaterGauge(
+            "synapse_rate_limit_reject_affected_hosts",
+            "Number of hosts that had requests rejected",
+            [],
+            lambda: sum(
+                ratelimiter.should_reject()
+                for ratelimiter in self.ratelimiters.values()
+            ),
+        )
+
     def ratelimit(self, host: str) -> "_GeneratorContextManager[defer.Deferred[None]]":
         """Used to ratelimit an incoming request from a given host
 
@@ -59,7 +110,7 @@ class FederationRateLimiter:
         Returns:
             context manager which returns a deferred.
         """
-        return self.ratelimiters[host].ratelimit()
+        return self.ratelimiters[host].ratelimit(host)
 
 
 class _PerHostRatelimiter:
@@ -94,19 +145,42 @@ class _PerHostRatelimiter:
         self.request_times: List[int] = []
 
     @contextlib.contextmanager
-    def ratelimit(self) -> "Iterator[defer.Deferred[None]]":
+    def ratelimit(self, host: str) -> "Iterator[defer.Deferred[None]]":
         # `contextlib.contextmanager` takes a generator and turns it into a
         # context manager. The generator should only yield once with a value
         # to be returned by manager.
         # Exceptions will be reraised at the yield.
 
+        self.host = host
+
         request_id = object()
-        ret = self._on_enter(request_id)
+        # Ideally we'd use `Deferred.fromCoroutine()` here, to save on redundant
+        # type-checking, but we'd need Twisted >= 21.2.
+        ret = defer.ensureDeferred(self._on_enter_with_tracing(request_id))
         try:
             yield ret
         finally:
             self._on_exit(request_id)
 
+    def should_reject(self) -> bool:
+        """
+        Whether to reject the request if we already have too many queued up
+        (either sleeping or in the ready queue).
+        """
+        queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
+        return queue_size > self.reject_limit
+
+    def should_sleep(self) -> bool:
+        """
+        Whether to sleep the request if we already have too many requests coming
+        through within the window.
+        """
+        return len(self.request_times) > self.sleep_limit
+
+    async def _on_enter_with_tracing(self, request_id: object) -> None:
+        with start_active_span("ratelimit wait"), queue_wait_timer.time():
+            await self._on_enter(request_id)
+
     def _on_enter(self, request_id: object) -> "defer.Deferred[None]":
         time_now = self.clock.time_msec()
 
@@ -117,8 +191,9 @@ class _PerHostRatelimiter:
 
         # reject the request if we already have too many queued up (either
         # sleeping or in the ready queue).
-        queue_size = len(self.ready_request_queue) + len(self.sleeping_requests)
-        if queue_size > self.reject_limit:
+        if self.should_reject():
+            logger.debug("Ratelimiter(%s): rejecting request", self.host)
+            rate_limit_reject_counter.inc()
             raise LimitExceededError(
                 retry_after_ms=int(self.window_size / self.sleep_limit)
             )
@@ -130,7 +205,8 @@ class _PerHostRatelimiter:
                 queue_defer: defer.Deferred[None] = defer.Deferred()
                 self.ready_request_queue[request_id] = queue_defer
                 logger.info(
-                    "Ratelimiter: queueing request (queue now %i items)",
+                    "Ratelimiter(%s): queueing request (queue now %i items)",
+                    self.host,
                     len(self.ready_request_queue),
                 )
 
@@ -139,19 +215,28 @@ class _PerHostRatelimiter:
                 return defer.succeed(None)
 
         logger.debug(
-            "Ratelimit [%s]: len(self.request_times)=%d",
+            "Ratelimit(%s) [%s]: len(self.request_times)=%d",
+            self.host,
             id(request_id),
             len(self.request_times),
         )
 
-        if len(self.request_times) > self.sleep_limit:
-            logger.debug("Ratelimiter: sleeping request for %f sec", self.sleep_sec)
+        if self.should_sleep():
+            logger.debug(
+                "Ratelimiter(%s) [%s]: sleeping request for %f sec",
+                self.host,
+                id(request_id),
+                self.sleep_sec,
+            )
+            rate_limit_sleep_counter.inc()
             ret_defer = run_in_background(self.clock.sleep, self.sleep_sec)
 
             self.sleeping_requests.add(request_id)
 
             def on_wait_finished(_: Any) -> "defer.Deferred[None]":
-                logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id))
+                logger.debug(
+                    "Ratelimit(%s) [%s]: Finished sleeping", self.host, id(request_id)
+                )
                 self.sleeping_requests.discard(request_id)
                 queue_defer = queue_request()
                 return queue_defer
@@ -161,7 +246,9 @@ class _PerHostRatelimiter:
             ret_defer = queue_request()
 
         def on_start(r: object) -> object:
-            logger.debug("Ratelimit [%s]: Processing req", id(request_id))
+            logger.debug(
+                "Ratelimit(%s) [%s]: Processing req", self.host, id(request_id)
+            )
             self.current_processing.add(request_id)
             return r
 
@@ -183,7 +270,7 @@ class _PerHostRatelimiter:
         return make_deferred_yieldable(ret_defer)
 
     def _on_exit(self, request_id: object) -> None:
-        logger.debug("Ratelimit [%s]: Processed req", id(request_id))
+        logger.debug("Ratelimit(%s) [%s]: Processed req", self.host, id(request_id))
         self.current_processing.discard(request_id)
         try:
             # start processing the next item on the queue.
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 813fe1a155..342d60a921 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -73,8 +73,8 @@ async def filter_events_for_client(
           * the user is not currently a member of the room, and:
           * the user has not been a member of the room since the given
             events
-        always_include_ids: set of event ids to specifically
-            include (unless sender is ignored)
+        always_include_ids: set of event ids to specifically include, if present
+            in events (unless sender is ignored)
         filter_send_to_client: Whether we're checking an event that's going to be
             sent to a client. This might not always be the case since this function can
             also be called to check whether a user can see the state at a given point.