summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth/__init__.py175
-rw-r--r--synapse/api/auth/base.py (renamed from synapse/api/auth.py)471
-rw-r--r--synapse/api/auth/internal.py291
-rw-r--r--synapse/api/auth/msc3861_delegated.py352
-rw-r--r--synapse/api/constants.py2
-rw-r--r--synapse/api/errors.py28
-rw-r--r--synapse/api/filtering.py4
-rw-r--r--synapse/config/auth.py19
-rw-r--r--synapse/config/experimental.py230
-rw-r--r--synapse/events/validator.py9
-rw-r--r--synapse/federation/federation_server.py9
-rw-r--r--synapse/handlers/account_validity.py2
-rw-r--r--synapse/handlers/admin.py2
-rw-r--r--synapse/handlers/auth.py10
-rw-r--r--synapse/handlers/deactivate_account.py2
-rw-r--r--synapse/handlers/federation.py17
-rw-r--r--synapse/handlers/oidc.py2
-rw-r--r--synapse/handlers/pagination.py4
-rw-r--r--synapse/handlers/presence.py1
-rw-r--r--synapse/handlers/profile.py26
-rw-r--r--synapse/handlers/read_marker.py1
-rw-r--r--synapse/handlers/register.py2
-rw-r--r--synapse/handlers/relations.py16
-rw-r--r--synapse/handlers/room.py1
-rw-r--r--synapse/handlers/stats.py1
-rw-r--r--synapse/http/matrixfederationclient.py11
-rw-r--r--synapse/http/server.py20
-rw-r--r--synapse/logging/opentracing.py37
-rw-r--r--synapse/media/oembed.py32
-rw-r--r--synapse/media/preview_html.py79
-rw-r--r--synapse/module_api/__init__.py38
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py8
-rw-r--r--synapse/push/mailer.py2
-rw-r--r--synapse/rest/admin/__init__.py21
-rw-r--r--synapse/rest/admin/users.py8
-rw-r--r--synapse/rest/client/account.py24
-rw-r--r--synapse/rest/client/capabilities.py3
-rw-r--r--synapse/rest/client/devices.py11
-rw-r--r--synapse/rest/client/filter.py2
-rw-r--r--synapse/rest/client/keys.py30
-rw-r--r--synapse/rest/client/login.py34
-rw-r--r--synapse/rest/client/login_token_request.py47
-rw-r--r--synapse/rest/client/logout.py3
-rw-r--r--synapse/rest/client/register.py72
-rw-r--r--synapse/rest/client/sync.py2
-rw-r--r--synapse/rest/client/versions.py6
-rw-r--r--synapse/rest/media/upload_resource.py1
-rw-r--r--synapse/rest/synapse/client/__init__.py6
-rw-r--r--synapse/rest/synapse/client/jwks.py70
-rw-r--r--synapse/rest/well_known.py10
-rw-r--r--synapse/server.py7
-rw-r--r--synapse/storage/controllers/state.py2
-rw-r--r--synapse/storage/databases/main/devices.py4
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py67
-rw-r--r--synapse/storage/databases/main/filtering.py12
-rw-r--r--synapse/storage/databases/main/profile.py12
-rw-r--r--synapse/storage/databases/main/push_rule.py1
-rw-r--r--synapse/storage/databases/main/relations.py30
-rw-r--r--synapse/storage/databases/main/roommember.py7
-rw-r--r--synapse/storage/databases/main/user_directory.py12
-rw-r--r--synapse/storage/schema/__init__.py5
-rw-r--r--synapse/storage/schema/main/delta/78/01_validate_and_update_profiles.py92
-rw-r--r--synapse/storage/schema/main/delta/78/02_validate_and_update_user_filters.py95
-rw-r--r--synapse/types/__init__.py8
-rw-r--r--synapse/util/__init__.py4
-rw-r--r--synapse/util/async_helpers.py2
-rw-r--r--synapse/util/caches/lrucache.py6
-rw-r--r--synapse/visibility.py14
68 files changed, 2047 insertions, 587 deletions
diff --git a/synapse/api/auth/__init__.py b/synapse/api/auth/__init__.py
new file mode 100644
index 0000000000..90cfe39d76
--- /dev/null
+++ b/synapse/api/auth/__init__.py
@@ -0,0 +1,175 @@
+# Copyright 2023 The Matrix.org Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Optional, Tuple
+
+from typing_extensions import Protocol
+
+from twisted.web.server import Request
+
+from synapse.appservice import ApplicationService
+from synapse.http.site import SynapseRequest
+from synapse.types import Requester
+
+# guests always get this device id.
+GUEST_DEVICE_ID = "guest_device"
+
+
+class Auth(Protocol):
+    """The interface that an auth provider must implement."""
+
+    async def check_user_in_room(
+        self,
+        room_id: str,
+        requester: Requester,
+        allow_departed_users: bool = False,
+    ) -> Tuple[str, Optional[str]]:
+        """Check if the user is in the room, or was at some point.
+        Args:
+            room_id: The room to check.
+
+            user_id: The user to check.
+
+            current_state: Optional map of the current state of the room.
+                If provided then that map is used to check whether they are a
+                member of the room. Otherwise the current membership is
+                loaded from the database.
+
+            allow_departed_users: if True, accept users that were previously
+                members but have now departed.
+
+        Raises:
+            AuthError if the user is/was not in the room.
+        Returns:
+            The current membership of the user in the room and the
+            membership event ID of the user.
+        """
+
+    async def get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool = False,
+        allow_expired: bool = False,
+    ) -> Requester:
+        """Get a registered user's ID.
+
+        Args:
+            request: An HTTP request with an access_token query parameter.
+            allow_guest: If False, will raise an AuthError if the user making the
+                request is a guest.
+            allow_expired: If True, allow the request through even if the account
+                is expired, or session token lifetime has ended. Note that
+                /login will deliver access tokens regardless of expiration.
+
+        Returns:
+            Resolves to the requester
+        Raises:
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid.
+            AuthError if access is denied for the user in the access token
+        """
+
+    async def validate_appservice_can_control_user_id(
+        self, app_service: ApplicationService, user_id: str
+    ) -> None:
+        """Validates that the app service is allowed to control
+        the given user.
+
+        Args:
+            app_service: The app service that controls the user
+            user_id: The author MXID that the app service is controlling
+
+        Raises:
+            AuthError: If the application service is not allowed to control the user
+                (user namespace regex does not match, wrong homeserver, etc)
+                or if the user has not been registered yet.
+        """
+
+    async def get_user_by_access_token(
+        self,
+        token: str,
+        allow_expired: bool = False,
+    ) -> Requester:
+        """Validate access token and get user_id from it
+
+        Args:
+            token: The access token to get the user by
+            allow_expired: If False, raises an InvalidClientTokenError
+                if the token is expired
+
+        Raises:
+            InvalidClientTokenError if a user by that token exists, but the token is
+                expired
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid
+        """
+
+    async def is_server_admin(self, requester: Requester) -> bool:
+        """Check if the given user is a local server admin.
+
+        Args:
+            requester: user to check
+
+        Returns:
+            True if the user is an admin
+        """
+
+    async def check_can_change_room_list(
+        self, room_id: str, requester: Requester
+    ) -> bool:
+        """Determine whether the user is allowed to edit the room's entry in the
+        published room list.
+
+        Args:
+            room_id
+            user
+        """
+
+    @staticmethod
+    def has_access_token(request: Request) -> bool:
+        """Checks if the request has an access_token.
+
+        Returns:
+            False if no access_token was given, True otherwise.
+        """
+
+    @staticmethod
+    def get_access_token_from_request(request: Request) -> str:
+        """Extracts the access_token from the request.
+
+        Args:
+            request: The http request.
+        Returns:
+            The access_token
+        Raises:
+            MissingClientTokenError: If there isn't a single access_token in the
+                request
+        """
+
+    async def check_user_in_room_or_world_readable(
+        self, room_id: str, requester: Requester, allow_departed_users: bool = False
+    ) -> Tuple[str, Optional[str]]:
+        """Checks that the user is or was in the room or the room is world
+        readable. If it isn't then an exception is raised.
+
+        Args:
+            room_id: room to check
+            user_id: user to check
+            allow_departed_users: if True, accept users that were previously
+                members but have now departed
+
+        Returns:
+            Resolves to the current membership of the user in the room and the
+            membership event ID of the user. If the user is not in the room and
+            never has been, then `(Membership.JOIN, None)` is returned.
+        """
diff --git a/synapse/api/auth.py b/synapse/api/auth/base.py
index 66e869bc2d..9321d6f186 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth/base.py
@@ -1,4 +1,4 @@
-# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2023 The Matrix.org Foundation.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@
 import logging
 from typing import TYPE_CHECKING, Optional, Tuple
 
-import pymacaroons
 from netaddr import IPAddress
 
 from twisted.web.server import Request
@@ -24,19 +23,11 @@ from synapse.api.constants import EventTypes, HistoryVisibility, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
-    InvalidClientTokenError,
     MissingClientTokenError,
     UnstableSpecAuthError,
 )
 from synapse.appservice import ApplicationService
-from synapse.http import get_request_user_agent
-from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import (
-    active_span,
-    force_tracing,
-    start_active_span,
-    trace,
-)
+from synapse.logging.opentracing import trace
 from synapse.types import Requester, create_requester
 from synapse.util.cancellation import cancellable
 
@@ -46,26 +37,13 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-# guests always get this device id.
-GUEST_DEVICE_ID = "guest_device"
-
-
-class Auth:
-    """
-    This class contains functions for authenticating users of our client-server API.
-    """
+class BaseAuth:
+    """Common base class for all auth implementations."""
 
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
-        self.clock = hs.get_clock()
         self.store = hs.get_datastores().main
-        self._account_validity_handler = hs.get_account_validity_handler()
         self._storage_controllers = hs.get_storage_controllers()
-        self._macaroon_generator = hs.get_macaroon_generator()
-
-        self._track_appservice_user_ips = hs.config.appservice.track_appservice_user_ips
-        self._track_puppeted_user_ips = hs.config.api.track_puppeted_user_ips
-        self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
 
     async def check_user_in_room(
         self,
@@ -119,139 +97,49 @@ class Auth:
             errcode=Codes.NOT_JOINED,
         )
 
-    @cancellable
-    async def get_user_by_req(
-        self,
-        request: SynapseRequest,
-        allow_guest: bool = False,
-        allow_expired: bool = False,
-    ) -> Requester:
-        """Get a registered user's ID.
+    @trace
+    async def check_user_in_room_or_world_readable(
+        self, room_id: str, requester: Requester, allow_departed_users: bool = False
+    ) -> Tuple[str, Optional[str]]:
+        """Checks that the user is or was in the room or the room is world
+        readable. If it isn't then an exception is raised.
 
         Args:
-            request: An HTTP request with an access_token query parameter.
-            allow_guest: If False, will raise an AuthError if the user making the
-                request is a guest.
-            allow_expired: If True, allow the request through even if the account
-                is expired, or session token lifetime has ended. Note that
-                /login will deliver access tokens regardless of expiration.
+            room_id: room to check
+            user_id: user to check
+            allow_departed_users: if True, accept users that were previously
+                members but have now departed
 
         Returns:
-            Resolves to the requester
-        Raises:
-            InvalidClientCredentialsError if no user by that token exists or the token
-                is invalid.
-            AuthError if access is denied for the user in the access token
+            Resolves to the current membership of the user in the room and the
+            membership event ID of the user. If the user is not in the room and
+            never has been, then `(Membership.JOIN, None)` is returned.
         """
-        parent_span = active_span()
-        with start_active_span("get_user_by_req"):
-            requester = await self._wrapped_get_user_by_req(
-                request, allow_guest, allow_expired
-            )
-
-            if parent_span:
-                if requester.authenticated_entity in self._force_tracing_for_users:
-                    # request tracing is enabled for this user, so we need to force it
-                    # tracing on for the parent span (which will be the servlet span).
-                    #
-                    # It's too late for the get_user_by_req span to inherit the setting,
-                    # so we also force it on for that.
-                    force_tracing()
-                    force_tracing(parent_span)
-                parent_span.set_tag(
-                    "authenticated_entity", requester.authenticated_entity
-                )
-                parent_span.set_tag("user_id", requester.user.to_string())
-                if requester.device_id is not None:
-                    parent_span.set_tag("device_id", requester.device_id)
-                if requester.app_service is not None:
-                    parent_span.set_tag("appservice_id", requester.app_service.id)
-            return requester
 
-    @cancellable
-    async def _wrapped_get_user_by_req(
-        self,
-        request: SynapseRequest,
-        allow_guest: bool,
-        allow_expired: bool,
-    ) -> Requester:
-        """Helper for get_user_by_req
-
-        Once get_user_by_req has set up the opentracing span, this does the actual work.
-        """
         try:
-            ip_addr = request.getClientAddress().host
-            user_agent = get_request_user_agent(request)
-
-            access_token = self.get_access_token_from_request(request)
-
-            # First check if it could be a request from an appservice
-            requester = await self._get_appservice_user(request)
-            if not requester:
-                # If not, it should be from a regular user
-                requester = await self.get_user_by_access_token(
-                    access_token, allow_expired=allow_expired
-                )
-
-                # Deny the request if the user account has expired.
-                # This check is only done for regular users, not appservice ones.
-                if not allow_expired:
-                    if await self._account_validity_handler.is_user_expired(
-                        requester.user.to_string()
-                    ):
-                        # Raise the error if either an account validity module has determined
-                        # the account has expired, or the legacy account validity
-                        # implementation is enabled and determined the account has expired
-                        raise AuthError(
-                            403,
-                            "User account has expired",
-                            errcode=Codes.EXPIRED_ACCOUNT,
-                        )
-
-            if ip_addr and (
-                not requester.app_service or self._track_appservice_user_ips
+            # check_user_in_room will return the most recent membership
+            # event for the user if:
+            #  * The user is a non-guest user, and was ever in the room
+            #  * The user is a guest user, and has joined the room
+            # else it will throw.
+            return await self.check_user_in_room(
+                room_id, requester, allow_departed_users=allow_departed_users
+            )
+        except AuthError:
+            visibility = await self._storage_controllers.state.get_current_state_event(
+                room_id, EventTypes.RoomHistoryVisibility, ""
+            )
+            if (
+                visibility
+                and visibility.content.get("history_visibility")
+                == HistoryVisibility.WORLD_READABLE
             ):
-                # XXX(quenting): I'm 95% confident that we could skip setting the
-                # device_id to "dummy-device" for appservices, and that the only impact
-                # would be some rows which whould not deduplicate in the 'user_ips'
-                # table during the transition
-                recorded_device_id = (
-                    "dummy-device"
-                    if requester.device_id is None and requester.app_service is not None
-                    else requester.device_id
-                )
-                await self.store.insert_client_ip(
-                    user_id=requester.authenticated_entity,
-                    access_token=access_token,
-                    ip=ip_addr,
-                    user_agent=user_agent,
-                    device_id=recorded_device_id,
-                )
-
-                # Track also the puppeted user client IP if enabled and the user is puppeting
-                if (
-                    requester.user.to_string() != requester.authenticated_entity
-                    and self._track_puppeted_user_ips
-                ):
-                    await self.store.insert_client_ip(
-                        user_id=requester.user.to_string(),
-                        access_token=access_token,
-                        ip=ip_addr,
-                        user_agent=user_agent,
-                        device_id=requester.device_id,
-                    )
-
-            if requester.is_guest and not allow_guest:
-                raise AuthError(
-                    403,
-                    "Guest access not allowed",
-                    errcode=Codes.GUEST_ACCESS_FORBIDDEN,
-                )
-
-            request.requester = requester
-            return requester
-        except KeyError:
-            raise MissingClientTokenError()
+                return Membership.JOIN, None
+            raise AuthError(
+                403,
+                "User %r not in room %s, and room previews are disabled"
+                % (requester.user, room_id),
+            )
 
     async def validate_appservice_can_control_user_id(
         self, app_service: ApplicationService, user_id: str
@@ -284,184 +172,16 @@ class Auth:
                 403, "Application service has not registered this user (%s)" % user_id
             )
 
-    @cancellable
-    async def _get_appservice_user(self, request: Request) -> Optional[Requester]:
-        """
-        Given a request, reads the request parameters to determine:
-        - whether it's an application service that's making this request
-        - what user the application service should be treated as controlling
-          (the user_id URI parameter allows an application service to masquerade
-          any applicable user in its namespace)
-        - what device the application service should be treated as controlling
-          (the device_id[^1] URI parameter allows an application service to masquerade
-          as any device that exists for the relevant user)
-
-        [^1] Unstable and provided by MSC3202.
-             Must use `org.matrix.msc3202.device_id` in place of `device_id` for now.
-
-        Returns:
-            the application service `Requester` of that request
-
-        Postconditions:
-        - The `app_service` field in the returned `Requester` is set
-        - The `user_id` field in the returned `Requester` is either the application
-          service sender or the controlled user set by the `user_id` URI parameter
-        - The returned application service is permitted to control the returned user ID.
-        - The returned device ID, if present, has been checked to be a valid device ID
-          for the returned user ID.
-        """
-        DEVICE_ID_ARG_NAME = b"org.matrix.msc3202.device_id"
-
-        app_service = self.store.get_app_service_by_token(
-            self.get_access_token_from_request(request)
-        )
-        if app_service is None:
-            return None
-
-        if app_service.ip_range_whitelist:
-            ip_address = IPAddress(request.getClientAddress().host)
-            if ip_address not in app_service.ip_range_whitelist:
-                return None
-
-        # This will always be set by the time Twisted calls us.
-        assert request.args is not None
-
-        if b"user_id" in request.args:
-            effective_user_id = request.args[b"user_id"][0].decode("utf8")
-            await self.validate_appservice_can_control_user_id(
-                app_service, effective_user_id
-            )
-        else:
-            effective_user_id = app_service.sender
-
-        effective_device_id: Optional[str] = None
-
-        if (
-            self.hs.config.experimental.msc3202_device_masquerading_enabled
-            and DEVICE_ID_ARG_NAME in request.args
-        ):
-            effective_device_id = request.args[DEVICE_ID_ARG_NAME][0].decode("utf8")
-            # We only just set this so it can't be None!
-            assert effective_device_id is not None
-            device_opt = await self.store.get_device(
-                effective_user_id, effective_device_id
-            )
-            if device_opt is None:
-                # For now, use 400 M_EXCLUSIVE if the device doesn't exist.
-                # This is an open thread of discussion on MSC3202 as of 2021-12-09.
-                raise AuthError(
-                    400,
-                    f"Application service trying to use a device that doesn't exist ('{effective_device_id}' for {effective_user_id})",
-                    Codes.EXCLUSIVE,
-                )
-
-        return create_requester(
-            effective_user_id, app_service=app_service, device_id=effective_device_id
-        )
-
-    async def get_user_by_access_token(
-        self,
-        token: str,
-        allow_expired: bool = False,
-    ) -> Requester:
-        """Validate access token and get user_id from it
-
-        Args:
-            token: The access token to get the user by
-            allow_expired: If False, raises an InvalidClientTokenError
-                if the token is expired
-
-        Raises:
-            InvalidClientTokenError if a user by that token exists, but the token is
-                expired
-            InvalidClientCredentialsError if no user by that token exists or the token
-                is invalid
-        """
-
-        # First look in the database to see if the access token is present
-        # as an opaque token.
-        user_info = await self.store.get_user_by_access_token(token)
-        if user_info:
-            valid_until_ms = user_info.valid_until_ms
-            if (
-                not allow_expired
-                and valid_until_ms is not None
-                and valid_until_ms < self.clock.time_msec()
-            ):
-                # there was a valid access token, but it has expired.
-                # soft-logout the user.
-                raise InvalidClientTokenError(
-                    msg="Access token has expired", soft_logout=True
-                )
-
-            # Mark the token as used. This is used to invalidate old refresh
-            # tokens after some time.
-            await self.store.mark_access_token_as_used(user_info.token_id)
-
-            requester = create_requester(
-                user_id=user_info.user_id,
-                access_token_id=user_info.token_id,
-                is_guest=user_info.is_guest,
-                shadow_banned=user_info.shadow_banned,
-                device_id=user_info.device_id,
-                authenticated_entity=user_info.token_owner,
-            )
-
-            return requester
-
-        # If the token isn't found in the database, then it could still be a
-        # macaroon for a guest, so we check that here.
-        try:
-            user_id = self._macaroon_generator.verify_guest_token(token)
-
-            # Guest access tokens are not stored in the database (there can
-            # only be one access token per guest, anyway).
-            #
-            # In order to prevent guest access tokens being used as regular
-            # user access tokens (and hence getting around the invalidation
-            # process), we look up the user id and check that it is indeed
-            # a guest user.
-            #
-            # It would of course be much easier to store guest access
-            # tokens in the database as well, but that would break existing
-            # guest tokens.
-            stored_user = await self.store.get_user_by_id(user_id)
-            if not stored_user:
-                raise InvalidClientTokenError("Unknown user_id %s" % user_id)
-            if not stored_user["is_guest"]:
-                raise InvalidClientTokenError(
-                    "Guest access token used for regular user"
-                )
-
-            return create_requester(
-                user_id=user_id,
-                is_guest=True,
-                # all guests get the same device id
-                device_id=GUEST_DEVICE_ID,
-                authenticated_entity=user_id,
-            )
-        except (
-            pymacaroons.exceptions.MacaroonException,
-            TypeError,
-            ValueError,
-        ) as e:
-            logger.warning(
-                "Invalid access token in auth: %s %s.",
-                type(e),
-                e,
-            )
-            raise InvalidClientTokenError("Invalid access token passed.")
-
     async def is_server_admin(self, requester: Requester) -> bool:
         """Check if the given user is a local server admin.
 
         Args:
