summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/_scripts/update_synapse_database.py14
-rw-r--r--synapse/config/server.py13
-rw-r--r--synapse/events/builder.py1
-rw-r--r--synapse/federation/federation_client.py3
-rw-r--r--synapse/handlers/auth.py34
-rw-r--r--synapse/handlers/e2e_keys.py21
-rw-r--r--synapse/handlers/federation_event.py10
-rw-r--r--synapse/handlers/push_rules.py5
-rw-r--r--synapse/http/site.py14
-rw-r--r--synapse/module_api/__init__.py25
-rw-r--r--synapse/push/baserules.py583
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py7
-rw-r--r--synapse/push/clientformat.py5
-rw-r--r--synapse/rest/admin/__init__.py2
-rw-r--r--synapse/rest/admin/users.py27
-rw-r--r--synapse/rest/client/account.py65
-rw-r--r--synapse/rest/client/models.py28
-rw-r--r--synapse/rest/media/v1/media_repository.py6
-rw-r--r--synapse/rest/media/v1/upload_resource.py6
-rw-r--r--synapse/state/v2.py15
-rw-r--r--synapse/storage/background_updates.py6
-rw-r--r--synapse/storage/database.py30
-rw-r--r--synapse/storage/databases/main/purge_events.py5
-rw-r--r--synapse/storage/databases/main/push_rule.py23
-rw-r--r--synapse/storage/schema/__init__.py2
-rw-r--r--synapse/storage/schema/main/delta/73/02room_id_indexes_for_purging.sql22
26 files changed, 241 insertions, 731 deletions
diff --git a/synapse/_scripts/update_synapse_database.py b/synapse/_scripts/update_synapse_database.py
index b4aeae6dd5..fb1fb83f50 100755
--- a/synapse/_scripts/update_synapse_database.py
+++ b/synapse/_scripts/update_synapse_database.py
@@ -48,10 +48,13 @@ class MockHomeserver(HomeServer):
 
 
 def run_background_updates(hs: HomeServer) -> None:
-    store = hs.get_datastores().main
+    main = hs.get_datastores().main
+    state = hs.get_datastores().state
 
     async def run_background_updates() -> None:
-        await store.db_pool.updates.run_background_updates(sleep=False)
+        await main.db_pool.updates.run_background_updates(sleep=False)
+        if state:
+            await state.db_pool.updates.run_background_updates(sleep=False)
         # Stop the reactor to exit the script once every background update is run.
         reactor.stop()
 
@@ -97,8 +100,11 @@ def main() -> None:
     # Load, process and sanity-check the config.
     hs_config = yaml.safe_load(args.database_config)
 
-    if "database" not in hs_config:
-        sys.stderr.write("The configuration file must have a 'database' section.\n")
+    if "database" not in hs_config and "databases" not in hs_config:
+        sys.stderr.write(
+            "The configuration file must have a 'database' or 'databases' section. "
+            "See https://matrix-org.github.io/synapse/latest/usage/configuration/config_documentation.html#database"
+        )
         sys.exit(4)
 
     config = HomeServerConfig()
diff --git a/synapse/config/server.py b/synapse/config/server.py
index c91df636d9..f2353ce5fb 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -206,6 +206,7 @@ class HttpListenerConfig:
     resources: List[HttpResourceConfig] = attr.Factory(list)
     additional_resources: Dict[str, dict] = attr.Factory(dict)
     tag: Optional[str] = None
+    request_id_header: Optional[str] = None
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -520,9 +521,11 @@ class ServerConfig(Config):
         ):
             raise ConfigError("allowed_avatar_mimetypes must be a list")
 
-        self.listeners = [
-            parse_listener_def(i, x) for i, x in enumerate(config.get("listeners", []))
-        ]
+        listeners = config.get("listeners", [])
+        if not isinstance(listeners, list):
+            raise ConfigError("Expected a list", ("listeners",))
+
+        self.listeners = [parse_listener_def(i, x) for i, x in enumerate(listeners)]
 
         # no_tls is not really supported any more, but let's grandfather it in
         # here.
@@ -889,6 +892,9 @@ def read_gc_thresholds(
 
 def parse_listener_def(num: int, listener: Any) -> ListenerConfig:
     """parse a listener config from the config file"""
+    if not isinstance(listener, dict):
+        raise ConfigError("Expected a dictionary", ("listeners", str(num)))
+
     listener_type = listener["type"]
     # Raise a helpful error if direct TCP replication is still configured.
     if listener_type == "replication":
@@ -928,6 +934,7 @@ def parse_listener_def(num: int, listener: Any) -> ListenerConfig:
             resources=resources,
             additional_resources=listener.get("additional_resources", {}),
             tag=listener.get("tag"),
+            request_id_header=listener.get("request_id_header"),
         )
 
     return ListenerConfig(port, bind_addresses, listener_type, tls, http_config)
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 746bd3978d..e2ee10dd3d 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -167,7 +167,6 @@ class EventBuilder:
             "content": self.content,
             "unsigned": self.unsigned,
             "depth": depth,
-            "prev_state": [],
         }
 
         if self.is_state():
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 62fe820ee9..9e7527291b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -906,9 +906,6 @@ class FederationClient(FederationBase):
             # The protoevent received over the JSON wire may not have all
             # the required fields. Lets just gloss over that because
             # there's some we never care about