-            requester: The user making the request, according to the access token.
+            requester: user to check
 
         Returns:
             True if the user is an admin
         """
-        return await self.store.is_server_admin(requester.user)
+        raise NotImplementedError()
 
     async def check_can_change_room_list(
         self, room_id: str, requester: Requester
@@ -470,8 +190,8 @@ class Auth:
         published room list.
 
         Args:
-            room_id: The room to check.
-            requester: The user making the request, according to the access token.
+            room_id
+            user
         """
 
         is_admin = await self.is_server_admin(requester)
@@ -518,7 +238,6 @@ class Auth:
         return bool(query_params) or bool(auth_headers)
 
     @staticmethod
-    @cancellable
     def get_access_token_from_request(request: Request) -> str:
         """Extracts the access_token from the request.
 
@@ -556,47 +275,77 @@ class Auth:
 
             return query_params[0].decode("ascii")
 
-    @trace
-    async def check_user_in_room_or_world_readable(
-        self, room_id: str, requester: Requester, allow_departed_users: bool = False
-    ) -> Tuple[str, Optional[str]]:
-        """Checks that the user is or was in the room or the room is world
-        readable. If it isn't then an exception is raised.
+    @cancellable
+    async def get_appservice_user(
+        self, request: Request, access_token: str
+    ) -> Optional[Requester]:
+        """
+        Given a request, reads the request parameters to determine:
+        - whether it's an application service that's making this request
+        - what user the application service should be treated as controlling
+          (the user_id URI parameter allows an application service to masquerade
+          any applicable user in its namespace)
+        - what device the application service should be treated as controlling
+          (the device_id[^1] URI parameter allows an application service to masquerade
+          as any device that exists for the relevant user)
 
-        Args:
-            room_id: The room to check.
-            requester: The user making the request, according to the access token.
-            allow_departed_users: If True, accept users that were previously
-                members but have now departed.
+        [^1] Unstable and provided by MSC3202.
+             Must use `org.matrix.msc3202.device_id` in place of `device_id` for now.
 
         Returns:
-            Resolves to the current membership of the user in the room and the
-            membership event ID of the user. If the user is not in the room and
-            never has been, then `(Membership.JOIN, None)` is returned.
+            the application service `Requester` of that request
+
+        Postconditions:
+        - The `app_service` field in the returned `Requester` is set
+        - The `user_id` field in the returned `Requester` is either the application
+          service sender or the controlled user set by the `user_id` URI parameter
+        - The returned application service is permitted to control the returned user ID.
+        - The returned device ID, if present, has been checked to be a valid device ID
+          for the returned user ID.
         """
+        DEVICE_ID_ARG_NAME = b"org.matrix.msc3202.device_id"
 
-        try:
-            # check_user_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            return await self.check_user_in_room(
-                room_id, requester, allow_departed_users=allow_departed_users
-            )
-        except AuthError:
-            visibility = await self._storage_controllers.state.get_current_state_event(
-                room_id, EventTypes.RoomHistoryVisibility, ""
+        app_service = self.store.get_app_service_by_token(access_token)
+        if app_service is None:
+            return None
+
+        if app_service.ip_range_whitelist:
+            ip_address = IPAddress(request.getClientAddress().host)
+            if ip_address not in app_service.ip_range_whitelist:
+                return None
+
+        # This will always be set by the time Twisted calls us.
+        assert request.args is not None
+
+        if b"user_id" in request.args:
+            effective_user_id = request.args[b"user_id"][0].decode("utf8")
+            await self.validate_appservice_can_control_user_id(
+                app_service, effective_user_id
             )
-            if (
-                visibility
-                and visibility.content.get("history_visibility")
-                == HistoryVisibility.WORLD_READABLE
-            ):
-                return Membership.JOIN, None
-            raise UnstableSpecAuthError(
-                403,
-                "User %s not in room %s, and room previews are disabled"
-                % (requester.user, room_id),
-                errcode=Codes.NOT_JOINED,
+        else:
+            effective_user_id = app_service.sender
+
+        effective_device_id: Optional[str] = None
+
+        if (
+            self.hs.config.experimental.msc3202_device_masquerading_enabled
+            and DEVICE_ID_ARG_NAME in request.args
+        ):
+            effective_device_id = request.args[DEVICE_ID_ARG_NAME][0].decode("utf8")
+            # We only just set this so it can't be None!
+            assert effective_device_id is not None
+            device_opt = await self.store.get_device(
+                effective_user_id, effective_device_id
             )
+            if device_opt is None:
+                # For now, use 400 M_EXCLUSIVE if the device doesn't exist.
+                # This is an open thread of discussion on MSC3202 as of 2021-12-09.
+                raise AuthError(
+                    400,
+                    f"Application service trying to use a device that doesn't exist ('{effective_device_id}' for {effective_user_id})",
+                    Codes.EXCLUSIVE,
+                )
+
+        return create_requester(
+            effective_user_id, app_service=app_service, device_id=effective_device_id
+        )
diff --git a/synapse/api/auth/internal.py b/synapse/api/auth/internal.py
new file mode 100644
index 0000000000..e2ae198b19
--- /dev/null
+++ b/synapse/api/auth/internal.py
@@ -0,0 +1,291 @@
+# Copyright 2023 The Matrix.org Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING
+
+import pymacaroons
+
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    InvalidClientTokenError,
+    MissingClientTokenError,
+)
+from synapse.http import get_request_user_agent
+from synapse.http.site import SynapseRequest
+from synapse.logging.opentracing import active_span, force_tracing, start_active_span
+from synapse.types import Requester, create_requester
+from synapse.util.cancellation import cancellable
+
+from . import GUEST_DEVICE_ID
+from .base import BaseAuth
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class InternalAuth(BaseAuth):
+    """
+    This class contains functions for authenticating users of our client-server API.
+    """
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+        self.clock = hs.get_clock()
+        self._account_validity_handler = hs.get_account_validity_handler()
+        self._macaroon_generator = hs.get_macaroon_generator()
+
+        self._track_appservice_user_ips = hs.config.appservice.track_appservice_user_ips
+        self._track_puppeted_user_ips = hs.config.api.track_puppeted_user_ips
+        self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
+
+    @cancellable
+    async def get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool = False,
+        allow_expired: bool = False,
+    ) -> Requester:
+        """Get a registered user's ID.
+
+        Args:
+            request: An HTTP request with an access_token query parameter.
+            allow_guest: If False, will raise an AuthError if the user making the
+                request is a guest.
+            allow_expired: If True, allow the request through even if the account
+                is expired, or session token lifetime has ended. Note that
+                /login will deliver access tokens regardless of expiration.
+
+        Returns:
+            Resolves to the requester
+        Raises:
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid.
+            AuthError if access is denied for the user in the access token
+        """
+        parent_span = active_span()
+        with start_active_span("get_user_by_req"):
+            requester = await self._wrapped_get_user_by_req(
+                request, allow_guest, allow_expired
+            )
+
+            if parent_span:
+                if requester.authenticated_entity in self._force_tracing_for_users:
+                    # request tracing is enabled for this user, so we need to force it
+                    # tracing on for the parent span (which will be the servlet span).
+                    #
+                    # It's too late for the get_user_by_req span to inherit the setting,
+                    # so we also force it on for that.
+                    force_tracing()
+                    force_tracing(parent_span)
+                parent_span.set_tag(
+                    "authenticated_entity", requester.authenticated_entity
+                )
+                parent_span.set_tag("user_id", requester.user.to_string())
+                if requester.device_id is not None:
+                    parent_span.set_tag("device_id", requester.device_id)
+                if requester.app_service is not None:
+                    parent_span.set_tag("appservice_id", requester.app_service.id)
+            return requester
+
+    @cancellable
+    async def _wrapped_get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool,
+        allow_expired: bool,
+    ) -> Requester:
+        """Helper for get_user_by_req
+
+        Once get_user_by_req has set up the opentracing span, this does the actual work.
+        """
+        try:
+            ip_addr = request.getClientAddress().host
+            user_agent = get_request_user_agent(request)
+
+            access_token = self.get_access_token_from_request(request)
+
+            # First check if it could be a request from an appservice
+            requester = await self.get_appservice_user(request, access_token)
+            if not requester:
+                # If not, it should be from a regular user
+                requester = await self.get_user_by_access_token(
+                    access_token, allow_expired=allow_expired
+                )
+
+                # Deny the request if the user account has expired.
+                # This check is only done for regular users, not appservice ones.
+                if not allow_expired:
+                    if await self._account_validity_handler.is_user_expired(
+                        requester.user.to_string()
+                    ):
+                        # Raise the error if either an account validity module has determined
+                        # the account has expired, or the legacy account validity
+                        # implementation is enabled and determined the account has expired
+                        raise AuthError(
+                            403,
+                            "User account has expired",
+                            errcode=Codes.EXPIRED_ACCOUNT,
+                        )
+
+            if ip_addr and (
+                not requester.app_service or self._track_appservice_user_ips
+            ):
+                # XXX(quenting): I'm 95% confident that we could skip setting the
+                # device_id to "dummy-device" for appservices, and that the only impact
+                # would be some rows which whould not deduplicate in the 'user_ips'
+                # table during the transition
+                recorded_device_id = (
+                    "dummy-device"
+                    if requester.device_id is None and requester.app_service is not None
+                    else requester.device_id
+                )
+                await self.store.insert_client_ip(
+                    user_id=requester.authenticated_entity,
+                    access_token=access_token,
+                    ip=ip_addr,
+                    user_agent=user_agent,
+                    device_id=recorded_device_id,
+                )
+
+                # Track also the puppeted user client IP if enabled and the user is puppeting
+                if (
+                    requester.user.to_string() != requester.authenticated_entity
+                    and self._track_puppeted_user_ips
+                ):
+                    await self.store.insert_client_ip(
+                        user_id=requester.user.to_string(),
+                        access_token=access_token,
+                        ip=ip_addr,
+                        user_agent=user_agent,
+                        device_id=requester.device_id,
+                    )
+
+            if requester.is_guest and not allow_guest:
+                raise AuthError(
+                    403,
+                    "Guest access not allowed",
+                    errcode=Codes.GUEST_ACCESS_FORBIDDEN,
+                )
+
+            request.requester = requester
+            return requester
+        except KeyError:
+            raise MissingClientTokenError()
+
+    async def get_user_by_access_token(
+        self,
+        token: str,
+        allow_expired: bool = False,
+    ) -> Requester:
+        """Validate access token and get user_id from it
+
+        Args:
+            token: The access token to get the user by
+            allow_expired: If False, raises an InvalidClientTokenError
+                if the token is expired
+
+        Raises:
+            InvalidClientTokenError if a user by that token exists, but the token is
+                expired
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid
+        """
+
+        # First look in the database to see if the access token is present
+        # as an opaque token.
+        user_info = await self.store.get_user_by_access_token(token)
+        if user_info:
+            valid_until_ms = user_info.valid_until_ms
+            if (
+                not allow_expired
+                and valid_until_ms is not None
+                and valid_until_ms < self.clock.time_msec()
+            ):
+                # there was a valid access token, but it has expired.
+                # soft-logout the user.
+                raise InvalidClientTokenError(
+                    msg="Access token has expired", soft_logout=True
+                )
+
+            # Mark the token as used. This is used to invalidate old refresh
+            # tokens after some time.
+            await self.store.mark_access_token_as_used(user_info.token_id)
+
+            requester = create_requester(
+                user_id=user_info.user_id,
+                access_token_id=user_info.token_id,
+                is_guest=user_info.is_guest,
+                shadow_banned=user_info.shadow_banned,
+                device_id=user_info.device_id,
+                authenticated_entity=user_info.token_owner,
+            )
+
+            return requester
+
+        # If the token isn't found in the database, then it could still be a
+        # macaroon for a guest, so we check that here.
+        try:
+            user_id = self._macaroon_generator.verify_guest_token(token)
+
+            # Guest access tokens are not stored in the database (there can
+            # only be one access token per guest, anyway).
+            #
+            # In order to prevent guest access tokens being used as regular
+            # user access tokens (and hence getting around the invalidation
+            # process), we look up the user id and check that it is indeed
+            # a guest user.
+            #
+            # It would of course be much easier to store guest access
+            # tokens in the database as well, but that would break existing
+            # guest tokens.
+            stored_user = await self.store.get_user_by_id(user_id)
+            if not stored_user:
+                raise InvalidClientTokenError("Unknown user_id %s" % user_id)
+            if not stored_user["is_guest"]:
+                raise InvalidClientTokenError(
+                    "Guest access token used for regular user"
+                )
+
+            return create_requester(
+                user_id=user_id,
+                is_guest=True,
+                # all guests get the same device id
+                device_id=GUEST_DEVICE_ID,
+                authenticated_entity=user_id,
+            )
+        except (
+            pymacaroons.exceptions.MacaroonException,
+            TypeError,
+            ValueError,
+        ) as e:
+            logger.warning(
+                "Invalid access token in auth: %s %s.",
+                type(e),
+                e,
+            )
+            raise InvalidClientTokenError("Invalid access token passed.")
+
+    async def is_server_admin(self, requester: Requester) -> bool:
+        """Check if the given user is a local server admin.
+
+        Args:
+            requester: The user making the request, according to the access token.
+
+        Returns:
+            True if the user is an admin
+        """
+        return await self.store.is_server_admin(requester.user)
diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py
new file mode 100644
index 0000000000..bd4fc9c0ee
--- /dev/null
+++ b/synapse/api/auth/msc3861_delegated.py
@@ -0,0 +1,352 @@
+# Copyright 2023 The Matrix.org Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+from urllib.parse import urlencode
+
+from authlib.oauth2 import ClientAuth
+from authlib.oauth2.auth import encode_client_secret_basic, encode_client_secret_post
+from authlib.oauth2.rfc7523 import ClientSecretJWT, PrivateKeyJWT, private_key_jwt_sign
+from authlib.oauth2.rfc7662 import IntrospectionToken
+from authlib.oidc.discovery import OpenIDProviderMetadata, get_well_known_url
+
+from twisted.web.client import readBody
+from twisted.web.http_headers import Headers
+
+from synapse.api.auth.base import BaseAuth
+from synapse.api.errors import (
+    AuthError,
+    HttpResponseException,
+    InvalidClientTokenError,
+    OAuthInsufficientScopeError,
+    StoreError,
+    SynapseError,
+)
+from synapse.http.site import SynapseRequest
+from synapse.logging.context import make_deferred_yieldable
+from synapse.types import Requester, UserID, create_requester
+from synapse.util import json_decoder
+from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+# Scope as defined by MSC2967
+# https://github.com/matrix-org/matrix-spec-proposals/pull/2967
+SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*"
+SCOPE_MATRIX_GUEST = "urn:matrix:org.matrix.msc2967.client:api:guest"
+SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:"
+
+# Scope which allows access to the Synapse admin API
+SCOPE_SYNAPSE_ADMIN = "urn:synapse:admin:*"
+
+
+def scope_to_list(scope: str) -> List[str]:
+    """Convert a scope string to a list of scope tokens"""
+    return scope.strip().split(" ")
+
+
+class PrivateKeyJWTWithKid(PrivateKeyJWT):  # type: ignore[misc]
+    """An implementation of the private_key_jwt client auth method that includes a kid header.
+
+    This is needed because some providers (Keycloak) require the kid header to figure
+    out which key to use to verify the signature.
+    """
+
+    def sign(self, auth: Any, token_endpoint: str) -> bytes:
+        return private_key_jwt_sign(
+            auth.client_secret,
+            client_id=auth.client_id,
+            token_endpoint=token_endpoint,
+            claims=self.claims,
+            header={"kid": auth.client_secret["kid"]},
+        )
+
+
+class MSC3861DelegatedAuth(BaseAuth):
+    AUTH_METHODS = {
+        "client_secret_post": encode_client_secret_post,
+        "client_secret_basic": encode_client_secret_basic,
+        "client_secret_jwt": ClientSecretJWT(),
+        "private_key_jwt": PrivateKeyJWTWithKid(),
+    }
+
+    EXTERNAL_ID_PROVIDER = "oauth-delegated"
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self._config = hs.config.experimental.msc3861
+        auth_method = MSC3861DelegatedAuth.AUTH_METHODS.get(
+            self._config.client_auth_method.value, None
+        )
+        # Those assertions are already checked when parsing the config
+        assert self._config.enabled, "OAuth delegation is not enabled"
+        assert self._config.issuer, "No issuer provided"
+        assert self._config.client_id, "No client_id provided"
+        assert auth_method is not None, "Invalid client_auth_method provided"
+
+        self._http_client = hs.get_proxied_http_client()
+        self._hostname = hs.hostname
+        self._admin_token = self._config.admin_token
+
+        self._issuer_metadata = RetryOnExceptionCachedCall(self._load_metadata)
+
+        if isinstance(auth_method, PrivateKeyJWTWithKid):
+            # Use the JWK as the client secret when using the private_key_jwt method
+            assert self._config.jwk, "No JWK provided"
+            self._client_auth = ClientAuth(
+                self._config.client_id, self._config.jwk, auth_method
+            )
+        else:
+            # Else use the client secret
+            assert self._config.client_secret, "No client_secret provided"
+            self._client_auth = ClientAuth(
+                self._config.client_id, self._config.client_secret, auth_method
+            )
+
+    async def _load_metadata(self) -> OpenIDProviderMetadata:
+        if self._config.issuer_metadata is not None:
+            return OpenIDProviderMetadata(**self._config.issuer_metadata)
+        url = get_well_known_url(self._config.issuer, external=True)
+        response = await self._http_client.get_json(url)
+        metadata = OpenIDProviderMetadata(**response)
+        # metadata.validate_introspection_endpoint()
+        return metadata
+
+    async def _introspect_token(self, token: str) -> IntrospectionToken:
+        """
+        Send a token to the introspection endpoint and returns the introspection response
+
+        Parameters:
+            token: The token to introspect
+
+        Raises:
+            HttpResponseException: If the introspection endpoint returns a non-2xx response
+            ValueError: If the introspection endpoint returns an invalid JSON response
+            JSONDecodeError: If the introspection endpoint returns a non-JSON response
+            Exception: If the HTTP request fails
+
+        Returns:
+            The introspection response
+        """
+        metadata = await self._issuer_metadata.get()
+        introspection_endpoint = metadata.get("introspection_endpoint")
+        raw_headers: Dict[str, str] = {
+            "Content-Type": "application/x-www-form-urlencoded",
+            "User-Agent": str(self._http_client.user_agent, "utf-8"),
+            "Accept": "application/json",
+        }
+
+        args = {"token": token, "token_type_hint": "access_token"}
+        body = urlencode(args, True)
+
+        # Fill the body/headers with credentials
+        uri, raw_headers, body = self._client_auth.prepare(
+            method="POST", uri=introspection_endpoint, headers=raw_headers, body=body
+        )
+        headers = Headers({k: [v] for (k, v) in raw_headers.items()})
+
+        # Do the actual request
+        # We're not using the SimpleHttpClient util methods as we don't want to
+        # check the HTTP status code, and we do the body encoding ourselves.
+        response = await self._http_client.request(
+            method="POST",
+            uri=uri,
+            data=body.encode("utf-8"),
+            headers=headers,
+        )
+
+        resp_body = await make_deferred_yieldable(readBody(response))
+
+        if response.code < 200 or response.code >= 300:
+            raise HttpResponseException(
+                response.code,
+                response.phrase.decode("ascii", errors="replace"),
+                resp_body,
+            )
+
+        resp = json_decoder.decode(resp_body.decode("utf-8"))
+
+        if not isinstance(resp, dict):
+            raise ValueError(
+                "The introspection endpoint returned an invalid JSON response."
+            )
+
+        return IntrospectionToken(**resp)
+
+    async def is_server_admin(self, requester: Requester) -> bool:
+        return "urn:synapse:admin:*" in requester.scope
+
+    async def get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool = False,
+        allow_expired: bool = False,
+    ) -> Requester:
+        access_token = self.get_access_token_from_request(request)
+
+        requester = await self.get_appservice_user(request, access_token)
+        if not requester:
+            # TODO: we probably want to assert the allow_guest inside this call
+            # so that we don't provision the user if they don't have enough permission:
+            requester = await self.get_user_by_access_token(access_token, allow_expired)
+
+        if not allow_guest and requester.is_guest:
+            raise OAuthInsufficientScopeError([SCOPE_MATRIX_API])
+
+        request.requester = requester
+
+        return requester
+
+    async def get_user_by_access_token(
+        self,
+        token: str,
+        allow_expired: bool = False,
+    ) -> Requester:
+        if self._admin_token is not None and token == self._admin_token:
+            # XXX: This is a temporary solution so that the admin API can be called by
+            # the OIDC provider. This will be removed once we have OIDC client
+            # credentials grant support in matrix-authentication-service.
+            logging.info("Admin toked used")
+            # XXX: that user doesn't exist and won't be provisioned.
+            # This is mostly fine for admin calls, but we should also think about doing
+            # requesters without a user_id.
+            admin_user = UserID("__oidc_admin", self._hostname)
+            return create_requester(
+                user_id=admin_user,
+                scope=["urn:synapse:admin:*"],
+            )
+
+        try:
+            introspection_result = await self._introspect_token(token)
+        except Exception:
+            logger.exception("Failed to introspect token")
+            raise SynapseError(503, "Unable to introspect the access token")
+
+        logger.info(f"Introspection result: {introspection_result!r}")
+
+        # TODO: introspection verification should be more extensive, especially:
+        #   - verify the audience
+        if not introspection_result.get("active"):
+            raise InvalidClientTokenError("Token is not active")
+
+        # Let's look at the scope
+        scope: List[str] = scope_to_list(introspection_result.get("scope", ""))
+
+        # Determine type of user based on presence of particular scopes
+        has_user_scope = SCOPE_MATRIX_API in scope
+        has_guest_scope = SCOPE_MATRIX_GUEST in scope
+
+        if not has_user_scope and not has_guest_scope:
+            raise InvalidClientTokenError("No scope in token granting user rights")
+
+        # Match via the sub claim
+        sub: Optional[str] = introspection_result.get("sub")
+        if sub is None:
+            raise InvalidClientTokenError(
+                "Invalid sub claim in the introspection result"
+            )
+
+        user_id_str = await self.store.get_user_by_external_id(
+            MSC3861DelegatedAuth.EXTERNAL_ID_PROVIDER, sub
+        )
+        if user_id_str is None:
+            # If we could not find a user via the external_id, it either does not exist,
+            # or the external_id was never recorded
+
+            # TODO: claim mapping should be configurable
+            username: Optional[str] = introspection_result.get("username")
+            if username is None or not isinstance(username, str):
+                raise AuthError(
+                    500,
+                    "Invalid username claim in the introspection result",
+                )
+            user_id = UserID(username, self._hostname)
+
+            # First try to find a user from the username claim
+            user_info = await self.store.get_userinfo_by_id(user_id=user_id.to_string())
+            if user_info is None:
+                # If the user does not exist, we should create it on the fly
+                # TODO: we could use SCIM to provision users ahead of time and listen
+                # for SCIM SET events if those ever become standard:
+                # https://datatracker.ietf.org/doc/html/draft-hunt-scim-notify-00
+
+                # TODO: claim mapping should be configurable
+                # If present, use the name claim as the displayname
+                name: Optional[str] = introspection_result.get("name")
+
+                await self.store.register_user(
+                    user_id=user_id.to_string(), create_profile_with_displayname=name
+                )
+
+            # And record the sub as external_id
+            await self.store.record_user_external_id(
+                MSC3861DelegatedAuth.EXTERNAL_ID_PROVIDER, sub, user_id.to_string()
+            )
+        else:
+            user_id = UserID.from_string(user_id_str)
+
+        # Find device_ids in scope
+        # We only allow a single device_id in the scope, so we find them all in the
+        # scope list, and raise if there are more than one. The OIDC server should be
+        # the one enforcing valid scopes, so we raise a 500 if we find an invalid scope.
+        device_ids = [
+            tok[len(SCOPE_MATRIX_DEVICE_PREFIX) :]
+            for tok in scope
+            if tok.startswith(SCOPE_MATRIX_DEVICE_PREFIX)
+        ]
+
+        if len(device_ids) > 1:
+            raise AuthError(
+                500,
+                "Multiple device IDs in scope",
+            )
+
+        device_id = device_ids[0] if device_ids else None
+        if device_id is not None:
+            # Sanity check the device_id
+            if len(device_id) > 255 or len(device_id) < 1:
+                raise AuthError(
+                    500,
+                    "Invalid device ID in scope",
+                )
+
+            # Create the device on the fly if it does not exist
+            try:
+                await self.store.get_device(
+                    user_id=user_id.to_string(), device_id=device_id
+                )
+            except StoreError:
+                await self.store.store_device(
+                    user_id=user_id.to_string(),
+                    device_id=device_id,
+                    initial_device_display_name="OIDC-native client",
+                )
+
+        # TODO: there is a few things missing in the requester here, which still need
+        # to be figured out, like:
+        #   - impersonation, with the `authenticated_entity`, which is used for
+        #     rate-limiting, MAU limits, etc.
+        #   - shadow-banning, with the `shadow_banned` flag
+        #   - a proper solution for appservices, which still needs to be figured out in
+        #     the context of MSC3861
+        return create_requester(
+            user_id=user_id,
+            device_id=device_id,
+            scope=scope,
+            is_guest=(has_guest_scope and not has_user_scope),
+        )
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index cde9a2ecef..faf0770c66 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -236,7 +236,7 @@ class EventContentFields:
     AUTHORISING_USER: Final = "join_authorised_via_users_server"
 
     # Use for mentioning users.