-            if "prev_state" not in pdu_dict:
-                pdu_dict["prev_state"] = []
-
             ev = builder.create_local_event_from_event_dict(
                 self._clock,
                 self.hostname,
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 0327fc57a4..eacd631ee0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -63,7 +63,6 @@ from synapse.http.server import finish_request, respond_with_html
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import defer_to_thread
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.roommember import ProfileInfo
 from synapse.types import JsonDict, Requester, UserID
 from synapse.util import stringutils as stringutils
 from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
@@ -1687,41 +1686,10 @@ class AuthHandler:
             respond_with_html(request, 403, self._sso_account_deactivated_template)
             return
 
-        profile = await self.store.get_profileinfo(
+        user_profile_data = await self.store.get_profileinfo(
             UserID.from_string(registered_user_id).localpart
         )
 
-        self._complete_sso_login(
-            registered_user_id,
-            auth_provider_id,
-            request,
-            client_redirect_url,
-            extra_attributes,
-            new_user=new_user,
-            user_profile_data=profile,
-            auth_provider_session_id=auth_provider_session_id,
-        )
-
-    def _complete_sso_login(
-        self,
-        registered_user_id: str,
-        auth_provider_id: str,
-        request: Request,
-        client_redirect_url: str,
-        extra_attributes: Optional[JsonDict] = None,
-        new_user: bool = False,
-        user_profile_data: Optional[ProfileInfo] = None,
-        auth_provider_session_id: Optional[str] = None,
-    ) -> None:
-        """
-        The synchronous portion of complete_sso_login.
-
-        This exists purely for backwards compatibility of synapse.module_api.ModuleApi.
-        """
-
-        if user_profile_data is None:
-            user_profile_data = ProfileInfo(None, None)
-
         # Store any extra attributes which will be passed in the login response.
         # Note that this is per-user so it may overwrite a previous value, this
         # is considered OK since the newest SSO attributes should be most valid.
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index a87ab99404..cf788a4a86 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -188,18 +188,21 @@ class E2eKeysHandler:
                 )
                 invalid_cached_users = cached_users - valid_cached_users
                 if invalid_cached_users:
-                    # Fix up results. If we get here, there is either a bug in device
-                    # list tracking, or we hit the race mentioned above.
+                    # Fix up results. If we get here, it means there was either a bug in
+                    # device list tracking, or we hit the race mentioned above.
+                    # TODO: In practice, this path is hit fairly often in existing
+                    #       deployments when clients query the keys of departed remote
+                    #       users. A background update to mark the appropriate device
+                    #       lists as unsubscribed is needed.
+                    #       https://github.com/matrix-org/synapse/issues/13651
+                    # Note that this currently introduces a failure mode when clients
+                    # are trying to decrypt old messages from a remote user whose
+                    # homeserver is no longer available. We may want to consider falling
+                    # back to the cached data when we fail to retrieve a device list
+                    # over federation for such remote users.
                     user_ids_not_in_cache.update(invalid_cached_users)
                     for invalid_user_id in invalid_cached_users:
                         remote_results.pop(invalid_user_id)
-                    # This log message may be removed if it turns out it's almost
-                    # entirely triggered by races.
-                    logger.error(
-                        "Devices for %s were cached, but the server no longer shares "
-                        "any rooms with them. The cached device lists are stale.",
-                        invalid_cached_users,
-                    )
 
                 for user_id, devices in remote_results.items():
                     user_devices = results.setdefault(user_id, {})
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 47dabaee3f..c608b80c4a 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -866,6 +866,11 @@ class FederationEventHandler:
                 event.room_id, event_id, str(err)
             )
             return
+        except Exception as exc:
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(exc)
+            )
+            raise exc
 
         try:
             try:
@@ -908,6 +913,11 @@ class FederationEventHandler:
                 logger.warning("Pulled event %s failed history check.", event_id)
             else:
                 raise
+        except Exception as exc:
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(exc)
+            )
+            raise exc
 
     @trace
     async def _compute_event_context_with_maybe_missing_prevs(
diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py
index 2599160bcc..1219672a59 100644
--- a/synapse/handlers/push_rules.py
+++ b/synapse/handlers/push_rules.py
@@ -16,14 +16,17 @@ from typing import TYPE_CHECKING, List, Optional, Union
 import attr
 
 from synapse.api.errors import SynapseError, UnrecognizedRequestError
-from synapse.push.baserules import BASE_RULE_IDS
 from synapse.storage.push_rule import RuleNotFoundException
+from synapse.synapse_rust.push import get_base_rule_ids
 from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
+BASE_RULE_IDS = get_base_rule_ids()
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class RuleSpec:
     scope: str
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 4a6cadc597..d9cd0aab83 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -72,10 +72,12 @@ class SynapseRequest(Request):
         site: "SynapseSite",
         *args: Any,
         max_request_body_size: int = 1024,
+        request_id_header: Optional[str] = None,
         **kw: Any,
     ):
         super().__init__(channel, *args, **kw)
         self._max_request_body_size = max_request_body_size
+        self.request_id_header = request_id_header
         self.synapse_site = site
         self.reactor = site.reactor
         self._channel = channel  # this is used by the tests
@@ -172,7 +174,14 @@ class SynapseRequest(Request):
         self._tracing_span = span
 
     def get_request_id(self) -> str:
-        return "%s-%i" % (self.get_method(), self.request_seq)
+        request_id_value = None
+        if self.request_id_header:
+            request_id_value = self.getHeader(self.request_id_header)
+
+        if request_id_value is None:
+            request_id_value = str(self.request_seq)
+
+        return "%s-%s" % (self.get_method(), request_id_value)
 
     def get_redacted_uri(self) -> str:
         """Gets the redacted URI associated with the request (or placeholder if the URI
@@ -619,12 +628,15 @@ class SynapseSite(Site):
         proxied = config.http_options.x_forwarded
         request_class = XForwardedForRequest if proxied else SynapseRequest
 
+        request_id_header = config.http_options.request_id_header
+
         def request_factory(channel: HTTPChannel, queued: bool) -> Request:
             return request_class(
                 channel,
                 self,
                 max_request_body_size=max_request_body_size,
                 queued=queued,
+                request_id_header=request_id_header,
             )
 
         self.requestFactory = request_factory  # type: ignore
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 87ba154cb7..9287c0fb8d 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -836,31 +836,6 @@ class ModuleApi:
             self._store.db_pool.runInteraction(desc, func, *args, **kwargs)  # type: ignore[arg-type]
         )
 
-    def complete_sso_login(
-        self, registered_user_id: str, request: SynapseRequest, client_redirect_url: str
-    ) -> None:
-        """Complete a SSO login by redirecting the user to a page to confirm whether they
-        want their access token sent to `client_redirect_url`, or redirect them to that
-        URL with a token directly if the URL matches with one of the whitelisted clients.
-
-        This is deprecated in favor of complete_sso_login_async.
-
-        Added in Synapse v1.11.1.
-
-        Args:
-            registered_user_id: The MXID that has been registered as a previous step of
-                of this SSO login.
-            request: The request to respond to.
-            client_redirect_url: The URL to which to offer to redirect the user (or to
-                redirect them directly if whitelisted).
-        """
-        self._auth_handler._complete_sso_login(
-            registered_user_id,
-            "<unknown>",
-            request,
-            client_redirect_url,
-        )
-
     async def complete_sso_login_async(
         self,
         registered_user_id: str,
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
deleted file mode 100644
index 440205e80c..0000000000
--- a/synapse/push/baserules.py
+++ /dev/null
@@ -1,583 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2017 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Push rules is the system used to determine which events trigger a push (and a
-bump in notification counts).
-
-This consists of a list of "push rules" for each user, where a push rule is a
-pair of "conditions" and "actions". When a user receives an event Synapse
-iterates over the list of push rules until it finds one where all the conditions
-match the event, at which point "actions" describe the outcome (e.g. notify,
-highlight, etc).
-
-Push rules are split up into 5 different "kinds" (aka "priority classes"), which
-are run in order:
-    1. Override — highest priority rules, e.g. always ignore notices
-    2. Content — content specific rules, e.g. @ notifications
-    3. Room — per room rules, e.g. enable/disable notifications for all messages
-       in a room
-    4. Sender — per sender rules, e.g. never notify for messages from a given
-       user
-    5. Underride — the lowest priority "default" rules, e.g. notify for every
-       message.
-
-The set of "base rules" are the list of rules that every user has by default. A
-user can modify their copy of the push rules in one of three ways:
-
-    1. Adding a new push rule of a certain kind
-    2. Changing the actions of a base rule
-    3. Enabling/disabling a base rule.
-
-The base rules are split into whether they come before or after a particular
-kind, so the order of push rule evaluation would be: base rules for before
-"override" kind, user defined "override" rules, base rules after "override"
-kind, etc, etc.
-"""
-
-import itertools
-import logging
-from typing import Dict, Iterator, List, Mapping, Sequence, Tuple, Union
-
-import attr
-
-from synapse.config.experimental import ExperimentalConfig
-from synapse.push.rulekinds import PRIORITY_CLASS_MAP
-
-logger = logging.getLogger(__name__)
-
-
-@attr.s(auto_attribs=True, slots=True, frozen=True)
-class PushRule:
-    """A push rule
-
-    Attributes:
-        rule_id: a unique ID for this rule
-        priority_class: what "kind" of push rule this is (see
-            `PRIORITY_CLASS_MAP` for mapping between int and kind)
-        conditions: the sequence of conditions that all need to match
-        actions: the actions to apply if all conditions are met
-        default: is this a base rule?
-        default_enabled: is this enabled by default?
-    """
-
-    rule_id: str
-    priority_class: int
-    conditions: Sequence[Mapping[str, str]]
-    actions: Sequence[Union[str, Mapping]]
-    default: bool = False
-    default_enabled: bool = True
-
-
-@attr.s(auto_attribs=True, slots=True, frozen=True, weakref_slot=False)
-class PushRules:
-    """A collection of push rules for an account.
-
-    Can be iterated over, producing push rules in priority order.
-    """
-
-    # A mapping from rule ID to push rule that overrides a base rule. These will
-    # be returned instead of the base rule.
-    overriden_base_rules: Dict[str, PushRule] = attr.Factory(dict)
-
-    # The following stores the custom push rules at each priority class.
-    #
-    # We keep these separate (rather than combining into one big list) to avoid
-    # copying the base rules around all the time.
-    override: List[PushRule] = attr.Factory(list)
-    content: List[PushRule] = attr.Factory(list)
-    room: List[PushRule] = attr.Factory(list)
-    sender: List[PushRule] = attr.Factory(list)
-    underride: List[PushRule] = attr.Factory(list)
-
-    def __iter__(self) -> Iterator[PushRule]:
-        # When iterating over the push rules we need to return the base rules
-        # interspersed at the correct spots.
-        for rule in itertools.chain(
-            BASE_PREPEND_OVERRIDE_RULES,
-            self.override,
-            BASE_APPEND_OVERRIDE_RULES,
-            self.content,
-            BASE_APPEND_CONTENT_RULES,
-            self.room,
-            self.sender,
-            self.underride,
-            BASE_APPEND_UNDERRIDE_RULES,
-        ):
-            # Check if a base rule has been overriden by a custom rule. If so
-            # return that instead.
-            override_rule = self.overriden_base_rules.get(rule.rule_id)
-            if override_rule:
-                yield override_rule
-            else:
-                yield rule
-
-    def __len__(self) -> int:
-        # The length is mostly used by caches to get a sense of "size" / amount
-        # of memory this object is using, so we only count the number of custom
-        # rules.
-        return (
-            len(self.overriden_base_rules)
-            + len(self.override)
-            + len(self.content)
-            + len(self.room)
-            + len(self.sender)
-            + len(self.underride)
-        )
-
-
-@attr.s(auto_attribs=True, slots=True, frozen=True, weakref_slot=False)
-class FilteredPushRules:
-    """A wrapper around `PushRules` that filters out disabled experimental push
-    rules, and includes the "enabled" state for each rule when iterated over.
-    """
-
-    push_rules: PushRules
-    enabled_map: Dict[str, bool]
-    experimental_config: ExperimentalConfig
-
-    def __iter__(self) -> Iterator[Tuple[PushRule, bool]]:
-        for rule in self.push_rules:
-            if not _is_experimental_rule_enabled(
-                rule.rule_id, self.experimental_config
-            ):
-                continue
-
-            enabled = self.enabled_map.get(rule.rule_id, rule.default_enabled)
-
-            yield rule, enabled
-
-    def __len__(self) -> int:
-        return len(self.push_rules)
-
-
-DEFAULT_EMPTY_PUSH_RULES = PushRules()
-
-
-def compile_push_rules(rawrules: List[PushRule]) -> PushRules:
-    """Given a set of custom push rules return a `PushRules` instance (which
-    includes the base rules).
-    """
-
-    if not rawrules:
-        # Fast path to avoid allocating empty lists when there are no custom
-        # rules for the user.
-        return DEFAULT_EMPTY_PUSH_RULES
-
-    rules = PushRules()
-
-    for rule in rawrules:
-        # We need to decide which bucket each custom push rule goes into.
-
-        # If it has the same ID as a base rule then it overrides that...
-        overriden_base_rule = BASE_RULES_BY_ID.get(rule.rule_id)
-        if overriden_base_rule:
-            rules.overriden_base_rules[rule.rule_id] = attr.evolve(
-                overriden_base_rule, actions=rule.actions
-            )
-            continue
-
-        # ... otherwise it gets added to the appropriate priority class bucket
-        collection: List[PushRule]
-        if rule.priority_class == 5:
-            collection = rules.override
-        elif rule.priority_class == 4:
-            collection = rules.content
-        elif rule.priority_class == 3:
-            collection = rules.room
-        elif rule.priority_class == 2:
-            collection = rules.sender
-        elif rule.priority_class == 1:
-            collection = rules.underride
-        elif rule.priority_class <= 0:
-            logger.info(
-                "Got rule with priority class less than zero, but doesn't override a base rule: %s",
-                rule,
-            )
-            continue
-        else:
-            # We log and continue here so as not to break event sending
-            logger.error("Unknown priority class: %", rule.priority_class)
-            continue
-
-        collection.append(rule)
-
-    return rules
-
-
-def _is_experimental_rule_enabled(
-    rule_id: str, experimental_config: ExperimentalConfig
-) -> bool:
-    """Used by `FilteredPushRules` to filter out experimental rules when they
-    have not been enabled.
-    """
-    if (
-        rule_id == "global/override/.org.matrix.msc3786.rule.room.server_acl"
-        and not experimental_config.msc3786_enabled
-    ):
-        return False
-    if (
-        rule_id == "global/underride/.org.matrix.msc3772.thread_reply"
-        and not experimental_config.msc3772_enabled
-    ):
-        return False
-    return True
-
-
-BASE_APPEND_CONTENT_RULES = [
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["content"],
-        rule_id="global/content/.m.rule.contains_user_name",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "content.body",
-                # Match the localpart of the requester's MXID.
-                "pattern_type": "user_localpart",
-            }
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight"},
-        ],
-    )
-]
-
-
-BASE_PREPEND_OVERRIDE_RULES = [
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.master",
-        default_enabled=False,
-        conditions=[],
-        actions=["dont_notify"],
-    )
-]
-
-
-BASE_APPEND_OVERRIDE_RULES = [
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.suppress_notices",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "content.msgtype",
-                "pattern": "m.notice",
-                "_cache_key": "_suppress_notices",
-            }
-        ],
-        actions=["dont_notify"],
-    ),
-    # NB. .m.rule.invite_for_me must be higher prio than .m.rule.member_event
-    # otherwise invites will be matched by .m.rule.member_event
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.invite_for_me",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.member",
-                "_cache_key": "_member",
-            },
-            {
-                "kind": "event_match",
-                "key": "content.membership",
-                "pattern": "invite",
-                "_cache_key": "_invite_member",
-            },
-            # Match the requester's MXID.
-            {"kind": "event_match", "key": "state_key", "pattern_type": "user_id"},
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight", "value": False},
-        ],
-    ),
-    # Will we sometimes want to know about people joining and leaving?
-    # Perhaps: if so, this could be expanded upon. Seems the most usual case
-    # is that we don't though. We add this override rule so that even if
-    # the room rule is set to notify, we don't get notifications about
-    # join/leave/avatar/displayname events.
-    # See also: https://matrix.org/jira/browse/SYN-607
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.member_event",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.member",
-                "_cache_key": "_member",
-            }
-        ],
-        actions=["dont_notify"],
-    ),
-    # This was changed from underride to override so it's closer in priority
-    # to the content rules where the user name highlight rule lives. This
-    # way a room rule is lower priority than both but a custom override rule
-    # is higher priority than both.
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.contains_display_name",
-        conditions=[{"kind": "contains_display_name"}],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight"},
-        ],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.roomnotif",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "content.body",
-                "pattern": "@room",
-                "_cache_key": "_roomnotif_content",
-            },
-            {
-                "kind": "sender_notification_permission",
-                "key": "room",
-                "_cache_key": "_roomnotif_pl",
-            },
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": True}],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.tombstone",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.tombstone",
-                "_cache_key": "_tombstone",
-            },
-            {
-                "kind": "event_match",
-                "key": "state_key",
-                "pattern": "",
-                "_cache_key": "_tombstone_statekey",
-            },
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": True}],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.m.rule.reaction",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.reaction",
-                "_cache_key": "_reaction",
-            }
-        ],
-        actions=["dont_notify"],
-    ),
-    # XXX: This is an experimental rule that is only enabled if msc3786_enabled
-    # is enabled, if it is not the rule gets filtered out in _load_rules() in
-    # PushRulesWorkerStore
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["override"],
-        rule_id="global/override/.org.matrix.msc3786.rule.room.server_acl",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.server_acl",
-                "_cache_key": "_room_server_acl",
-            },
-            {
-                "kind": "event_match",
-                "key": "state_key",
-                "pattern": "",
-                "_cache_key": "_room_server_acl_state_key",
-            },
-        ],
-        actions=[],
-    ),
-]
-
-
-BASE_APPEND_UNDERRIDE_RULES = [
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.call",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.call.invite",
-                "_cache_key": "_call",
-            }
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "ring"},
-            {"set_tweak": "highlight", "value": False},
-        ],
-    ),
-    # XXX: once m.direct is standardised everywhere, we should use it to detect
-    # a DM from the user's perspective rather than this heuristic.
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.room_one_to_one",
-        conditions=[
-            {"kind": "room_member_count", "is": "2", "_cache_key": "member_count"},
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.message",
-                "_cache_key": "_message",
-            },
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight", "value": False},
-        ],
-    ),
-    # XXX: this is going to fire for events which aren't m.room.messages
-    # but are encrypted (e.g. m.call.*)...
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.encrypted_room_one_to_one",
-        conditions=[
-            {"kind": "room_member_count", "is": "2", "_cache_key": "member_count"},
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.encrypted",
-                "_cache_key": "_encrypted",
-            },
-        ],
-        actions=[
-            "notify",
-            {"set_tweak": "sound", "value": "default"},
-            {"set_tweak": "highlight", "value": False},
-        ],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.org.matrix.msc3772.thread_reply",
-        conditions=[
-            {
-                "kind": "org.matrix.msc3772.relation_match",
-                "rel_type": "m.thread",
-                # Match the requester's MXID.
-                "sender_type": "user_id",
-            }
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": False}],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.message",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.message",
-                "_cache_key": "_message",
-            }
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": False}],
-    ),
-    # XXX: this is going to fire for events which aren't m.room.messages
-    # but are encrypted (e.g. m.call.*)...
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.m.rule.encrypted",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "m.room.encrypted",
-                "_cache_key": "_encrypted",
-            }
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": False}],
-    ),
-    PushRule(
-        default=True,
-        priority_class=PRIORITY_CLASS_MAP["underride"],
-        rule_id="global/underride/.im.vector.jitsi",
-        conditions=[
-            {
-                "kind": "event_match",
-                "key": "type",
-                "pattern": "im.vector.modular.widgets",
-                "_cache_key": "_type_modular_widgets",
-            },
-            {
-                "kind": "event_match",
-                "key": "content.type",
-                "pattern": "jitsi",
-                "_cache_key": "_content_type_jitsi",
-            },
-            {
-                "kind": "event_match",
-                "key": "state_key",
-                "pattern": "*",
-                "_cache_key": "_is_state_event",
-            },
-        ],
-        actions=["notify", {"set_tweak": "highlight", "value": False}],
-    ),
-]
-
-
-BASE_RULE_IDS = set()
-
-BASE_RULES_BY_ID: Dict[str, PushRule] = {}
-
-for r in BASE_APPEND_CONTENT_RULES:
-    BASE_RULE_IDS.add(r.rule_id)
-    BASE_RULES_BY_ID[r.rule_id] = r
-
-for r in BASE_PREPEND_OVERRIDE_RULES:
-    BASE_RULE_IDS.add(r.rule_id)
-    BASE_RULES_BY_ID[r.rule_id] = r
-
-for r in BASE_APPEND_OVERRIDE_RULES:
-    BASE_RULE_IDS.add(r.rule_id)
-    BASE_RULES_BY_ID[r.rule_id] = r
-
-for r in BASE_APPEND_UNDERRIDE_RULES:
-    BASE_RULE_IDS.add(r.rule_id)
-    BASE_RULES_BY_ID[r.rule_id] = r
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 3846fbc5f0..404379ef67 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -37,11 +37,11 @@ from synapse.events.snapshot import EventContext
 from synapse.state import POWER_KEY
 from synapse.storage.databases.main.roommember import EventIdMembership
 from synapse.storage.state import StateFilter