-    MSC3952_MENTIONS: Final = "org.matrix.msc3952.mentions"
+    MENTIONS: Final = "m.mentions"
 
     # an unspecced field added to to-device messages to identify them uniquely-ish
     TO_DEVICE_MSGID: Final = "org.matrix.msgid"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 8c7c94b045..af894243f8 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -119,14 +119,20 @@ class Codes(str, Enum):
 
 
 class CodeMessageException(RuntimeError):
-    """An exception with integer code and message string attributes.
+    """An exception with integer code, a message string attributes and optional headers.
 
     Attributes:
         code: HTTP error code
         msg: string describing the error
+        headers: optional response headers to send
     """
 
-    def __init__(self, code: Union[int, HTTPStatus], msg: str):
+    def __init__(
+        self,
+        code: Union[int, HTTPStatus],
+        msg: str,
+        headers: Optional[Dict[str, str]] = None,
+    ):
         super().__init__("%d: %s" % (code, msg))
 
         # Some calls to this method pass instances of http.HTTPStatus for `code`.
@@ -137,6 +143,7 @@ class CodeMessageException(RuntimeError):
         # To eliminate this behaviour, we convert them to their integer equivalents here.
         self.code = int(code)
         self.msg = msg
+        self.headers = headers
 
 
 class RedirectException(CodeMessageException):
@@ -182,6 +189,7 @@ class SynapseError(CodeMessageException):
         msg: str,
         errcode: str = Codes.UNKNOWN,
         additional_fields: Optional[Dict] = None,
+        headers: Optional[Dict[str, str]] = None,
     ):
         """Constructs a synapse error.
 
@@ -190,7 +198,7 @@ class SynapseError(CodeMessageException):
             msg: The human-readable error message.
             errcode: The matrix error code e.g 'M_FORBIDDEN'
         """
-        super().__init__(code, msg)
+        super().__init__(code, msg, headers)
         self.errcode = errcode
         if additional_fields is None:
             self._additional_fields: Dict = {}
@@ -335,6 +343,20 @@ class AuthError(SynapseError):
         super().__init__(code, msg, errcode, additional_fields)
 
 
+class OAuthInsufficientScopeError(SynapseError):
+    """An error raised when the caller does not have sufficient scope to perform the requested action"""
+
+    def __init__(
+        self,
+        required_scopes: List[str],
+    ):
+        headers = {
+            "WWW-Authenticate": 'Bearer error="insufficient_scope", scope="%s"'
+            % (" ".join(required_scopes))
+        }
+        super().__init__(401, "Insufficient scope", Codes.FORBIDDEN, None, headers)
+
+
 class UnstableSpecAuthError(AuthError):
     """An error raised when a new error code is being proposed to replace a previous one.
     This error will return a "org.matrix.unstable.errcode" property with the new error code,
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 82aeef8d19..0995ecbe83 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -152,9 +152,9 @@ class Filtering:
         self.DEFAULT_FILTER_COLLECTION = FilterCollection(hs, {})
 
     async def get_user_filter(
-        self, user_localpart: str, filter_id: Union[int, str]
+        self, user_id: UserID, filter_id: Union[int, str]
     ) -> "FilterCollection":
-        result = await self.store.get_user_filter(user_localpart, filter_id)
+        result = await self.store.get_user_filter(user_id, filter_id)
         return FilterCollection(self._hs, result)
 
     def add_user_filter(self, user_id: UserID, user_filter: JsonDict) -> Awaitable[int]:
diff --git a/synapse/config/auth.py b/synapse/config/auth.py
index 35774962c0..c7ab428f28 100644
--- a/synapse/config/auth.py
+++ b/synapse/config/auth.py
@@ -29,7 +29,14 @@ class AuthConfig(Config):
         if password_config is None:
             password_config = {}
 
-        passwords_enabled = password_config.get("enabled", True)
+        # The default value of password_config.enabled is True, unless msc3861 is enabled.
+        msc3861_enabled = (
+            config.get("experimental_features", {})
+            .get("msc3861", {})
+            .get("enabled", False)
+        )
+        passwords_enabled = password_config.get("enabled", not msc3861_enabled)
+
         # 'only_for_reauth' allows users who have previously set a password to use it,
         # even though passwords would otherwise be disabled.
         passwords_for_reauth_only = passwords_enabled == "only_for_reauth"
@@ -53,3 +60,13 @@ class AuthConfig(Config):
         self.ui_auth_session_timeout = self.parse_duration(
             ui_auth.get("session_timeout", 0)
         )
+
+        # Logging in with an existing session.
+        login_via_existing = config.get("login_via_existing_session", {})
+        self.login_via_existing_enabled = login_via_existing.get("enabled", False)
+        self.login_via_existing_require_ui_auth = login_via_existing.get(
+            "require_ui_auth", True
+        )
+        self.login_via_existing_token_timeout = self.parse_duration(
+            login_via_existing.get("token_timeout", "5m")
+        )
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index d769b7f668..1d5b5ded45 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -12,15 +12,216 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Any, Optional
+import enum
+from typing import TYPE_CHECKING, Any, Optional
 
 import attr
+import attr.validators
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.config import ConfigError
-from synapse.config._base import Config
+from synapse.config._base import Config, RootConfig
 from synapse.types import JsonDict
 