+from synapse.synapse_rust.push import FilteredPushRules, PushRule
 from synapse.util.caches import register_cache
 from synapse.util.metrics import measure_func
 from synapse.visibility import filter_event_for_clients_with_state
 
-from .baserules import FilteredPushRules, PushRule
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 if TYPE_CHECKING:
@@ -280,7 +280,8 @@ class BulkPushRuleEvaluator:
         thread_id = "main"
         if relation:
             relations = await self._get_mutual_relations(
-                relation.parent_id, itertools.chain(*rules_by_user.values())
+                relation.parent_id,
+                itertools.chain(*(r.rules() for r in rules_by_user.values())),
             )
             if relation.rel_type == RelationTypes.THREAD:
                 thread_id = relation.parent_id
@@ -333,7 +334,7 @@ class BulkPushRuleEvaluator:
                 # current user, it'll be added to the dict later.
                 actions_by_user[uid] = []
 
-            for rule, enabled in rules:
+            for rule, enabled in rules.rules():
                 if not enabled:
                     continue
 
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index 73618d9234..ebc13beda1 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -16,10 +16,9 @@ import copy
 from typing import Any, Dict, List, Optional
 
 from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+from synapse.synapse_rust.push import FilteredPushRules, PushRule
 from synapse.types import UserID
 