+# Determine whether authlib is installed.
+try:
+    import authlib  # noqa: F401
+
+    HAS_AUTHLIB = True
+except ImportError:
+    HAS_AUTHLIB = False
+
+if TYPE_CHECKING:
+    # Only import this if we're type checking, as it might not be installed at runtime.
+    from authlib.jose.rfc7517 import JsonWebKey
+
+
+class ClientAuthMethod(enum.Enum):
+    """List of supported client auth methods."""
+
+    CLIENT_SECRET_POST = "client_secret_post"
+    CLIENT_SECRET_BASIC = "client_secret_basic"
+    CLIENT_SECRET_JWT = "client_secret_jwt"
+    PRIVATE_KEY_JWT = "private_key_jwt"
+
+
+def _parse_jwks(jwks: Optional[JsonDict]) -> Optional["JsonWebKey"]:
+    """A helper function to parse a JWK dict into a JsonWebKey."""
+
+    if jwks is None:
+        return None
+
+    from authlib.jose.rfc7517 import JsonWebKey
+
+    return JsonWebKey.import_key(jwks)
+
+
+@attr.s(slots=True, frozen=True)
+class MSC3861:
+    """Configuration for MSC3861: Matrix architecture change to delegate authentication via OIDC"""
+
+    enabled: bool = attr.ib(default=False, validator=attr.validators.instance_of(bool))
+    """Whether to enable MSC3861 auth delegation."""
+
+    @enabled.validator
+    def _check_enabled(self, attribute: attr.Attribute, value: bool) -> None:
+        # Only allow enabling MSC3861 if authlib is installed
+        if value and not HAS_AUTHLIB:
+            raise ConfigError(
+                "MSC3861 is enabled but authlib is not installed. "
+                "Please install authlib to use MSC3861.",
+                ("experimental", "msc3861", "enabled"),
+            )
+
+    issuer: str = attr.ib(default="", validator=attr.validators.instance_of(str))
+    """The URL of the OIDC Provider."""
+
+    issuer_metadata: Optional[JsonDict] = attr.ib(default=None)
+    """The issuer metadata to use, otherwise discovered from /.well-known/openid-configuration as per MSC2965."""
+
+    client_id: str = attr.ib(
+        default="",
+        validator=attr.validators.instance_of(str),
+    )
+    """The client ID to use when calling the introspection endpoint."""
+
+    client_auth_method: ClientAuthMethod = attr.ib(
+        default=ClientAuthMethod.CLIENT_SECRET_POST, converter=ClientAuthMethod
+    )
+    """The auth method used when calling the introspection endpoint."""
+
+    client_secret: Optional[str] = attr.ib(
+        default=None,
+        validator=attr.validators.optional(attr.validators.instance_of(str)),
+    )
+    """
+    The client secret to use when calling the introspection endpoint,
+    when using any of the client_secret_* client auth methods.
+    """
+
+    jwk: Optional["JsonWebKey"] = attr.ib(default=None, converter=_parse_jwks)
+    """
+    The JWKS to use when calling the introspection endpoint,
+    when using the private_key_jwt client auth method.
+    """
+
+    @client_auth_method.validator
+    def _check_client_auth_method(
+        self, attribute: attr.Attribute, value: ClientAuthMethod
+    ) -> None:
+        # Check that the right client credentials are provided for the client auth method.
+        if not self.enabled:
+            return
+
+        if value == ClientAuthMethod.PRIVATE_KEY_JWT and self.jwk is None:
+            raise ConfigError(
+                "A JWKS must be provided when using the private_key_jwt client auth method",
+                ("experimental", "msc3861", "client_auth_method"),
+            )
+
+        if (
+            value
+            in (
+                ClientAuthMethod.CLIENT_SECRET_POST,
+                ClientAuthMethod.CLIENT_SECRET_BASIC,
+                ClientAuthMethod.CLIENT_SECRET_JWT,
+            )
+            and self.client_secret is None
+        ):
+            raise ConfigError(
+                f"A client secret must be provided when using the {value} client auth method",
+                ("experimental", "msc3861", "client_auth_method"),
+            )
+
+    account_management_url: Optional[str] = attr.ib(
+        default=None,
+        validator=attr.validators.optional(attr.validators.instance_of(str)),
+    )
+    """The URL of the My Account page on the OIDC Provider as per MSC2965."""
+
+    admin_token: Optional[str] = attr.ib(
+        default=None,
+        validator=attr.validators.optional(attr.validators.instance_of(str)),
+    )
+    """
+    A token that should be considered as an admin token.
+    This is used by the OIDC provider, to make admin calls to Synapse.
+    """
+
+    def check_config_conflicts(self, root: RootConfig) -> None:
+        """Checks for any configuration conflicts with other parts of Synapse.
+
+        Raises:
+            ConfigError: If there are any configuration conflicts.
+        """
+
+        if not self.enabled:
+            return
+
+        if (
+            root.auth.password_enabled_for_reauth
+            or root.auth.password_enabled_for_login
+        ):
+            raise ConfigError(
+                "Password auth cannot be enabled when OAuth delegation is enabled",
+                ("password_config", "enabled"),
+            )
+
+        if root.registration.enable_registration:
+            raise ConfigError(
+                "Registration cannot be enabled when OAuth delegation is enabled",
+                ("enable_registration",),
+            )
+
+        if (
+            root.oidc.oidc_enabled
+            or root.saml2.saml2_enabled
+            or root.cas.cas_enabled
+            or root.jwt.jwt_enabled
+        ):
+            raise ConfigError("SSO cannot be enabled when OAuth delegation is enabled")
+
+        if bool(root.authproviders.password_providers):
+            raise ConfigError(
+                "Password auth providers cannot be enabled when OAuth delegation is enabled"
+            )
+
+        if root.captcha.enable_registration_captcha:
+            raise ConfigError(
+                "CAPTCHA cannot be enabled when OAuth delegation is enabled",
+                ("captcha", "enable_registration_captcha"),
+            )
+
+        if root.auth.login_via_existing_enabled:
+            raise ConfigError(
+                "Login via existing session cannot be enabled when OAuth delegation is enabled",
+                ("login_via_existing_session", "enabled"),
+            )
+
+        if root.registration.refresh_token_lifetime:
+            raise ConfigError(
+                "refresh_token_lifetime cannot be set when OAuth delegation is enabled",
+                ("refresh_token_lifetime",),
+            )
+
+        if root.registration.nonrefreshable_access_token_lifetime:
+            raise ConfigError(
+                "nonrefreshable_access_token_lifetime cannot be set when OAuth delegation is enabled",
+                ("nonrefreshable_access_token_lifetime",),
+            )
+
+        if root.registration.session_lifetime:
+            raise ConfigError(
+                "session_lifetime cannot be set when OAuth delegation is enabled",
+                ("session_lifetime",),
+            )
+
+        if not root.experimental.msc3970_enabled:
+            raise ConfigError(
+                "experimental_features.msc3970_enabled must be 'true' when OAuth delegation is enabled",
+                ("experimental_features", "msc3970_enabled"),
+            )
+
 
 @attr.s(auto_attribs=True, frozen=True, slots=True)
 class MSC3866Config:
@@ -118,13 +319,6 @@ class ExperimentalConfig(Config):
         # MSC3881: Remotely toggle push notifications for another client
         self.msc3881_enabled: bool = experimental.get("msc3881_enabled", False)
 
-        # MSC3882: Allow an existing session to sign in a new session
-        self.msc3882_enabled: bool = experimental.get("msc3882_enabled", False)
-        self.msc3882_ui_auth: bool = experimental.get("msc3882_ui_auth", True)
-        self.msc3882_token_timeout = self.parse_duration(
-            experimental.get("msc3882_token_timeout", "5m")
-        )
-
         # MSC3874: Filtering /messages with rel_types / not_rel_types.
         self.msc3874_enabled: bool = experimental.get("msc3874_enabled", False)
 
@@ -164,11 +358,6 @@ class ExperimentalConfig(Config):
         # MSC3391: Removing account data.
         self.msc3391_enabled = experimental.get("msc3391_enabled", False)
 
-        # MSC3952: Intentional mentions, this depends on MSC3966.
-        self.msc3952_intentional_mentions = experimental.get(
-            "msc3952_intentional_mentions", False
-        )
-
         # MSC3959: Do not generate notifications for edits.
         self.msc3958_supress_edit_notifs = experimental.get(
             "msc3958_supress_edit_notifs", False
@@ -182,8 +371,19 @@ class ExperimentalConfig(Config):
             "msc3981_recurse_relations", False
         )
 
+        # MSC3861: Matrix architecture change to delegate authentication via OIDC
+        try:
+            self.msc3861 = MSC3861(**experimental.get("msc3861", {}))
+        except ValueError as exc:
+            raise ConfigError(
+                "Invalid MSC3861 configuration", ("experimental", "msc3861")
+            ) from exc
+
         # MSC3970: Scope transaction IDs to devices
-        self.msc3970_enabled = experimental.get("msc3970_enabled", False)
+        self.msc3970_enabled = experimental.get("msc3970_enabled", self.msc3861.enabled)
+
+        # Check that none of the other config options conflict with MSC3861 when enabled
+        self.msc3861.check_config_conflicts(self.root)
 
         # MSC4009: E.164 Matrix IDs
         self.msc4009_e164_mxids = experimental.get("msc4009_e164_mxids", False)
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 47203209db..9278f1a1aa 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -134,13 +134,8 @@ class EventValidator:
                 )
 
         # If the event contains a mentions key, validate it.
-        if (
-            EventContentFields.MSC3952_MENTIONS in event.content
-            and config.experimental.msc3952_intentional_mentions
-        ):
-            validate_json_object(
-                event.content[EventContentFields.MSC3952_MENTIONS], Mentions
-            )
+        if EventContentFields.MENTIONS in event.content:
+            validate_json_object(event.content[EventContentFields.MENTIONS], Mentions)
 
     def _validate_retention(self, event: EventBase) -> None:
         """Checks that an event that defines the retention policy for a room respects the
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f4ca70a698..9425b32507 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -515,7 +515,7 @@ class FederationServer(FederationBase):
                     logger.error(
                         "Failed to handle PDU %s",
                         event_id,
-                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                        exc_info=(f.type, f.value, f.getTracebackObject()),
                     )
                     return {"error": str(e)}
 
@@ -944,7 +944,7 @@ class FederationServer(FederationBase):
             if not self._is_mine_server_name(authorising_server):
                 raise SynapseError(
                     400,
-                    f"Cannot authorise request from resident server: {authorising_server}",
+                    f"Cannot authorise membership event for {authorising_server}. We can only authorise requests from our own homeserver",
                 )
 
             event.signatures.update(
@@ -1247,7 +1247,7 @@ class FederationServer(FederationBase):
                     logger.error(
                         "Failed to handle PDU %s",
                         event.event_id,
-                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                        exc_info=(f.type, f.value, f.getTracebackObject()),
                     )
 
                 received_ts = await self.store.remove_received_event_from_staging(
@@ -1291,9 +1291,6 @@ class FederationServer(FederationBase):
                 return
             lock = new_lock
 
-    def __str__(self) -> str:
-        return "<ReplicationLayer(%s)>" % self.server_name
-
     async def exchange_third_party_invite(
         self, sender_user_id: str, target_user_id: str, room_id: str, signed: Dict
     ) -> None:
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 4aa4ebf7e4..f1a7a05df6 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -164,7 +164,7 @@ class AccountValidityHandler:
 
         try:
             user_display_name = await self.store.get_profile_displayname(
-                UserID.from_string(user_id).localpart
+                UserID.from_string(user_id)
             )
             if user_display_name is None:
                 user_display_name = user_id
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index b06f25b03c..119c7f8384 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -89,7 +89,7 @@ class AdminHandler:
         }
 
         # Add additional user metadata
-        profile = await self._store.get_profileinfo(user.localpart)
+        profile = await self._store.get_profileinfo(user)
         threepids = await self._store.user_get_threepids(user.to_string())
         external_ids = [
             ({"auth_provider": auth_provider, "external_id": external_id})
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d001f2fb2f..59ecafa6a0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -274,6 +274,8 @@ class AuthHandler:
         # response.
         self._extra_attributes: Dict[str, SsoLoginExtraAttributes] = {}
 
+        self.msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
+
     async def validate_user_via_ui_auth(
         self,
         requester: Requester,
@@ -322,8 +324,12 @@ class AuthHandler:
 
             LimitExceededError if the ratelimiter's failed request count for this
                 user is too high to proceed
-
         """
+        if self.msc3861_oauth_delegation_enabled:
+            raise SynapseError(
+                HTTPStatus.INTERNAL_SERVER_ERROR, "UIA shouldn't be used with MSC3861"
+            )
+
         if not requester.access_token_id:
             raise ValueError("Cannot validate a user without an access token")
         if can_skip_ui_auth and self._ui_auth_session_timeout:
@@ -1753,7 +1759,7 @@ class AuthHandler:
             return
 
         user_profile_data = await self.store.get_profileinfo(
-            UserID.from_string(registered_user_id).localpart
+            UserID.from_string(registered_user_id)
         )
 
         # Store any extra attributes which will be passed in the login response.
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index f299b89a1b..67adeae6a7 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -297,5 +297,5 @@ class DeactivateAccountHandler:
         # Add the user to the directory, if necessary. Note that
         # this must be done after the user is re-activated, because
         # deactivated users are excluded from the user directory.
-        profile = await self.store.get_profileinfo(user.localpart)
+        profile = await self.store.get_profileinfo(user)
         await self.user_directory_handler.handle_local_profile_change(user_id, profile)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2eb28d55ac..57d6b70cff 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -200,6 +200,7 @@ class FederationHandler:
             )
 
     @trace