-from .baserules import FilteredPushRules, PushRule
-
 
 def format_push_rules_for_user(
     user: UserID, ruleslist: FilteredPushRules
@@ -34,7 +33,7 @@ def format_push_rules_for_user(
 
     rules["global"] = _add_empty_priority_class_arrays(rules["global"])
 
-    for r, enabled in ruleslist:
+    for r, enabled in ruleslist.rules():
         template_name = _priority_class_to_template_name(r.priority_class)
 
         rulearray = rules["global"][template_name]
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index bac754e1b1..885669f9c7 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -80,6 +80,7 @@ from synapse.rest.admin.users import (
     SearchUsersRestServlet,
     ShadowBanRestServlet,
     UserAdminServlet,
+    UserByExternalId,
     UserMembershipRestServlet,
     UserRegisterServlet,
     UserRestServletV2,
@@ -275,6 +276,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ListDestinationsRestServlet(hs).register(http_server)
     RoomMessagesRestServlet(hs).register(http_server)
     RoomTimestampToEventRestServlet(hs).register(http_server)
+    UserByExternalId(hs).register(http_server)
 
     # Some servlets only get registered for the main process.
     if hs.config.worker.worker_app is None:
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 78ee9b6532..2ca6b2d08a 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -1156,3 +1156,30 @@ class AccountDataRestServlet(RestServlet):
                 "rooms": by_room_data,
             },
         }
+
+
+class UserByExternalId(RestServlet):
+    """Find a user based on an external ID from an auth provider"""
+
+    PATTERNS = admin_patterns(
+        "/auth_providers/(?P<provider>[^/]*)/users/(?P<external_id>[^/]*)"
+    )
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+
+    async def on_GET(
+        self,
+        request: SynapseRequest,
+        provider: str,
+        external_id: str,
+    ) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self._auth, request)
+
+        user_id = await self._store.get_user_by_external_id(provider, external_id)
+
+        if user_id is None:
+            raise NotFoundError("User not found")
+
+        return HTTPStatus.OK, {"user_id": user_id}
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index a09aaf3448..2db2a04f95 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple
 from urllib.parse import urlparse
 
 from pydantic import StrictBool, StrictStr, constr
+from typing_extensions import Literal
 
 from twisted.web.server import Request
 
@@ -43,6 +44,7 @@ from synapse.metrics import threepid_send_requests
 from synapse.push.mailer import Mailer
 from synapse.rest.client.models import (
     AuthenticationData,
+    ClientSecretStr,
     EmailRequestTokenBody,
     MsisdnRequestTokenBody,
 )
@@ -627,6 +629,11 @@ class ThreepidAddRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
 
+    class PostBody(RequestBodyModel):
+        auth: Optional[AuthenticationData] = None
+        client_secret: ClientSecretStr
+        sid: StrictStr
+
     @interactive_auth_handler
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         if not self.hs.config.registration.enable_3pid_changes:
@@ -636,22 +643,17 @@ class ThreepidAddRestServlet(RestServlet):
 
         requester = await self.auth.get_user_by_req(request)
         user_id = requester.user.to_string()
-        body = parse_json_object_from_request(request)
-
-        assert_params_in_dict(body, ["client_secret", "sid"])
-        sid = body["sid"]
-        client_secret = body["client_secret"]
-        assert_valid_client_secret(client_secret)
+        body = parse_and_validate_json_object_from_request(request, self.PostBody)
 
         await self.auth_handler.validate_user_via_ui_auth(
             requester,
             request,
-            body,
+            body.dict(exclude_unset=True),
             "add a third-party identifier to your account",
         )
 
         validation_session = await self.identity_handler.validate_threepid_session(
-            client_secret, sid
+            body.client_secret, body.sid
         )
         if validation_session:
             await self.auth_handler.add_threepid(
@@ -676,23 +678,20 @@ class ThreepidBindRestServlet(RestServlet):
         self.identity_handler = hs.get_identity_handler()
         self.auth = hs.get_auth()
 
-    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
-        body = parse_json_object_from_request(request)
+    class PostBody(RequestBodyModel):
+        client_secret: ClientSecretStr
+        id_access_token: StrictStr
+        id_server: StrictStr
+        sid: StrictStr
 
-        assert_params_in_dict(
-            body, ["id_server", "sid", "id_access_token", "client_secret"]
-        )
-        id_server = body["id_server"]
-        sid = body["sid"]
-        id_access_token = body["id_access_token"]
-        client_secret = body["client_secret"]
-        assert_valid_client_secret(client_secret)
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        body = parse_and_validate_json_object_from_request(request, self.PostBody)
 
         requester = await self.auth.get_user_by_req(request)
         user_id = requester.user.to_string()
 
         await self.identity_handler.bind_threepid(
-            client_secret, sid, user_id, id_server, id_access_token
+            body.client_secret, body.sid, user_id, body.id_server, body.id_access_token
         )
 
         return 200, {}
@@ -708,23 +707,27 @@ class ThreepidUnbindRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.datastore = self.hs.get_datastores().main
 
+    class PostBody(RequestBodyModel):
+        address: StrictStr
+        id_server: Optional[StrictStr] = None
+        medium: Literal["email", "msisdn"]
+
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         """Unbind the given 3pid from a specific identity server, or identity servers that are
         known to have this 3pid bound
         """
         requester = await self.auth.get_user_by_req(request)
-        body = parse_json_object_from_request(request)
-        assert_params_in_dict(body, ["medium", "address"])
-
-        medium = body.get("medium")
-        address = body.get("address")
-        id_server = body.get("id_server")
+        body = parse_and_validate_json_object_from_request(request, self.PostBody)
 
         # Attempt to unbind the threepid from an identity server. If id_server is None, try to
         # unbind from all identity servers this threepid has been added to in the past
         result = await self.identity_handler.try_unbind_threepid(
             requester.user.to_string(),
-            {"address": address, "medium": medium, "id_server": id_server},
+            {
+                "address": body.address,
+                "medium": body.medium,
+                "id_server": body.id_server,
+            },
         )
         return 200, {"id_server_unbind_result": "success" if result else "no-support"}
 
@@ -738,21 +741,25 @@ class ThreepidDeleteRestServlet(RestServlet):
         self.auth = hs.get_auth()
         self.auth_handler = hs.get_auth_handler()
 
+    class PostBody(RequestBodyModel):
+        address: StrictStr
+        id_server: Optional[StrictStr] = None
+        medium: Literal["email", "msisdn"]
+
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         if not self.hs.config.registration.enable_3pid_changes:
             raise SynapseError(
                 400, "3PID changes are disabled on this server", Codes.FORBIDDEN
             )
 
-        body = parse_json_object_from_request(request)
-        assert_params_in_dict(body, ["medium", "address"])
+        body = parse_and_validate_json_object_from_request(request, self.PostBody)
 
         requester = await self.auth.get_user_by_req(request)
         user_id = requester.user.to_string()
 
         try:
             ret = await self.auth_handler.delete_threepid(
-                user_id, body["medium"], body["address"], body.get("id_server")
+                user_id, body.medium, body.address, body.id_server
             )
         except Exception:
             # NB. This endpoint should succeed if there is nothing to
diff --git a/synapse/rest/client/models.py b/synapse/rest/client/models.py
index 6278450c70..3d7940b0fc 100644
--- a/synapse/rest/client/models.py
+++ b/synapse/rest/client/models.py
@@ -36,18 +36,20 @@ class AuthenticationData(RequestBodyModel):
     type: Optional[StrictStr] = None
 
 
-class ThreePidRequestTokenBody(RequestBodyModel):
-    if TYPE_CHECKING:
-        client_secret: StrictStr
-    else:
-        # See also assert_valid_client_secret()
-        client_secret: constr(
-            regex="[0-9a-zA-Z.=_-]",  # noqa: F722
-            min_length=0,
-            max_length=255,
-            strict=True,
-        )
+if TYPE_CHECKING:
+    ClientSecretStr = StrictStr
+else:
+    # See also assert_valid_client_secret()
+    ClientSecretStr = constr(
+        regex="[0-9a-zA-Z.=_-]",  # noqa: F722
+        min_length=1,
+        max_length=255,
+        strict=True,
+    )
+
 
+class ThreepidRequestTokenBody(RequestBodyModel):
+    client_secret: ClientSecretStr
     id_server: Optional[StrictStr]
     id_access_token: Optional[StrictStr]
     next_link: Optional[StrictStr]
@@ -62,7 +64,7 @@ class ThreePidRequestTokenBody(RequestBodyModel):
         return token
 
 
-class EmailRequestTokenBody(ThreePidRequestTokenBody):
+class EmailRequestTokenBody(ThreepidRequestTokenBody):
     email: StrictStr
 
     # Canonicalise the email address. The addresses are all stored canonicalised
@@ -80,6 +82,6 @@ else:
     ISO3116_1_Alpha_2 = constr(regex="[A-Z]{2}", strict=True)
 
 
-class MsisdnRequestTokenBody(ThreePidRequestTokenBody):
+class MsisdnRequestTokenBody(ThreepidRequestTokenBody):
     country: ISO3116_1_Alpha_2
     phone_number: StrictStr
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 9dd3c8d4bb..328c0c5477 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -19,6 +19,8 @@ import shutil
 from io import BytesIO
 from typing import IO, TYPE_CHECKING, Dict, List, Optional, Set, Tuple
 
+from matrix_common.types.mxc_uri import MXCUri
+
 import twisted.internet.error
 import twisted.web.http
 from twisted.internet.defer import Deferred
@@ -186,7 +188,7 @@ class MediaRepository:
         content: IO,
         content_length: int,
         auth_user: UserID,
-    ) -> str:
+    ) -> MXCUri:
         """Store uploaded content for a local user and return the mxc URL
 
         Args:
@@ -219,7 +221,7 @@ class MediaRepository:
 
         await self._generate_thumbnails(None, media_id, media_id, media_type)
 
-        return "mxc://%s/%s" % (self.server_name, media_id)
+        return MXCUri(self.server_name, media_id)
 
     async def get_local_media(
         self, request: SynapseRequest, media_id: str, name: Optional[str]
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index e73e431dc9..97548b54e5 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -101,6 +101,8 @@ class UploadResource(DirectServeJsonResource):
             # the default 404, as that would just be confusing.
             raise SynapseError(400, "Bad content")
 
-        logger.info("Uploaded content with URI %r", content_uri)
+        logger.info("Uploaded content with URI '%s'", content_uri)
 
-        respond_with_json(request, 200, {"content_uri": content_uri}, send_cors=True)
+        respond_with_json(
+            request, 200, {"content_uri": str(content_uri)}, send_cors=True
+        )
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index af03851c71..1b9d7d8457 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -577,6 +577,21 @@ async def _iterative_auth_checks(
                 if ev.rejected_reason is None:
                     auth_events[key] = event_map[ev_id]
 
+        if event.rejected_reason is not None:
+            # Do not admit previously rejected events into state.
+            # TODO: This isn't spec compliant. Events that were previously rejected due
+            #       to failing auth checks at their state, but pass auth checks during
+            #       state resolution should be accepted. Synapse does not handle the
+            #       change of rejection status well, so we preserve the previous
+            #       rejection status for now.
+            #
+            #       Note that events rejected for non-state reasons, such as having the
+            #       wrong auth events, should remain rejected.
+            #
+            #       https://spec.matrix.org/v1.2/rooms/v9/#rejected-events
+            #       https://github.com/matrix-org/synapse/issues/13797
+            continue
+
         try:
             event_auth.check_state_dependent_auth_rules(
                 event,
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index cf1eabc437..2056ecb2c3 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -285,7 +285,10 @@ class BackgroundUpdater:
         back_to_back_failures = 0
 
         try:
-            logger.info("Starting background schema updates")
+            logger.info(
+                "Starting background schema updates for database %s",
+                self._database_name,
+            )
             while self.enabled:
                 try:
                     result = await self.do_next_background_update(sleep)
@@ -533,6 +536,7 @@ class BackgroundUpdater:
             index_name: name of index to add
             table: table to add index to
             columns: columns/expressions to include in index
+            where_clause: A WHERE clause to specify a partial unique index.
             unique: true to make a UNIQUE index
             psql_only: true to only create this index on psql databases (useful
                 for virtual sqlite tables)
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 15d5da3fa5..4a75fb4f3e 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1187,6 +1187,7 @@ class DatabasePool:
         keyvalues: Dict[str, Any],
         values: Dict[str, Any],
         insertion_values: Optional[Dict[str, Any]] = None,
+        where_clause: Optional[str] = None,
         lock: bool = True,
     ) -> bool:
         """