+    @tag_args
     async def maybe_backfill(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
@@ -214,6 +215,9 @@ class FederationHandler:
             limit: The number of events that the pagination request will
                 return. This is used as part of the heuristic to decide if we
                 should back paginate.
+
+        Returns:
+            True if we actually tried to backfill something, otherwise False.
         """
         # Starting the processing time here so we can include the room backfill
         # linearizer lock queue in the timing
@@ -227,6 +231,8 @@ class FederationHandler:
                 processing_start_time=processing_start_time,
             )
 
+    @trace
+    @tag_args
     async def _maybe_backfill_inner(
         self,
         room_id: str,
@@ -247,6 +253,9 @@ class FederationHandler:
             limit: The max number of events to request from the remote federated server.
             processing_start_time: The time when `maybe_backfill` started processing.
                 Only used for timing. If `None`, no timing observation will be made.
+
+        Returns:
+            True if we actually tried to backfill something, otherwise False.
         """
         backwards_extremities = [
             _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
@@ -302,6 +311,14 @@ class FederationHandler:
             len(sorted_backfill_points),
             sorted_backfill_points,
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "sorted_backfill_points",
+            str(sorted_backfill_points),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "sorted_backfill_points.length",
+            str(len(sorted_backfill_points)),
+        )
 
         # If we have no backfill points lower than the `current_depth` then
         # either we can a) bail or b) still attempt to backfill. We opt to try
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index e7e0b5e049..24b68e0301 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -1354,7 +1354,7 @@ class OidcProvider:
         finish_request(request)
 
 
-class LogoutToken(JWTClaims):
+class LogoutToken(JWTClaims):  # type: ignore[misc]
     """
     Holds and verify claims of a logout token, as per
     https://openid.net/specs/openid-connect-backchannel-1_0.html#LogoutToken
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 63b35c8d62..d5257acb7d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -360,7 +360,7 @@ class PaginationHandler:
         except Exception:
             f = Failure()
             logger.error(
-                "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())  # type: ignore
+                "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
             )
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
             self._purges_by_id[purge_id].error = f.getErrorMessage()
@@ -689,7 +689,7 @@ class PaginationHandler:
             f = Failure()
             logger.error(
                 "failed",
-                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                exc_info=(f.type, f.value, f.getTracebackObject()),
             )
             self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
             self._delete_by_id[delete_id].error = f.getErrorMessage()
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 4ad2233573..0a219b7962 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -648,7 +648,6 @@ class PresenceHandler(BasePresenceHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.hs = hs
-        self.server_name = hs.hostname
         self.wheel_timer: WheelTimer[str] = WheelTimer()
         self.notifier = hs.get_notifier()
         self._presence_enabled = hs.config.server.use_presence
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index a9160c87e3..a7f8c5e636 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -67,7 +67,7 @@ class ProfileHandler:
         target_user = UserID.from_string(user_id)
 
         if self.hs.is_mine(target_user):
-            profileinfo = await self.store.get_profileinfo(target_user.localpart)
+            profileinfo = await self.store.get_profileinfo(target_user)
             if profileinfo.display_name is None:
                 raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
 
@@ -99,9 +99,7 @@ class ProfileHandler:
     async def get_displayname(self, target_user: UserID) -> Optional[str]:
         if self.hs.is_mine(target_user):
             try:
-                displayname = await self.store.get_profile_displayname(
-                    target_user.localpart
-                )
+                displayname = await self.store.get_profile_displayname(target_user)
             except StoreError as e:
                 if e.code == 404:
                     raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
@@ -147,7 +145,7 @@ class ProfileHandler:
             raise AuthError(400, "Cannot set another user's displayname")
 
         if not by_admin and not self.hs.config.registration.enable_set_displayname:
-            profile = await self.store.get_profileinfo(target_user.localpart)
+            profile = await self.store.get_profileinfo(target_user)
             if profile.display_name:
                 raise SynapseError(
                     400,
@@ -180,7 +178,7 @@ class ProfileHandler:
 
         await self.store.set_profile_displayname(target_user, displayname_to_set)
 
-        profile = await self.store.get_profileinfo(target_user.localpart)
+        profile = await self.store.get_profileinfo(target_user)
         await self.user_directory_handler.handle_local_profile_change(
             target_user.to_string(), profile
         )
@@ -194,9 +192,7 @@ class ProfileHandler:
     async def get_avatar_url(self, target_user: UserID) -> Optional[str]:
         if self.hs.is_mine(target_user):
             try:
-                avatar_url = await self.store.get_profile_avatar_url(
-                    target_user.localpart
-                )
+                avatar_url = await self.store.get_profile_avatar_url(target_user)
             except StoreError as e:
                 if e.code == 404:
                     raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
@@ -241,7 +237,7 @@ class ProfileHandler:
             raise AuthError(400, "Cannot set another user's avatar_url")
 
         if not by_admin and not self.hs.config.registration.enable_set_avatar_url:
-            profile = await self.store.get_profileinfo(target_user.localpart)
+            profile = await self.store.get_profileinfo(target_user)
             if profile.avatar_url:
                 raise SynapseError(
                     400, "Changing avatar is disabled on this server", Codes.FORBIDDEN
@@ -272,7 +268,7 @@ class ProfileHandler:
 
         await self.store.set_profile_avatar_url(target_user, avatar_url_to_set)
 
-        profile = await self.store.get_profileinfo(target_user.localpart)
+        profile = await self.store.get_profileinfo(target_user)
         await self.user_directory_handler.handle_local_profile_change(
             target_user.to_string(), profile
         )
@@ -369,14 +365,10 @@ class ProfileHandler:
         response = {}
         try:
             if just_field is None or just_field == "displayname":
-                response["displayname"] = await self.store.get_profile_displayname(
-                    user.localpart
-                )
+                response["displayname"] = await self.store.get_profile_displayname(user)
 
             if just_field is None or just_field == "avatar_url":
-                response["avatar_url"] = await self.store.get_profile_avatar_url(
-                    user.localpart
-                )
+                response["avatar_url"] = await self.store.get_profile_avatar_url(user)
         except StoreError as e:
             if e.code == 404:
                 raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index 49a497a860..df5a4f3e22 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -27,7 +27,6 @@ logger = logging.getLogger(__name__)
 
 class ReadMarkerHandler:
     def __init__(self, hs: "HomeServer"):
-        self.server_name = hs.config.server.server_name
         self.store = hs.get_datastores().main
         self.account_data_handler = hs.get_account_data_handler()
         self.read_marker_linearizer = Linearizer(name="read_marker")
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c80946c2e9..a2d3f03061 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -315,7 +315,7 @@ class RegistrationHandler:
                 approved=approved,
             )
 
-            profile = await self.store.get_profileinfo(localpart)
+            profile = await self.store.get_profileinfo(user)
             await self.user_directory_handler.handle_local_profile_change(
                 user_id, profile
             )
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 4824635162..db97f7aede 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -205,16 +205,22 @@ class RelationsHandler:
             event_id: The event IDs to look and redact relations of.
             initial_redaction_event: The redaction for the event referred to by
                 event_id.
-            relation_types: The types of relations to look for.
+            relation_types: The types of relations to look for. If "*" is in the list,
+                all related events will be redacted regardless of the type.
 
         Raises:
             ShadowBanError if the requester is shadow-banned
         """
-        related_event_ids = (
-            await self._main_store.get_all_relations_for_event_with_types(
-                event_id, relation_types
+        if "*" in relation_types:
+            related_event_ids = await self._main_store.get_all_relations_for_event(
+                event_id
+            )
+        else:
+            related_event_ids = (
+                await self._main_store.get_all_relations_for_event_with_types(
+                    event_id, relation_types
+                )
             )
-        )
 
         for related_event_id in related_event_ids:
             try:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5e1702d78a..cb957f2033 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1490,7 +1490,6 @@ class RoomContextHandler:
 
 class TimestampLookupHandler:
     def __init__(self, hs: "HomeServer"):
-        self.server_name = hs.hostname
         self.store = hs.get_datastores().main
         self.state_handler = hs.get_state_handler()
         self.federation_client = hs.get_federation_client()
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 5c01482acf..7cabf7980a 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -42,7 +42,6 @@ class StatsHandler:
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self.state = hs.get_state_handler()
-        self.server_name = hs.hostname
         self.clock = hs.get_clock()
         self.notifier = hs.get_notifier()
         self.is_mine_id = hs.is_mine_id
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 9094dab0fe..abb5ae5815 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -499,8 +499,15 @@ class MatrixFederationHttpClient:
                 Note that the above intervals are *in addition* to the time spent
                 waiting for the request to complete (up to `timeout` ms).
 
-                NB: the long retry algorithm takes over 20 minutes to complete, with
-                a default timeout of 60s!
+                NB: the long retry algorithm takes over 20 minutes to complete, with a
+                default timeout of 60s! It's best not to use the `long_retries` option
+                for something that is blocking a client so we don't make them wait for
+                aaaaages, whereas some things like sending transactions (server to
+                server) we can be a lot more lenient but its very fuzzy / hand-wavey.
+
+                In the future, we could be more intelligent about doing this sort of
+                thing by looking at things with the bigger picture in mind,
+                https://github.com/matrix-org/synapse/issues/8917
 
             ignore_backoff: true to ignore the historical backoff data
                 and try the request anyway.
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 101dc2e747..933172c873 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -108,9 +108,12 @@ def return_json_error(
 
     if f.check(SynapseError):
         # mypy doesn't understand that f.check asserts the type.
-        exc: SynapseError = f.value  # type: ignore
+        exc: SynapseError = f.value
         error_code = exc.code
         error_dict = exc.error_dict(config)
+        if exc.headers is not None:
+            for header, value in exc.headers.items():
+                request.setHeader(header, value)
         logger.info("%s SynapseError: %s - %s", request, error_code, exc.msg)
     elif f.check(CancelledError):
         error_code = HTTP_STATUS_REQUEST_CANCELLED
@@ -121,7 +124,7 @@ def return_json_error(
                 "Got cancellation before client disconnection from %r: %r",
                 request.request_metrics.name,
                 request,
-                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+                exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     else:
         error_code = 500
@@ -131,7 +134,7 @@ def return_json_error(
             "Failed handle request via %r: %r",
             request.request_metrics.name,
             request,
-            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+            exc_info=(f.type, f.value, f.getTracebackObject()),
         )
 
     # Only respond with an error response if we haven't already started writing,
@@ -169,9 +172,12 @@ def return_html_error(
     """
     if f.check(CodeMessageException):
         # mypy doesn't understand that f.check asserts the type.
-        cme: CodeMessageException = f.value  # type: ignore
+        cme: CodeMessageException = f.value
         code = cme.code
         msg = cme.msg
+        if cme.headers is not None:
+            for header, value in cme.headers.items():
+                request.setHeader(header, value)
 
         if isinstance(cme, RedirectException):
             logger.info("%s redirect to %s", request, cme.location)
@@ -183,7 +189,7 @@ def return_html_error(
             logger.error(
                 "Failed handle request %r",
                 request,
-                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+                exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     elif f.check(CancelledError):
         code = HTTP_STATUS_REQUEST_CANCELLED
@@ -193,7 +199,7 @@ def return_html_error(
             logger.error(
                 "Got cancellation before client disconnection when handling request %r",
                 request,
-                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+                exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     else:
         code = HTTPStatus.INTERNAL_SERVER_ERROR
@@ -202,7 +208,7 @@ def return_html_error(
         logger.error(
             "Failed handle request %r",
             request,
-            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+            exc_info=(f.type, f.value, f.getTracebackObject()),
         )
 
     if isinstance(error_template, str):
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index c70eee649c..75217e3f45 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -171,6 +171,7 @@ from functools import wraps
 from typing import (
     TYPE_CHECKING,
     Any,
+    Awaitable,
     Callable,
     Collection,
     ContextManager,
@@ -903,6 +904,7 @@ def _custom_sync_async_decorator(
     """
 
     if inspect.iscoroutinefunction(func):
+        # For this branch, we handle async functions like `async def func() -> RInner`.
         # In this branch, R = Awaitable[RInner], for some other type RInner
         @wraps(func)
         async def _wrapper(
@@ -914,15 +916,16 @@ def _custom_sync_async_decorator(
                 return await func(*args, **kwargs)  # type: ignore[misc]
 
     else:
-        # The other case here handles both sync functions and those
-        # decorated with inlineDeferred.
+        # The other case here handles sync functions including those decorated with
+        # `@defer.inlineCallbacks` or that return a `Deferred` or other `Awaitable`.
         @wraps(func)
-        def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
+        def _wrapper(*args: P.args, **kwargs: P.kwargs) -> Any:
             scope = wrapping_logic(func, *args, **kwargs)
             scope.__enter__()
 
             try:
                 result = func(*args, **kwargs)
+
                 if isinstance(result, defer.Deferred):
 
                     def call_back(result: R) -> R:
@@ -930,20 +933,32 @@ def _custom_sync_async_decorator(
                         return result
 
                     def err_back(result: R) -> R:
+                        # TODO: Pass the error details into `scope.__exit__(...)` for
+                        #       consistency with the other paths.
                         scope.__exit__(None, None, None)
                         return result
 
                     result.addCallbacks(call_back, err_back)
 
+                elif inspect.isawaitable(result):
+
+                    async def wrap_awaitable() -> Any:
+                        try:
+                            assert isinstance(result, Awaitable)
+                            awaited_result = await result
+                            scope.__exit__(None, None, None)
+                            return awaited_result
+                        except Exception as e:
+                            scope.__exit__(type(e), None, e.__traceback__)
+                            raise
+
+                    # The original method returned an awaitable, eg. a coroutine, so we
+                    # create another awaitable wrapping it that calls
+                    # `scope.__exit__(...)`.
+                    return wrap_awaitable()
                 else:
-                    if inspect.isawaitable(result):
-                        logger.error(
-                            "@trace may not have wrapped %s correctly! "
-                            "The function is not async but returned a %s.",
-                            func.__qualname__,
-                            type(result).__name__,
-                        )
-
+                    # Just a simple sync function so we can just exit the scope and
+                    # return the result without any fuss.
                     scope.__exit__(None, None, None)
 
                 return result
diff --git a/synapse/media/oembed.py b/synapse/media/oembed.py
index c0eaf04be5..5ad9eec80b 100644
--- a/synapse/media/oembed.py
+++ b/synapse/media/oembed.py
@@ -14,7 +14,7 @@
 import html
 import logging
 import urllib.parse
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, List, Optional, cast
 
 import attr
 
@@ -98,7 +98,7 @@ class OEmbedProvider:
         # No match.
         return None
 
-    def autodiscover_from_html(self, tree: "etree.Element") -> Optional[str]:
+    def autodiscover_from_html(self, tree: "etree._Element") -> Optional[str]:
         """
         Search an HTML document for oEmbed autodiscovery information.
 
@@ -109,18 +109,22 @@ class OEmbedProvider:
             The URL to use for oEmbed information, or None if no URL was found.
         """
         # Search for link elements with the proper rel and type attributes.
-        for tag in tree.xpath(
-            "//link[@rel='alternate'][@type='application/json+oembed']"
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        for tag in cast(
+            List["etree._Element"],
+            tree.xpath("//link[@rel='alternate'][@type='application/json+oembed']"),
         ):
             if "href" in tag.attrib:
-                return tag.attrib["href"]
+                return cast(str, tag.attrib["href"])
 
         # Some providers (e.g. Flickr) use alternative instead of alternate.
-        for tag in tree.xpath(
-            "//link[@rel='alternative'][@type='application/json+oembed']"
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        for tag in cast(
+            List["etree._Element"],
+            tree.xpath("//link[@rel='alternative'][@type='application/json+oembed']"),
         ):
             if "href" in tag.attrib:
-                return tag.attrib["href"]
+                return cast(str, tag.attrib["href"])
 
         return None
 
@@ -212,11 +216,12 @@ class OEmbedProvider:
         return OEmbedResult(open_graph_response, author_name, cache_age)
 
 
-def _fetch_urls(tree: "etree.Element", tag_name: str) -> List[str]:
+def _fetch_urls(tree: "etree._Element", tag_name: str) -> List[str]:
     results = []
-    for tag in tree.xpath("//*/" + tag_name):
+    # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+    for tag in cast(List["etree._Element"], tree.xpath("//*/" + tag_name)):
         if "src" in tag.attrib:
-            results.append(tag.attrib["src"])
+            results.append(cast(str, tag.attrib["src"]))
     return results
 
 
@@ -244,11 +249,12 @@ def calc_description_and_urls(open_graph_response: JsonDict, html_body: str) ->
     parser = etree.HTMLParser(recover=True, encoding="utf-8")
 
     # Attempt to parse the body. If this fails, log and return no metadata.
-    tree = etree.fromstring(html_body, parser)
+    # TODO Develop of lxml-stubs has this correct.
+    tree = etree.fromstring(html_body, parser)  # type: ignore[arg-type]
 
     # The data was successfully parsed, but no tree was found.
     if tree is None:
-        return
+        return  # type: ignore[unreachable]
 
     # Attempt to find interesting URLs (images, videos, embeds).
     if "og:image" not in open_graph_response:
diff --git a/synapse/media/preview_html.py b/synapse/media/preview_html.py
index 516d0434f0..1bc7ccb7f3 100644
--- a/synapse/media/preview_html.py
+++ b/synapse/media/preview_html.py
@@ -24,6 +24,7 @@ from typing import (
     Optional,
     Set,
     Union,
+    cast,
 )
 
 if TYPE_CHECKING:
@@ -115,7 +116,7 @@ def _get_html_media_encodings(
 
 def decode_body(
     body: bytes, uri: str, content_type: Optional[str] = None
-) -> Optional["etree.Element"]:
+) -> Optional["etree._Element"]:
     """
     This uses lxml to parse the HTML document.
 
@@ -152,11 +153,12 @@ def decode_body(
 
     # Attempt to parse the body. Returns None if the body was successfully
     # parsed, but no tree was found.
-    return etree.fromstring(body, parser)
+    # TODO Develop of lxml-stubs has this correct.
+    return etree.fromstring(body, parser)  # type: ignore[arg-type]
 
 
 def _get_meta_tags(
-    tree: "etree.Element",
+    tree: "etree._Element",
     property: str,
     prefix: str,
     property_mapper: Optional[Callable[[str], Optional[str]]] = None,
@@ -175,9 +177,15 @@ def _get_meta_tags(
     Returns:
         A map of tag name to value.
     """
+    # This actually returns Dict[str, str], but the caller sets this as a variable
+    # which is Dict[str, Optional[str]].
     results: Dict[str, Optional[str]] = {}
-    for tag in tree.xpath(
-        f"//*/meta[starts-with(@{property}, '{prefix}:')][@content][not(@content='')]"
+    # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+    for tag in cast(
+        List["etree._Element"],
+        tree.xpath(
+            f"//*/meta[starts-with(@{property}, '{prefix}:')][@content][not(@content='')]"
+        ),
     ):
         # if we've got more than 50 tags, someone is taking the piss
         if len(results) >= 50:
@@ -187,14 +195,15 @@ def _get_meta_tags(
             )
             return {}
 
-        key = tag.attrib[property]
+        key = cast(str, tag.attrib[property])
         if property_mapper:
-            key = property_mapper(key)
+            new_key = property_mapper(key)
             # None is a special value used to ignore a value.
-            if key is None:
+            if new_key is None:
                 continue
+            key = new_key
 
-        results[key] = tag.attrib["content"]
+        results[key] = cast(str, tag.attrib["content"])
 
     return results
 
@@ -219,7 +228,7 @@ def _map_twitter_to_open_graph(key: str) -> Optional[str]:
     return "og" + key[7:]
 
 
-def parse_html_to_open_graph(tree: "etree.Element") -> Dict[str, Optional[str]]:
+def parse_html_to_open_graph(tree: "etree._Element") -> Dict[str, Optional[str]]:
     """
     Parse the HTML document into an Open Graph response.
 
@@ -276,24 +285,36 @@ def parse_html_to_open_graph(tree: "etree.Element") -> Dict[str, Optional[str]]:
 
     if "og:title" not in og:
         # Attempt to find a title from the title tag, or the biggest header on the page.
-        title = tree.xpath("((//title)[1] | (//h1)[1] | (//h2)[1] | (//h3)[1])/text()")
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        title = cast(
+            List["etree._ElementUnicodeResult"],
+            tree.xpath("((//title)[1] | (//h1)[1] | (//h2)[1] | (//h3)[1])/text()"),
+        )
         if title:
             og["og:title"] = title[0].strip()
         else:
             og["og:title"] = None
 
     if "og:image" not in og:
-        meta_image = tree.xpath(
-            "//*/meta[translate(@itemprop, 'IMAGE', 'image')='image'][not(@content='')]/@content[1]"
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        meta_image = cast(
+            List["etree._ElementUnicodeResult"],
+            tree.xpath(
+                "//*/meta[translate(@itemprop, 'IMAGE', 'image')='image'][not(@content='')]/@content[1]"
+            ),
         )
         # If a meta image is found, use it.
         if meta_image:
             og["og:image"] = meta_image[0]
         else:
             # Try to find images which are larger than 10px by 10px.
+            # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
             #
             # TODO: consider inlined CSS styles as well as width & height attribs
-            images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")
+            images = cast(
+                List["etree._Element"],
+                tree.xpath("//img[@src][number(@width)>10][number(@height)>10]"),
+            )
             images = sorted(
                 images,
                 key=lambda i: (
@@ -302,20 +323,29 @@ def parse_html_to_open_graph(tree: "etree.Element") -> Dict[str, Optional[str]]:
             )
             # If no images were found, try to find *any* images.
             if not images:
-                images = tree.xpath("//img[@src][1]")
+                # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+                images = cast(List["etree._Element"], tree.xpath("//img[@src][1]"))
             if images:
-                og["og:image"] = images[0].attrib["src"]
+                og["og:image"] = cast(str, images[0].attrib["src"])
 
             # Finally, fallback to the favicon if nothing else.
             else:
-                favicons = tree.xpath("//link[@href][contains(@rel, 'icon')]/@href[1]")
+                # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+                favicons = cast(
+                    List["etree._ElementUnicodeResult"],
+                    tree.xpath("//link[@href][contains(@rel, 'icon')]/@href[1]"),
+                )
                 if favicons:
                     og["og:image"] = favicons[0]
 
     if "og:description" not in og:
         # Check the first meta description tag for content.
-        meta_description = tree.xpath(
-            "//*/meta[translate(@name, 'DESCRIPTION', 'description')='description'][not(@content='')]/@content[1]"
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        meta_description = cast(
+            List["etree._ElementUnicodeResult"],
+            tree.xpath(
+                "//*/meta[translate(@name, 'DESCRIPTION', 'description')='description'][not(@content='')]/@content[1]"
+            ),
         )
         # If a meta description is found with content, use it.
         if meta_description:
@@ -332,7 +362,7 @@ def parse_html_to_open_graph(tree: "etree.Element") -> Dict[str, Optional[str]]:
     return og
 
 
-def parse_html_description(tree: "etree.Element") -> Optional[str]:
+def parse_html_description(tree: "etree._Element") -> Optional[str]:
     """
     Calculate a text description based on an HTML document.
 
@@ -368,6 +398,9 @@ def parse_html_description(tree: "etree.Element") -> Optional[str]:
         "canvas",
         "img",
         "picture",
+        # etree.Comment is a function which creates an etree._Comment element.
+        # The "tag" attribute of an etree._Comment instance is confusingly the
+        # etree.Comment function instead of a string.
         etree.Comment,
     }
 
@@ -381,8 +414,8 @@ def parse_html_description(tree: "etree.Element") -> Optional[str]:
 
 
 def _iterate_over_text(
-    tree: Optional["etree.Element"],
-    tags_to_ignore: Set[Union[str, "etree.Comment"]],
+    tree: Optional["etree._Element"],
+    tags_to_ignore: Set[object],
     stack_limit: int = 1024,
 ) -> Generator[str, None, None]:
     """Iterate over the tree returning text nodes in a depth first fashion,
@@ -402,7 +435,7 @@ def _iterate_over_text(
 
     # This is a stack whose items are elements to iterate over *or* strings
     # to be returned.
-    elements: List[Union[str, "etree.Element"]] = [tree]
+    elements: List[Union[str, "etree._Element"]] = [tree]
     while elements:
         el = elements.pop()
 
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 0e9f366cba..84b2aef620 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -38,6 +38,7 @@ from twisted.web.resource import Resource
 
 from synapse.api import errors
 from synapse.api.errors import SynapseError
+from synapse.config import ConfigError
 from synapse.events import EventBase
 from synapse.events.presence_router import (
     GET_INTERESTED_USERS_CALLBACK,
@@ -121,6 +122,7 @@ from synapse.types import (
     JsonMapping,
     Requester,
     RoomAlias,
+    RoomID,
     StateMap,
     UserID,
     UserInfo,
@@ -252,6 +254,7 @@ class ModuleApi:
         self._device_handler = hs.get_device_handler()
         self.custom_template_dir = hs.config.server.custom_template_directory
         self._callbacks = hs.get_module_api_callbacks()
+        self.msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
 
         try:
             app_name = self._hs.config.email.email_app_name
@@ -419,6 +422,11 @@ class ModuleApi:
 
         Added in Synapse v1.46.0.
         """
+        if self.msc3861_oauth_delegation_enabled:
+            raise ConfigError(
+                "Cannot use password auth provider callbacks when OAuth delegation is enabled"
+            )
+
         return self._password_auth_provider.register_password_auth_provider_callbacks(
             check_3pid_auth=check_3pid_auth,
             on_logged_out=on_logged_out,
@@ -647,7 +655,9 @@ class ModuleApi:
         Returns:
             The profile information (i.e. display name and avatar URL).
         """
-        return await self._store.get_profileinfo(localpart)
+        server_name = self._hs.hostname
+        user_id = UserID.from_string(f"@{localpart}:{server_name}")
+        return await self._store.get_profileinfo(user_id)
 
     async def get_threepids_for_user(self, user_id: str) -> List[Dict[str, str]]:
         """Look up the threepids (email addresses and phone numbers) associated with the
@@ -1563,6 +1573,32 @@ class ModuleApi:
             start_timestamp, end_timestamp
         )
 
+    async def get_canonical_room_alias(self, room_id: RoomID) -> Optional[RoomAlias]:
+        """
+        Retrieve the given room's current canonical alias.
+
+        A room may declare an alias as "canonical", meaning that it is the
+        preferred alias to use when referring to the room. This function
+        retrieves that alias from the room's state.
+
+        Added in Synapse v1.86.0.
+
+        Args:
+            room_id: The Room ID to find the alias of.
+
+        Returns:
+            None if the room ID does not exist, or if the room exists but has no canonical alias.
+            Otherwise, the parsed room alias.
+        """
+        room_alias_str = (
+            await self._storage_controllers.state.get_canonical_alias_for_room(
+                room_id.to_string()
+            )
+        )
+        if room_alias_str:
+            return RoomAlias.from_string(room_alias_str)
+        return None
+
     async def lookup_room_alias(self, room_alias: str) -> Tuple[str, List[str]]:
         """
         Get the room ID associated with a room alias.
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 320084f5f5..33002cc0f2 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -120,9 +120,6 @@ class BulkPushRuleEvaluator:
         self.should_calculate_push_rules = self.hs.config.push.enable_push
 
         self._related_event_match_enabled = self.hs.config.experimental.msc3664_enabled
-        self._intentional_mentions_enabled = (
-            self.hs.config.experimental.msc3952_intentional_mentions
-        )
 
         self.room_push_rule_cache_metrics = register_cache(
             "cache",
@@ -390,10 +387,7 @@ class BulkPushRuleEvaluator:
                         del notification_levels[key]
 
         # Pull out any user and room mentions.
-        has_mentions = (
-            self._intentional_mentions_enabled
-            and EventContentFields.MSC3952_MENTIONS in event.content
-        )
+        has_mentions = EventContentFields.MENTIONS in event.content
 
         evaluator = PushRuleEvaluator(
             _flatten_dict(event),
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 491a09b71d..79e0627b6a 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -247,7 +247,7 @@ class Mailer:
 
         try:
             user_display_name = await self.store.get_profile_displayname(
-                UserID.from_string(user_id).localpart
+                UserID.from_string(user_id)
             )
             if user_display_name is None:
                 user_display_name = user_id
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index c729364839..fe8177ed4d 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -257,9 +257,11 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     DeleteRoomStatusByRoomIdRestServlet(hs).register(http_server)
     JoinRoomAliasServlet(hs).register(http_server)
     VersionServlet(hs).register(http_server)
-    UserAdminServlet(hs).register(http_server)
+    if not hs.config.experimental.msc3861.enabled:
+        UserAdminServlet(hs).register(http_server)
     UserMembershipRestServlet(hs).register(http_server)
-    UserTokenRestServlet(hs).register(http_server)
+    if not hs.config.experimental.msc3861.enabled:
+        UserTokenRestServlet(hs).register(http_server)
     UserRestServletV2(hs).register(http_server)
     UsersRestServletV2(hs).register(http_server)
     UserMediaStatisticsRestServlet(hs).register(http_server)
@@ -274,9 +276,10 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     RoomEventContextServlet(hs).register(http_server)
     RateLimitRestServlet(hs).register(http_server)
     UsernameAvailableRestServlet(hs).register(http_server)
-    ListRegistrationTokensRestServlet(hs).register(http_server)
-    NewRegistrationTokenRestServlet(hs).register(http_server)
-    RegistrationTokenRestServlet(hs).register(http_server)
+    if not hs.config.experimental.msc3861.enabled:
+        ListRegistrationTokensRestServlet(hs).register(http_server)
+        NewRegistrationTokenRestServlet(hs).register(http_server)
+        RegistrationTokenRestServlet(hs).register(http_server)
     DestinationMembershipRestServlet(hs).register(http_server)
     DestinationResetConnectionRestServlet(hs).register(http_server)
     DestinationRestServlet(hs).register(http_server)
@@ -306,10 +309,12 @@ def register_servlets_for_client_rest_resource(
     # The following resources can only be run on the main process.
     if hs.config.worker.worker_app is None:
         DeactivateAccountRestServlet(hs).register(http_server)
-        ResetPasswordRestServlet(hs).register(http_server)
+        if not hs.config.experimental.msc3861.enabled:
+            ResetPasswordRestServlet(hs).register(http_server)
     SearchUsersRestServlet(hs).register(http_server)
-    UserRegisterServlet(hs).register(http_server)
-    AccountValidityRenewServlet(hs).register(http_server)
+    if not hs.config.experimental.msc3861.enabled:
+        UserRegisterServlet(hs).register(http_server)
+        AccountValidityRenewServlet(hs).register(http_server)
 
     # Load the media repo ones if we're using them. Otherwise load the servlets which
     # don't need a media repo (typically readonly admin APIs).
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 932333ae57..407fe9c804 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -71,6 +71,7 @@ class UsersRestServletV2(RestServlet):
         self.auth = hs.get_auth()
         self.admin_handler = hs.get_admin_handler()
         self._msc3866_enabled = hs.config.experimental.msc3866.enabled
+        self._msc3861_enabled = hs.config.experimental.msc3861.enabled
 
     async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         await assert_requester_is_admin(self.auth, request)
@@ -94,7 +95,14 @@ class UsersRestServletV2(RestServlet):
 
         user_id = parse_string(request, "user_id")
         name = parse_string(request, "name")
+
         guests = parse_boolean(request, "guests", default=True)
+        if self._msc3861_enabled and guests:
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "The guests parameter is not supported when MSC3861 is enabled.",
+                errcode=Codes.INVALID_PARAM,
+            )
         deactivated = parse_boolean(request, "deactivated", default=False)
 
         # If support for MSC3866 is not enabled, apply no filtering based on the
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 3d0c55daa0..679ab9f266 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -27,6 +27,7 @@ from synapse.api.constants import LoginType
 from synapse.api.errors import (
     Codes,
     InteractiveAuthIncompleteError,
+    NotFoundError,
     SynapseError,
     ThreepidValidationError,
 )
@@ -600,6 +601,9 @@ class ThreepidRestServlet(RestServlet):
     # ThreePidBindRestServelet.PostBody with an `alias_generator` to handle
     # `threePidCreds` versus `three_pid_creds`.
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        if self.hs.config.experimental.msc3861.enabled:
+            raise NotFoundError(errcode=Codes.UNRECOGNIZED)
+
         if not self.hs.config.registration.enable_3pid_changes:
             raise SynapseError(
                 400, "3PID changes are disabled on this server", Codes.FORBIDDEN
@@ -890,19 +894,21 @@ class AccountStatusRestServlet(RestServlet):
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     if hs.config.worker.worker_app is None:
-        EmailPasswordRequestTokenRestServlet(hs).register(http_server)
-        PasswordRestServlet(hs).register(http_server)
-        DeactivateAccountRestServlet(hs).register(http_server)
-        EmailThreepidRequestTokenRestServlet(hs).register(http_server)
-        MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
-        AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
-        AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
+        if not hs.config.experimental.msc3861.enabled:
+            EmailPasswordRequestTokenRestServlet(hs).register(http_server)
+            DeactivateAccountRestServlet(hs).register(http_server)
+            PasswordRestServlet(hs).register(http_server)
+            EmailThreepidRequestTokenRestServlet(hs).register(http_server)
+            MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
+            AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
+            AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
     ThreepidRestServlet(hs).register(http_server)
     if hs.config.worker.worker_app is None:
-        ThreepidAddRestServlet(hs).register(http_server)
         ThreepidBindRestServlet(hs).register(http_server)
         ThreepidUnbindRestServlet(hs).register(http_server)
-        ThreepidDeleteRestServlet(hs).register(http_server)
+        if not hs.config.experimental.msc3861.enabled:
+            ThreepidAddRestServlet(hs).register(http_server)
+            ThreepidDeleteRestServlet(hs).register(http_server)
     WhoamiRestServlet(hs).register(http_server)
 
     if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled:
diff --git a/synapse/rest/client/capabilities.py b/synapse/rest/client/capabilities.py
index 0dbf8f6818..3154b9f77e 100644
--- a/synapse/rest/client/capabilities.py
+++ b/synapse/rest/client/capabilities.py
@@ -65,6 +65,9 @@ class CapabilitiesRestServlet(RestServlet):
                 "m.3pid_changes": {
                     "enabled": self.config.registration.enable_3pid_changes
                 },
+                "m.get_login_token": {
+                    "enabled": self.config.auth.login_via_existing_enabled,
+                },
             }
         }
 
diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index e97d0bf475..38dff9703f 100644
--- a/synapse/rest/client/devices.py
+++ b/synapse/rest/client/devices.py
@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple
 from pydantic import Extra, StrictStr
 
 from synapse.api import errors
-from synapse.api.errors import NotFoundError
+from synapse.api.errors import NotFoundError, UnrecognizedRequestError
 from synapse.handlers.device import DeviceHandler
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
@@ -135,6 +135,7 @@ class DeviceRestServlet(RestServlet):
         self.device_handler = handler
         self.auth_handler = hs.get_auth_handler()
         self._msc3852_enabled = hs.config.experimental.msc3852_enabled
+        self._msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
 
     async def on_GET(
         self, request: SynapseRequest, device_id: str
@@ -166,6 +167,9 @@ class DeviceRestServlet(RestServlet):
     async def on_DELETE(
         self, request: SynapseRequest, device_id: str
     ) -> Tuple[int, JsonDict]:
+        if self._msc3861_oauth_delegation_enabled:
+            raise UnrecognizedRequestError(code=404)
+
         requester = await self.auth.get_user_by_req(request)
 
         try:
@@ -344,7 +348,10 @@ class ClaimDehydratedDeviceServlet(RestServlet):
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
-    if hs.config.worker.worker_app is None:
+    if (
+        hs.config.worker.worker_app is None
+        and not hs.config.experimental.msc3861.enabled
+    ):
         DeleteDevicesRestServlet(hs).register(http_server)
     DevicesRestServlet(hs).register(http_server)
     if hs.config.worker.worker_app is None:
diff --git a/synapse/rest/client/filter.py b/synapse/rest/client/filter.py
index 04561f36d7..5da1e511a2 100644
--- a/synapse/rest/client/filter.py
+++ b/synapse/rest/client/filter.py
@@ -58,7 +58,7 @@ class GetFilterRestServlet(RestServlet):
 
         try:
             filter_collection = await self.filtering.get_user_filter(
-                user_localpart=target_user.localpart, filter_id=filter_id_int
+                user_id=target_user, filter_id=filter_id_int
             )
         except StoreError as e:
             if e.code != 404:
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index 413edd8a4d..70b8be1aa2 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -17,9 +17,10 @@
 import logging
 import re
 from collections import Counter
+from http import HTTPStatus
 from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
 
-from synapse.api.errors import InvalidAPICallError, SynapseError
+from synapse.api.errors import Codes, InvalidAPICallError, SynapseError
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
     RestServlet,
@@ -375,9 +376,29 @@ class SigningKeyUploadServlet(RestServlet):
         user_id = requester.user.to_string()
         body = parse_json_object_from_request(request)
 
-        if self.hs.config.experimental.msc3967_enabled:
-            if await self.e2e_keys_handler.is_cross_signing_set_up_for_user(user_id):
-                # If we already have a master key then cross signing is set up and we require UIA to reset
+        is_cross_signing_setup = (
+            await self.e2e_keys_handler.is_cross_signing_set_up_for_user(user_id)
+        )
+
+        # Before MSC3967 we required UIA both when setting up cross signing for the
+        # first time and when resetting the device signing key. With MSC3967 we only
+        # require UIA when resetting cross-signing, and not when setting up the first
+        # time. Because there is no UIA in MSC3861, for now we throw an error if the
+        # user tries to reset the device signing key when MSC3861 is enabled, but allow
+        # first-time setup.
+        if self.hs.config.experimental.msc3861.enabled:
+            # There is no way to reset the device signing key with MSC3861
+            if is_cross_signing_setup:
+                raise SynapseError(
+                    HTTPStatus.NOT_IMPLEMENTED,
+                    "Resetting cross signing keys is not yet supported with MSC3861",
+                    Codes.UNRECOGNIZED,
+                )
+            # But first-time setup is fine
+
+        elif self.hs.config.experimental.msc3967_enabled:
+            # If we already have a master key then cross signing is set up and we require UIA to reset
+            if is_cross_signing_setup:
                 await self.auth_handler.validate_user_via_ui_auth(
                     requester,
                     request,
@@ -387,6 +408,7 @@ class SigningKeyUploadServlet(RestServlet):
                     can_skip_ui_auth=False,
                 )
             # Otherwise we don't require UIA since we are setting up cross signing for first time
+
         else:
             # Previous behaviour is to always require UIA but allow it to be skipped
             await self.auth_handler.validate_user_via_ui_auth(
diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py
index 6ca61ffbd0..6493b00bb8 100644
--- a/synapse/rest/client/login.py
+++ b/synapse/rest/client/login.py
@@ -104,6 +104,9 @@ class LoginRestServlet(RestServlet):
             and hs.config.experimental.msc3866.require_approval_for_new_accounts
         )
 
+        # Whether get login token is enabled.
+        self._get_login_token_enabled = hs.config.auth.login_via_existing_enabled
+
         self.auth = hs.get_auth()
 
         self.clock = hs.get_clock()
@@ -142,6 +145,9 @@ class LoginRestServlet(RestServlet):
             # to SSO.
             flows.append({"type": LoginRestServlet.CAS_TYPE})
 
+        # The login token flow requires m.login.token to be advertised.
+        support_login_token_flow = self._get_login_token_enabled
+
         if self.cas_enabled or self.saml2_enabled or self.oidc_enabled:
             flows.append(
                 {
@@ -153,14 +159,23 @@ class LoginRestServlet(RestServlet):
                 }
             )
 
-            # While it's valid for us to advertise this login type generally,
-            # synapse currently only gives out these tokens as part of the
-            # SSO login flow.
-            # Generally we don't want to advertise login flows that clients
-            # don't know how to implement, since they (currently) will always
-            # fall back to the fallback API if they don't understand one of the
-            # login flow types returned.
-            flows.append({"type": LoginRestServlet.TOKEN_TYPE})
+            # SSO requires a login token to be generated, so we need to advertise that flow
+            support_login_token_flow = True
+
+        # While it's valid for us to advertise this login type generally,
+        # synapse currently only gives out these tokens as part of the
+        # SSO login flow or as part of login via an existing session.
+        #
+        # Generally we don't want to advertise login flows that clients
+        # don't know how to implement, since they (currently) will always
+        # fall back to the fallback API if they don't understand one of the
+        # login flow types returned.
+        if support_login_token_flow:
+            tokenTypeFlow: Dict[str, Any] = {"type": LoginRestServlet.TOKEN_TYPE}
+            # If the login token flow is enabled advertise the get_login_token flag.
+            if self._get_login_token_enabled:
+                tokenTypeFlow["get_login_token"] = True
+            flows.append(tokenTypeFlow)
 
         flows.extend({"type": t} for t in self.auth_handler.get_supported_login_types())
 
@@ -633,6 +648,9 @@ class CasTicketServlet(RestServlet):
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+    if hs.config.experimental.msc3861.enabled:
+        return
+
     LoginRestServlet(hs).register(http_server)
     if (
         hs.config.worker.worker_app is None
diff --git a/synapse/rest/client/login_token_request.py b/synapse/rest/client/login_token_request.py
index 43ea21d5e6..b1629f94a5 100644
--- a/synapse/rest/client/login_token_request.py
+++ b/synapse/rest/client/login_token_request.py
@@ -15,6 +15,7 @@
 import logging
 from typing import TYPE_CHECKING, Tuple
 
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.http.server import HttpServer
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseRequest
@@ -33,7 +34,7 @@ class LoginTokenRequestServlet(RestServlet):
 
     Request:
 
-    POST /login/token HTTP/1.1
+    POST /login/get_token HTTP/1.1
     Content-Type: application/json
 
     {}
@@ -43,30 +44,45 @@ class LoginTokenRequestServlet(RestServlet):
     HTTP/1.1 200 OK
     {
         "login_token": "ABDEFGH",
-        "expires_in": 3600,
+        "expires_in_ms": 3600000,
     }
     """
 
-    PATTERNS = client_patterns(
-        "/org.matrix.msc3882/login/token$", releases=[], v1=False, unstable=True
-    )
+    PATTERNS = [
+        *client_patterns(
+            "/login/get_token$", releases=["v1"], v1=False, unstable=False
+        ),
+        # TODO: this is no longer needed once unstable MSC3882 does not need to be supported:
+        *client_patterns(
+            "/org.matrix.msc3882/login/token$", releases=[], v1=False, unstable=True
+        ),
+    ]
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
         self.auth = hs.get_auth()
-        self.store = hs.get_datastores().main
-        self.clock = hs.get_clock()
-        self.server_name = hs.config.server.server_name
+        self._main_store = hs.get_datastores().main
         self.auth_handler = hs.get_auth_handler()
-        self.token_timeout = hs.config.experimental.msc3882_token_timeout
-        self.ui_auth = hs.config.experimental.msc3882_ui_auth
+        self.token_timeout = hs.config.auth.login_via_existing_token_timeout
+        self._require_ui_auth = hs.config.auth.login_via_existing_require_ui_auth
+
+        # Ratelimit aggressively to a maxmimum of 1 request per minute.
+        #
+        # This endpoint can be used to spawn additional sessions and could be
+        # abused by a malicious client to create many sessions.
+        self._ratelimiter = Ratelimiter(
+            store=self._main_store,
+            clock=hs.get_clock(),
+            rate_hz=1 / 60,
+            burst_count=1,
+        )
 
     @interactive_auth_handler
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request)
         body = parse_json_object_from_request(request)
 
-        if self.ui_auth:
+        if self._require_ui_auth:
             await self.auth_handler.validate_user_via_ui_auth(
                 requester,
                 request,
@@ -75,9 +91,12 @@ class LoginTokenRequestServlet(RestServlet):
                 can_skip_ui_auth=False,  # Don't allow skipping of UI auth
             )
 
+        # Ensure that this endpoint isn't being used too often. (Ensure this is
+        # done *after* UI auth.)
+        await self._ratelimiter.ratelimit(None, requester.user.to_string().lower())
+
         login_token = await self.auth_handler.create_login_token_for_user_id(
             user_id=requester.user.to_string(),
-            auth_provider_id="org.matrix.msc3882.login_token_request",
             duration_ms=self.token_timeout,
         )
 
@@ -85,11 +104,13 @@ class LoginTokenRequestServlet(RestServlet):
             200,
             {
                 "login_token": login_token,
+                # TODO: this is no longer needed once unstable MSC3882 does not need to be supported:
                 "expires_in": self.token_timeout // 1000,
+                "expires_in_ms": self.token_timeout,
             },
         )
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
-    if hs.config.experimental.msc3882_enabled:
+    if hs.config.auth.login_via_existing_enabled:
         LoginTokenRequestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/logout.py b/synapse/rest/client/logout.py
index 6d34625ad5..94ad90942f 100644
--- a/synapse/rest/client/logout.py
+++ b/synapse/rest/client/logout.py
@@ -80,5 +80,8 @@ class LogoutAllRestServlet(RestServlet):
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+    if hs.config.experimental.msc3861.enabled:
+        return
+
     LogoutRestServlet(hs).register(http_server)
     LogoutAllRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index 7f84a17e29..d59669f0b6 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -869,6 +869,74 @@ class RegisterRestServlet(RestServlet):
         return 200, result
 
 
+class RegisterAppServiceOnlyRestServlet(RestServlet):
+    """An alternative registration API endpoint that only allows ASes to register
+
+    This replaces the regular /register endpoint if MSC3861. There are two notable
+    differences with the regular /register endpoint:
+     - It only allows the `m.login.application_service` login type
+     - It does not create a device or access token for the just-registered user
+
+    Note that the exact behaviour of this endpoint is not yet finalised. It should be
+    just good enough to make most ASes work.
+    """
+
+    PATTERNS = client_patterns("/register$")
+    CATEGORY = "Registration/login requests"
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+
+        self.auth = hs.get_auth()
+        self.registration_handler = hs.get_registration_handler()
+        self.ratelimiter = hs.get_registration_ratelimiter()
+
+    @interactive_auth_handler
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        body = parse_json_object_from_request(request)
+
+        client_addr = request.getClientAddress().host
+
+        await self.ratelimiter.ratelimit(None, client_addr, update=False)
+
+        kind = parse_string(request, "kind", default="user")
+
+        if kind == "guest":
+            raise SynapseError(403, "Guest access is disabled")
+        elif kind != "user":
+            raise UnrecognizedRequestError(
+                f"Do not understand membership kind: {kind}",
+            )
+
+        # Pull out the provided username and do basic sanity checks early since
+        # the auth layer will store these in sessions.
+        desired_username = body.get("username")
+        if not isinstance(desired_username, str) or len(desired_username) > 512:
+            raise SynapseError(400, "Invalid username")
+
+        # Allow only ASes to use this API.
+        if body.get("type") != APP_SERVICE_REGISTRATION_TYPE:
+            raise SynapseError(403, "Non-application service registration type")
+
+        if not self.auth.has_access_token(request):
+            raise SynapseError(
+                400,
+                "Appservice token must be provided when using a type of m.login.application_service",
+            )
+
+        # XXX we should check that desired_username is valid. Currently
+        # we give appservices carte blanche for any insanity in mxids,
+        # because the IRC bridges rely on being able to register stupid
+        # IDs.
+
+        as_token = self.auth.get_access_token_from_request(request)
+
+        user_id = await self.registration_handler.appservice_register(
+            desired_username, as_token
+        )
+        return 200, {"user_id": user_id}
+
+
 def _calculate_registration_flows(
     config: HomeServerConfig, auth_handler: AuthHandler
 ) -> List[List[str]]:
@@ -955,6 +1023,10 @@ def _calculate_registration_flows(
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+    if hs.config.experimental.msc3861.enabled:
+        RegisterAppServiceOnlyRestServlet(hs).register(http_server)
+        return
+
     if hs.config.worker.worker_app is None:
         EmailRegisterRequestTokenRestServlet(hs).register(http_server)
         MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 03b0578945..d7854ed4fd 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -178,7 +178,7 @@ class SyncRestServlet(RestServlet):
         else:
             try:
                 filter_collection = await self.filtering.get_user_filter(
-                    user.localpart, filter_id
+                    user, filter_id
                 )
             except StoreError as err:
                 if err.code != 404:
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 32df054f56..1910648755 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -113,8 +113,8 @@ class VersionsRestServlet(RestServlet):
                     "fi.mau.msc2815": self.config.experimental.msc2815_enabled,
                     # Adds a ping endpoint for appservices to check HS->AS connection
                     "fi.mau.msc2659.stable": True,  # TODO: remove when "v1.7" is added above
-                    # Adds support for login token requests as per MSC3882
-                    "org.matrix.msc3882": self.config.experimental.msc3882_enabled,
+                    # TODO: this is no longer needed once unstable MSC3882 does not need to be supported:
+                    "org.matrix.msc3882": self.config.auth.login_via_existing_enabled,
                     # Adds support for remotely enabling/disabling pushers, as per MSC3881
                     "org.matrix.msc3881": self.config.experimental.msc3881_enabled,
                     # Adds support for filtering /messages by event relation.
@@ -124,8 +124,6 @@ class VersionsRestServlet(RestServlet):
                     is not None,
                     # Adds support for relation-based redactions as per MSC3912.
                     "org.matrix.msc3912": self.config.experimental.msc3912_enabled,
-                    # Adds support for unstable "intentional mentions" behaviour.
-                    "org.matrix.msc3952_intentional_mentions": self.config.experimental.msc3952_intentional_mentions,
                     # Whether recursively provide relations is supported.
                     "org.matrix.msc3981": self.config.experimental.msc3981_recurse_relations,
                     # Adds support for deleting account data.
diff --git a/synapse/rest/media/upload_resource.py b/synapse/rest/media/upload_resource.py
index 697348613b..043e8d6077 100644
--- a/synapse/rest/media/upload_resource.py
+++ b/synapse/rest/media/upload_resource.py
@@ -39,7 +39,6 @@ class UploadResource(DirectServeJsonResource):
         self.filepaths = media_repo.filepaths
         self.store = hs.get_datastores().main
         self.clock = hs.get_clock()
-        self.server_name = hs.hostname
         self.auth = hs.get_auth()
         self.max_upload_size = hs.config.media.max_upload_size
         self.clock = hs.get_clock()
diff --git a/synapse/rest/synapse/client/__init__.py b/synapse/rest/synapse/client/__init__.py
index e55924f597..57335fb913 100644
--- a/synapse/rest/synapse/client/__init__.py
+++ b/synapse/rest/synapse/client/__init__.py
@@ -46,6 +46,12 @@ def build_synapse_client_resource_tree(hs: "HomeServer") -> Mapping[str, Resourc
         "/_synapse/client/unsubscribe": UnsubscribeResource(hs),
     }
 
+    # Expose the JWKS endpoint if OAuth2 delegation is enabled
+    if hs.config.experimental.msc3861.enabled:
+        from synapse.rest.synapse.client.jwks import JwksResource
+
+        resources["/_synapse/jwks"] = JwksResource(hs)
+
     # provider-specific SSO bits. Only load these if they are enabled, since they
     # rely on optional dependencies.
     if hs.config.oidc.oidc_enabled:
diff --git a/synapse/rest/synapse/client/jwks.py b/synapse/rest/synapse/client/jwks.py
new file mode 100644
index 0000000000..7c0a1223fb
--- /dev/null
+++ b/synapse/rest/synapse/client/jwks.py
@@ -0,0 +1,70 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.http.server import DirectServeJsonResource
+from synapse.http.site import SynapseRequest
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class JwksResource(DirectServeJsonResource):
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(extract_context=True)
+
+        # Parameters that are allowed to be exposed in the public key.
+        # This is done manually, because authlib's private to public key conversion
+        # is unreliable depending on the version. Instead, we just serialize the private
+        # key and only keep the public parameters.
+        # List from https://www.iana.org/assignments/jose/jose.xhtml#web-key-parameters
+        public_parameters = {
+            "kty",
+            "use",
+            "key_ops",
+            "alg",
+            "kid",
+            "x5u",
+            "x5c",
+            "x5t",
+            "x5t#S256",
+            "crv",
+            "x",
+            "y",
+            "n",
+            "e",
+            "ext",
+        }
+
+        key = hs.config.experimental.msc3861.jwk
+
+        if key is not None:
+            private_key = key.as_dict()
+            public_key = {
+                k: v for k, v in private_key.items() if k in public_parameters
+            }
+            keys = [public_key]
+        else:
+            keys = []
+
+        self.res = {
+            "keys": keys,
+        }
+
+    async def _async_render_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        return 200, self.res
diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py
index e2174fdfea..b8b4b5379b 100644
--- a/synapse/rest/well_known.py
+++ b/synapse/rest/well_known.py
@@ -44,6 +44,16 @@ class WellKnownBuilder:
                 "base_url": self._config.registration.default_identity_server
             }
 
+        # We use the MSC3861 values as they are used by multiple MSCs
+        if self._config.experimental.msc3861.enabled:
+            result["org.matrix.msc2965.authentication"] = {
+                "issuer": self._config.experimental.msc3861.issuer
+            }
+            if self._config.experimental.msc3861.account_management_url is not None:
+                result["org.matrix.msc2965.authentication"][
+                    "account"
+                ] = self._config.experimental.msc3861.account_management_url
+
         if self._config.server.extra_well_known_client_content:
             for (
                 key,
diff --git a/synapse/server.py b/synapse/server.py
index cce5fb66ff..0f36ef69cb 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -31,6 +31,7 @@ from twisted.web.iweb import IPolicyForHTTPS
 from twisted.web.resource import Resource
 
 from synapse.api.auth import Auth
+from synapse.api.auth.internal import InternalAuth
 from synapse.api.auth_blocking import AuthBlocking
 from synapse.api.filtering import Filtering
 from synapse.api.ratelimiting import Ratelimiter, RequestRatelimiter
@@ -427,7 +428,11 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_auth(self) -> Auth:
-        return Auth(self)
+        if self.config.experimental.msc3861.enabled:
+            from synapse.api.auth.msc3861_delegated import MSC3861DelegatedAuth
+
+            return MSC3861DelegatedAuth(self)
+        return InternalAuth(self)
 
     @cache_in_self
     def get_auth_blocking(self) -> AuthBlocking:
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 7089b0a1d8..233df7cce2 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -485,7 +485,7 @@ class StateStorageController:
         if not event:
             return None
 
-        return event.content.get("canonical_alias")
+        return event.content.get("alias")
 
     @trace
     @tag_args
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index a67fdb3c22..f677d048aa 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1941,6 +1941,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             user_id,
             stream_ids[-1],
         )
+        txn.call_after(
+            self._get_e2e_device_keys_for_federation_query_inner.invalidate,
+            (user_id,),
+        )
 
         min_stream_id = stream_ids[0]
 
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 4bc391f213..91ae9c457d 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -16,6 +16,7 @@
 import abc
 from typing import (
     TYPE_CHECKING,
+    Any,
     Collection,
     Dict,
     Iterable,
@@ -39,6 +40,7 @@ from synapse.appservice import (
     TransactionUnusedFallbackKeys,
 )
 from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.replication.tcp.streams._base import DeviceListsStream
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
     DatabasePool,
@@ -104,6 +106,23 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
             self.hs.config.federation.allow_device_name_lookup_over_federation
         )
 
+    def process_replication_rows(
+        self,
+        stream_name: str,
+        instance_name: str,
+        token: int,
+        rows: Iterable[Any],
+    ) -> None:
+        if stream_name == DeviceListsStream.NAME:
+            for row in rows:
+                assert isinstance(row, DeviceListsStream.DeviceListsStreamRow)
+                if row.entity.startswith("@"):
+                    self._get_e2e_device_keys_for_federation_query_inner.invalidate(
+                        (row.entity,)
+                    )
+
+        super().process_replication_rows(stream_name, instance_name, token, rows)
+
     async def get_e2e_device_keys_for_federation_query(
         self, user_id: str
     ) -> Tuple[int, List[JsonDict]]:
@@ -114,6 +133,50 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
         """
         now_stream_id = self.get_device_stream_token()
 
+        # We need to be careful with the caching here, as we need to always
+        # return *all* persisted devices, however there may be a lag between a
+        # new device being persisted and the cache being invalidated.
+        cached_results = (
+            self._get_e2e_device_keys_for_federation_query_inner.cache.get_immediate(
+                user_id, None
+            )
+        )
+        if cached_results is not None:
+            # Check that there have been no new devices added by another worker
+            # after the cache. This should be quick as there should be few rows
+            # with a higher stream ordering.
+            #
+            # Note that we invalidate based on the device stream, so we only
+            # have to check for potential invalidations after the
+            # `now_stream_id`.
+            sql = """
+                SELECT user_id FROM device_lists_stream
+                WHERE stream_id >= ? AND user_id = ?
+            """
+            rows = await self.db_pool.execute(
+                "get_e2e_device_keys_for_federation_query_check",
+                None,
+                sql,
+                now_stream_id,
+                user_id,
+            )
+            if not rows:
+                # No new rows, so cache is still valid.
+                return now_stream_id, cached_results
+
+            # There has, so let's invalidate the cache and run the query.
+            self._get_e2e_device_keys_for_federation_query_inner.invalidate((user_id,))
+
+        results = await self._get_e2e_device_keys_for_federation_query_inner(user_id)
+
+        return now_stream_id, results
+
+    @cached(iterable=True)
+    async def _get_e2e_device_keys_for_federation_query_inner(
+        self, user_id: str
+    ) -> List[JsonDict]:
+        """Get all devices (with any device keys) for a user"""
+
         devices = await self.get_e2e_device_keys_and_signatures([(user_id, None)])
 
         if devices:
@@ -134,9 +197,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
 
                 results.append(result)
 
-            return now_stream_id, results
+            return results
 
-        return now_stream_id, []
+        return []
 
     @trace
     @cancellable
diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py
index f777777cbf..fff417f9e3 100644
--- a/synapse/storage/databases/main/filtering.py
+++ b/synapse/storage/databases/main/filtering.py
@@ -145,7 +145,7 @@ class FilteringWorkerStore(SQLBaseStore):
 
     @cached(num_args=2)
     async def get_user_filter(
-        self, user_localpart: str, filter_id: Union[int, str]
+        self, user_id: UserID, filter_id: Union[int, str]
     ) -> JsonDict:
         # filter_id is BIGINT UNSIGNED, so if it isn't a number, fail
         # with a coherent error message rather than 500 M_UNKNOWN.
@@ -156,7 +156,7 @@ class FilteringWorkerStore(SQLBaseStore):
 
         def_json = await self.db_pool.simple_select_one_onecol(
             table="user_filters",
-            keyvalues={"user_id": user_localpart, "filter_id": filter_id},
+            keyvalues={"full_user_id": user_id.to_string(), "filter_id": filter_id},
             retcol="filter_json",
             allow_none=False,
             desc="get_user_filter",
@@ -172,15 +172,15 @@ class FilteringWorkerStore(SQLBaseStore):
         def _do_txn(txn: LoggingTransaction) -> int:
             sql = (
                 "SELECT filter_id FROM user_filters "
-                "WHERE user_id = ? AND filter_json = ?"
+                "WHERE full_user_id = ? AND filter_json = ?"
             )
-            txn.execute(sql, (user_id.localpart, bytearray(def_json)))
+            txn.execute(sql, (user_id.to_string(), bytearray(def_json)))
             filter_id_response = txn.fetchone()
             if filter_id_response is not None:
                 return filter_id_response[0]
 
-            sql = "SELECT MAX(filter_id) FROM user_filters WHERE user_id = ?"
-            txn.execute(sql, (user_id.localpart,))
+            sql = "SELECT MAX(filter_id) FROM user_filters WHERE full_user_id = ?"
+            txn.execute(sql, (user_id.to_string(),))
             max_id = cast(Tuple[Optional[int]], txn.fetchone())[0]
             if max_id is None:
                 filter_id = 0
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index 21d54c7a7a..3ba9cc8853 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -137,11 +137,11 @@ class ProfileWorkerStore(SQLBaseStore):
 
         return 50
 
-    async def get_profileinfo(self, user_localpart: str) -> ProfileInfo:
+    async def get_profileinfo(self, user_id: UserID) -> ProfileInfo:
         try:
             profile = await self.db_pool.simple_select_one(
                 table="profiles",
-                keyvalues={"user_id": user_localpart},
+                keyvalues={"full_user_id": user_id.to_string()},
                 retcols=("displayname", "avatar_url"),
                 desc="get_profileinfo",
             )
@@ -156,18 +156,18 @@ class ProfileWorkerStore(SQLBaseStore):
             avatar_url=profile["avatar_url"], display_name=profile["displayname"]
         )
 
-    async def get_profile_displayname(self, user_localpart: str) -> Optional[str]:
+    async def get_profile_displayname(self, user_id: UserID) -> Optional[str]:
         return await self.db_pool.simple_select_one_onecol(
             table="profiles",
-            keyvalues={"user_id": user_localpart},
+            keyvalues={"full_user_id": user_id.to_string()},
             retcol="displayname",
             desc="get_profile_displayname",
         )
 
-    async def get_profile_avatar_url(self, user_localpart: str) -> Optional[str]:
+    async def get_profile_avatar_url(self, user_id: UserID) -> Optional[str]:
         return await self.db_pool.simple_select_one_onecol(
             table="profiles",
-            keyvalues={"user_id": user_localpart},
+            keyvalues={"full_user_id": user_id.to_string()},
             retcol="avatar_url",
             desc="get_profile_avatar_url",
         )
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 9f862f00c1..e098ceea3c 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -88,7 +88,6 @@ def _load_rules(
         msc1767_enabled=experimental_config.msc1767_enabled,
         msc3664_enabled=experimental_config.msc3664_enabled,
         msc3381_polls_enabled=experimental_config.msc3381_polls_enabled,
-        msc3952_intentional_mentions=experimental_config.msc3952_intentional_mentions,
         msc3958_suppress_edits_enabled=experimental_config.msc3958_supress_edit_notifs,
     )
 
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 4a6c6c724d..96908f14ba 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -365,6 +365,36 @@ class RelationsWorkerStore(SQLBaseStore):
             func=get_all_relation_ids_for_event_with_types_txn,
         )
 
+    async def get_all_relations_for_event(
+        self,
+        event_id: str,
+    ) -> List[str]:
+        """Get the event IDs of all events that have a relation to the given event.
+
+        Args:
+            event_id: The event for which to look for related events.
+
+        Returns:
+            A list of the IDs of the events that relate to the given event.
+        """
+
+        def get_all_relation_ids_for_event_txn(
+            txn: LoggingTransaction,
+        ) -> List[str]:
+            rows = self.db_pool.simple_select_list_txn(
+                txn=txn,
+                table="event_relations",
+                keyvalues={"relates_to_id": event_id},
+                retcols=["event_id"],
+            )
+
+            return [row["event_id"] for row in rows]
+
+        return await self.db_pool.runInteraction(
+            desc="get_all_relation_ids_for_event",
+            func=get_all_relation_ids_for_event_txn,
+        )
+
     async def event_includes_relation(self, event_id: str) -> bool:
         """Check if the given event relates to another event.
 
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index ae9c201b87..1b8ec67f54 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -927,11 +927,10 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             raise Exception("Invalid host name")
 
         sql = """
-            SELECT state_key FROM current_state_events AS c
-            INNER JOIN room_memberships AS m USING (event_id)
-            WHERE m.membership = ?
+            SELECT state_key FROM current_state_events
+            WHERE membership = ?
                 AND type = 'm.room.member'
-                AND c.room_id = ?
+                AND room_id = ?
                 AND state_key LIKE ?
             LIMIT 1
         """
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index a0319575f0..b0a06baf4f 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -1061,12 +1061,15 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
             # The array of numbers are the weights for the various part of the
             # search: (domain, _, display name, localpart)
             sql = """
+                WITH matching_users AS (
+                    SELECT user_id, vector FROM user_directory_search WHERE vector @@ to_tsquery('simple', ?)
+                    LIMIT 10000
+                )
                 SELECT d.user_id AS user_id, display_name, avatar_url
-                FROM user_directory_search as t
+                FROM matching_users as t
                 INNER JOIN user_directory AS d USING (user_id)
                 WHERE
                     %(where_clause)s
-                    AND vector @@ to_tsquery('simple', ?)
                 ORDER BY
                     (CASE WHEN d.user_id IS NOT NULL THEN 4.0 ELSE 1.0 END)
                     * (CASE WHEN display_name IS NOT NULL THEN 1.2 ELSE 1.0 END)
@@ -1095,8 +1098,9 @@ class UserDirectoryStore(UserDirectoryBackgroundUpdateStore):
                 "order_case_statements": " ".join(additional_ordering_statements),
             }
             args = (
-                join_args
-                + (full_query, exact_query, prefix_query)
+                (full_query,)
+                + join_args
+                + (exact_query, prefix_query)
                 + ordering_arguments
                 + (limit + 1,)
             )
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 5cc786f030..fc190a8b13 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 77  # remember to update the list below when updating
+SCHEMA_VERSION = 78  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -103,6 +103,9 @@ Changes in SCHEMA_VERSION = 76:
 
 Changes in SCHEMA_VERSION = 77
     - (Postgres) Add NOT VALID CHECK (full_user_id IS NOT NULL) to tables profiles and user_filters
+
+Changes in SCHEMA_VERSION = 78
+    - Validate check (full_user_id IS NOT NULL) on tables profiles and user_filters
 """
 
 
diff --git a/synapse/storage/schema/main/delta/78/01_validate_and_update_profiles.py b/synapse/storage/schema/main/delta/78/01_validate_and_update_profiles.py
new file mode 100644
index 0000000000..8398d8f548
--- /dev/null
+++ b/synapse/storage/schema/main/delta/78/01_validate_and_update_profiles.py
@@ -0,0 +1,92 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from synapse.config.homeserver import HomeServerConfig
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+
+
+def run_upgrade(
+    cur: LoggingTransaction,
+    database_engine: BaseDatabaseEngine,
+    config: HomeServerConfig,
+) -> None:
+    """
+    Part 3 of a multi-step migration to drop the column `user_id` and replace it with
+    `full_user_id`. See the database schema docs for more information on the full
+    migration steps.
+    """
+    hostname = config.server.server_name
+
+    if isinstance(database_engine, PostgresEngine):
+        # check if the constraint can be validated
+        check_sql = """
+        SELECT user_id from profiles WHERE full_user_id IS NULL
+        """
+        cur.execute(check_sql)
+        res = cur.fetchall()
+
+        if res:
+            # there are rows the background job missed, finish them here before we validate the constraint
+            process_rows_sql = """
+            UPDATE profiles
+            SET full_user_id = '@' || user_id || ?
+            WHERE user_id IN (
+                SELECT user_id FROM profiles WHERE full_user_id IS NULL
+            )
+            """
+            cur.execute(process_rows_sql, (f":{hostname}",))
+
+        # Now we can validate
+        validate_sql = """
+        ALTER TABLE profiles VALIDATE CONSTRAINT full_user_id_not_null
+        """
+        cur.execute(validate_sql)
+
+    else:
+        # in SQLite we need to rewrite the table to add the constraint.
+        # First drop any temporary table that might be here from a previous failed migration.
+        cur.execute("DROP TABLE IF EXISTS temp_profiles")
+
+        create_sql = """
+        CREATE TABLE temp_profiles (
+            full_user_id text NOT NULL,
+            user_id text,
+            displayname text,
+            avatar_url text,
+            UNIQUE (full_user_id),
+            UNIQUE (user_id)
+        )
+        """
+        cur.execute(create_sql)
+
+        copy_sql = """
+        INSERT INTO temp_profiles (
+            user_id,
+            displayname,
+            avatar_url,
+            full_user_id)
+            SELECT user_id, displayname, avatar_url, '@' || user_id || ':' || ? FROM profiles
+        """
+        cur.execute(copy_sql, (f"{hostname}",))
+
+        drop_sql = """
+        DROP TABLE profiles
+        """
+        cur.execute(drop_sql)
+
+        rename_sql = """
+        ALTER TABLE temp_profiles RENAME to profiles
+        """
+        cur.execute(rename_sql)
diff --git a/synapse/storage/schema/main/delta/78/02_validate_and_update_user_filters.py b/synapse/storage/schema/main/delta/78/02_validate_and_update_user_filters.py
new file mode 100644
index 0000000000..8ef63335e7
--- /dev/null
+++ b/synapse/storage/schema/main/delta/78/02_validate_and_update_user_filters.py
@@ -0,0 +1,95 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+
+from synapse.config.homeserver import HomeServerConfig
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+
+
+def run_upgrade(
+    cur: LoggingTransaction,
+    database_engine: BaseDatabaseEngine,
+    config: HomeServerConfig,
+) -> None:
+    """
+    Part 3 of a multi-step migration to drop the column `user_id` and replace it with
+    `full_user_id`. See the database schema docs for more information on the full
+    migration steps.
+    """
+    hostname = config.server.server_name
+
+    if isinstance(database_engine, PostgresEngine):
+        # check if the constraint can be validated
+        check_sql = """
+        SELECT user_id from user_filters WHERE full_user_id IS NULL
+        """
+        cur.execute(check_sql)
+        res = cur.fetchall()
+
+        if res:
+            # there are rows the background job missed, finish them here before we validate constraint
+            process_rows_sql = """
+            UPDATE user_filters
+            SET full_user_id = '@' || user_id || ?
+            WHERE user_id IN (
+                SELECT user_id FROM user_filters WHERE full_user_id IS NULL
+            )
+            """
+            cur.execute(process_rows_sql, (f":{hostname}",))
+
+        # Now we can validate
+        validate_sql = """
+        ALTER TABLE user_filters VALIDATE CONSTRAINT full_user_id_not_null
+        """
+        cur.execute(validate_sql)
+
+    else:
+        cur.execute("DROP TABLE IF EXISTS temp_user_filters")
+        create_sql = """
+        CREATE TABLE temp_user_filters (
+            full_user_id text NOT NULL,
+            user_id text NOT NULL,
+            filter_id bigint NOT NULL,
+            filter_json bytea NOT NULL,
+            UNIQUE (full_user_id),
+            UNIQUE (user_id)
+        )
+        """
+        cur.execute(create_sql)
+
+        index_sql = """
+        CREATE UNIQUE INDEX IF NOT EXISTS user_filters_unique ON
+            temp_user_filters (user_id, filter_id)
+        """
+        cur.execute(index_sql)
+
+        copy_sql = """
+        INSERT INTO temp_user_filters (
+            user_id,
+            filter_id,
+            filter_json,
+            full_user_id)
+            SELECT user_id, filter_id, filter_json, '@' || user_id || ':' || ? FROM user_filters
+        """
+        cur.execute(copy_sql, (f"{hostname}",))
+
+        drop_sql = """
+        DROP TABLE user_filters
+        """
+        cur.execute(drop_sql)
+
+        rename_sql = """
+        ALTER TABLE temp_user_filters RENAME to user_filters
+        """
+        cur.execute(rename_sql)
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 42baf8ac6b..dfc95e8ebb 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -131,6 +131,7 @@ class Requester:
     user: "UserID"
     access_token_id: Optional[int]
     is_guest: bool
+    scope: Set[str]
     shadow_banned: bool
     device_id: Optional[str]
     app_service: Optional["ApplicationService"]
@@ -147,6 +148,7 @@ class Requester:
             "user_id": self.user.to_string(),
             "access_token_id": self.access_token_id,
             "is_guest": self.is_guest,
+            "scope": list(self.scope),
             "shadow_banned": self.shadow_banned,
             "device_id": self.device_id,
             "app_server_id": self.app_service.id if self.app_service else None,
@@ -175,6 +177,7 @@ class Requester:
             user=UserID.from_string(input["user_id"]),
             access_token_id=input["access_token_id"],
             is_guest=input["is_guest"],
+            scope=set(input["scope"]),
             shadow_banned=input["shadow_banned"],
             device_id=input["device_id"],
             app_service=appservice,
@@ -186,6 +189,7 @@ def create_requester(
     user_id: Union[str, "UserID"],
     access_token_id: Optional[int] = None,
     is_guest: bool = False,
+    scope: StrCollection = (),
     shadow_banned: bool = False,
     device_id: Optional[str] = None,
     app_service: Optional["ApplicationService"] = None,
@@ -199,6 +203,7 @@ def create_requester(
         access_token_id:  *ID* of the access token used for this
             request, or None if it came via the appservice API or similar
         is_guest:  True if the user making this request is a guest user
+        scope:  the scope of the access token used for this request, if any
         shadow_banned:  True if the user making this request is shadow-banned.
         device_id:  device_id which was set at authentication time
         app_service:  the AS requesting on behalf of the user
@@ -215,10 +220,13 @@ def create_requester(
     if authenticated_entity is None:
         authenticated_entity = user_id.to_string()
 
+    scope = set(scope)
+
     return Requester(
         user_id,
         access_token_id,
         is_guest,
+        scope,
         shadow_banned,
         device_id,
         app_service,
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 9ddd26ccaa..7ea0c4c36b 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -76,7 +76,7 @@ def unwrapFirstError(failure: Failure) -> Failure:
     # the subFailure's value, which will do a better job of preserving stacktraces.
     # (actually, you probably want to use yieldable_gather_results anyway)
     failure.trap(defer.FirstError)
-    return failure.value.subFailure  # type: ignore[union-attr]  # Issue in Twisted's annotations
+    return failure.value.subFailure
 
 
 P = ParamSpec("P")
@@ -178,7 +178,7 @@ def log_failure(
     """
 
     logger.error(
-        msg, exc_info=(failure.type, failure.value, failure.getTracebackObject())  # type: ignore[arg-type]
+        msg, exc_info=(failure.type, failure.value, failure.getTracebackObject())
     )
 
     if not consumeErrors:
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 01e3cd46f6..4041e49e71 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -138,7 +138,7 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
             for observer in observers:
                 # This is a little bit of magic to correctly propagate stack
                 # traces when we `await` on one of the observer deferreds.
-                f.value.__failure__ = f  # type: ignore[union-attr]
+                f.value.__failure__ = f
                 try:
                     observer.errback(f)
                 except Exception as e:
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 452d5d04c1..ed0da17227 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -93,10 +93,8 @@ VT = TypeVar("VT")
 # a general type var, distinct from either KT or VT
 T = TypeVar("T")
 
-P = TypeVar("P")
 
-
-class _TimedListNode(ListNode[P]):
+class _TimedListNode(ListNode[T]):
     """A `ListNode` that tracks last access time."""
 
     __slots__ = ["last_access_ts_secs"]
@@ -821,7 +819,7 @@ class AsyncLruCache(Generic[KT, VT]):
     utilize external cache systems that require await behaviour to be created.
     """
 
-    def __init__(self, *args, **kwargs):  # type: ignore
+    def __init__(self, *args: Any, **kwargs: Any):
         self._lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)
 
     async def get(
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 468e22f8f6..fc71dc92a4 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -41,7 +41,7 @@ from synapse.types.state import StateFilter
 from synapse.util import Clock
 
 logger = logging.getLogger(__name__)
-
+filtered_event_logger = logging.getLogger("synapse.visibility.filtered_event_debug")
 
 VISIBILITY_PRIORITY = (
     HistoryVisibility.WORLD_READABLE,
@@ -97,8 +97,8 @@ async def filter_events_for_client(
     events_before_filtering = events
     events = [e for e in events if not e.internal_metadata.is_soft_failed()]
     if len(events_before_filtering) != len(events):
-        if logger.isEnabledFor(logging.DEBUG):
-            logger.debug(
+        if filtered_event_logger.isEnabledFor(logging.DEBUG):
+            filtered_event_logger.debug(
                 "filter_events_for_client: Filtered out soft-failed events: Before=%s, After=%s",
                 [event.event_id for event in events_before_filtering],
                 [event.event_id for event in events],
@@ -319,7 +319,7 @@ def _check_client_allowed_to_see_event(
             _check_filter_send_to_client(event, clock, retention_policy, sender_ignored)
             == _CheckFilter.DENIED
         ):
-            logger.debug(
+            filtered_event_logger.debug(
                 "_check_client_allowed_to_see_event(event=%s): Filtered out event because `_check_filter_send_to_client` returned `_CheckFilter.DENIED`",
                 event.event_id,
             )
@@ -341,7 +341,7 @@ def _check_client_allowed_to_see_event(
             )
             return event
 
-        logger.debug(
+        filtered_event_logger.debug(
             "_check_client_allowed_to_see_event(event=%s): Filtered out event because it's an outlier",
             event.event_id,
         )
@@ -367,7 +367,7 @@ def _check_client_allowed_to_see_event(
 
     membership_result = _check_membership(user_id, event, visibility, state, is_peeking)
     if not membership_result.allowed:
-        logger.debug(
+        filtered_event_logger.debug(
             "_check_client_allowed_to_see_event(event=%s): Filtered out event because the user can't see the event because of their membership, membership_result.allowed=%s membership_result.joined=%s",
             event.event_id,
             membership_result.allowed,
@@ -378,7 +378,7 @@ def _check_client_allowed_to_see_event(
     # If the sender has been erased and the user was not joined at the time, we
     # must only return the redacted form.
     if sender_erased and not membership_result.joined:
-        logger.debug(
+        filtered_event_logger.debug(
             "_check_client_allowed_to_see_event(event=%s): Returning pruned event because `sender_erased` and the user was not joined at the time",
             event.event_id,
         )