@@ -1199,6 +1200,7 @@ class DatabasePool:
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
+            where_clause: An index predicate to apply to the upsert.
             lock: True to lock the table when doing the upsert. Unused when performing
                 a native upsert.
         Returns:
@@ -1209,7 +1211,12 @@ class DatabasePool:
 
         if table not in self._unsafe_to_upsert_tables:
             return self.simple_upsert_txn_native_upsert(
-                txn, table, keyvalues, values, insertion_values=insertion_values
+                txn,
+                table,
+                keyvalues,
+                values,
+                insertion_values=insertion_values,
+                where_clause=where_clause,
             )
         else:
             return self.simple_upsert_txn_emulated(
@@ -1218,6 +1225,7 @@ class DatabasePool:
                 keyvalues,
                 values,
                 insertion_values=insertion_values,
+                where_clause=where_clause,
                 lock=lock,
             )
 
@@ -1228,6 +1236,7 @@ class DatabasePool:
         keyvalues: Dict[str, Any],
         values: Dict[str, Any],
         insertion_values: Optional[Dict[str, Any]] = None,
+        where_clause: Optional[str] = None,
         lock: bool = True,
     ) -> bool:
         """
@@ -1236,6 +1245,7 @@ class DatabasePool:
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
+            where_clause: An index predicate to apply to the upsert.
             lock: True to lock the table when doing the upsert.
         Returns:
             Returns True if a row was inserted or updated (i.e. if `values` is
@@ -1255,14 +1265,17 @@ class DatabasePool:
             else:
                 return "%s = ?" % (key,)
 
+        # Generate a where clause of each keyvalue and optionally the provided
+        # index predicate.
+        where = [_getwhere(k) for k in keyvalues]
+        if where_clause:
+            where.append(where_clause)
+
         if not values:
             # If `values` is empty, then all of the values we care about are in
             # the unique key, so there is nothing to UPDATE. We can just do a
             # SELECT instead to see if it exists.
-            sql = "SELECT 1 FROM %s WHERE %s" % (
-                table,
-                " AND ".join(_getwhere(k) for k in keyvalues),
-            )
+            sql = "SELECT 1 FROM %s WHERE %s" % (table, " AND ".join(where))
             sqlargs = list(keyvalues.values())
             txn.execute(sql, sqlargs)
             if txn.fetchall():
@@ -1273,7 +1286,7 @@ class DatabasePool:
             sql = "UPDATE %s SET %s WHERE %s" % (
                 table,
                 ", ".join("%s = ?" % (k,) for k in values),
-                " AND ".join(_getwhere(k) for k in keyvalues),
+                " AND ".join(where),
             )
             sqlargs = list(values.values()) + list(keyvalues.values())
 
@@ -1303,6 +1316,7 @@ class DatabasePool:
         keyvalues: Dict[str, Any],
         values: Dict[str, Any],
         insertion_values: Optional[Dict[str, Any]] = None,
+        where_clause: Optional[str] = None,
     ) -> bool:
         """
         Use the native UPSERT functionality in PostgreSQL.
@@ -1312,6 +1326,7 @@ class DatabasePool:
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
+            where_clause: An index predicate to apply to the upsert.
 
         Returns:
             Returns True if a row was inserted or updated (i.e. if `values` is
@@ -1327,11 +1342,12 @@ class DatabasePool:
             allvalues.update(values)
             latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
 
-        sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
+        sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) %s DO %s" % (
             table,
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues),
             ", ".join(k for k in keyvalues),
+            f"WHERE {where_clause}" if where_clause else "",
             latter,
         )
         txn.execute(sql, list(allvalues.values()))
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index f6822707e4..9213ce0b5a 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -419,6 +419,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "event_forward_extremities",
             "event_push_actions",
             "event_search",
+            "event_failed_pull_attempts",
             "partial_state_events",
             "events",
             "federation_inbound_events_staging",
@@ -441,6 +442,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
             "e2e_room_keys",
             "event_push_summary",
             "pusher_throttle",
+            "insertion_events",
+            "insertion_event_extremities",
+            "insertion_event_edges",
+            "batch_events",
             "room_account_data",
             "room_tags",
             # "rooms" happens last, to keep the foreign keys in the other tables
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 5079edd1e0..ed17b2e70c 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -30,9 +30,8 @@ from typing import (
 
 from synapse.api.errors import StoreError
 from synapse.config.homeserver import ExperimentalConfig
-from synapse.push.baserules import FilteredPushRules, PushRule, compile_push_rules
 from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
-from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import (
     DatabasePool,
     LoggingDatabaseConnection,
@@ -51,6 +50,7 @@ from synapse.storage.util.id_generators import (
     IdGenerator,
     StreamIdGenerator,
 )
+from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
@@ -72,18 +72,25 @@ def _load_rules(
     """
 
     ruleslist = [
-        PushRule(
+        PushRule.from_db(
             rule_id=rawrule["rule_id"],
             priority_class=rawrule["priority_class"],
-            conditions=db_to_json(rawrule["conditions"]),
-            actions=db_to_json(rawrule["actions"]),
+            conditions=rawrule["conditions"],
+            actions=rawrule["actions"],
         )
         for rawrule in rawrules
     ]
 
-    push_rules = compile_push_rules(ruleslist)
+    push_rules = PushRules(
+        ruleslist,
+    )
 
-    filtered_rules = FilteredPushRules(push_rules, enabled_map, experimental_config)
+    filtered_rules = FilteredPushRules(
+        push_rules,
+        enabled_map,
+        msc3786_enabled=experimental_config.msc3786_enabled,
+        msc3772_enabled=experimental_config.msc3772_enabled,
+    )
 
     return filtered_rules
 
@@ -845,7 +852,7 @@ class PushRuleStore(PushRulesWorkerStore):
         user_push_rules = await self.get_push_rules_for_user(user_id)
 
         # Get rules relating to the old room and copy them to the new room
-        for rule, enabled in user_push_rules:
+        for rule, enabled in user_push_rules.rules():
             if not enabled:
                 continue
 
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index da8ed6d8cf..74a9b42951 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -83,6 +83,8 @@ Changes in SCHEMA_VERSION = 73;
       event_push_summary, receipts_linearized, and receipts_graph.
     - Add table `event_failed_pull_attempts` to keep track when we fail to pull
       events over federation.
+    - Add indexes to various tables (`event_failed_pull_attempts`, `insertion_events`,
+      `batch_events`) to make it easy to delete all associated rows when purging a room.
     - Rename column in `device_lists_outbound_pokes` and `device_lists_changes_in_room`
       from `opentracing_context` to generalized `tracing_context`.
 """
diff --git a/synapse/storage/schema/main/delta/73/02room_id_indexes_for_purging.sql b/synapse/storage/schema/main/delta/73/02room_id_indexes_for_purging.sql
new file mode 100644
index 0000000000..6d38bdd430
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/02room_id_indexes_for_purging.sql
@@ -0,0 +1,22 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add index so we can easily purge all rows from a given `room_id`
+CREATE INDEX IF NOT EXISTS event_failed_pull_attempts_room_id ON event_failed_pull_attempts(room_id);
+
+-- MSC2716 related tables:
+-- Add indexes so we can easily purge all rows from a given `room_id`
+CREATE INDEX IF NOT EXISTS insertion_events_room_id ON insertion_events(room_id);
+CREATE INDEX IF NOT EXISTS batch_events_room_id ON batch_events(room_id);