summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth/__init__.py175
-rw-r--r--synapse/api/auth/base.py (renamed from synapse/api/auth.py)471
-rw-r--r--synapse/api/auth/internal.py291
-rw-r--r--synapse/api/auth/msc3861_delegated.py352
-rw-r--r--synapse/api/errors.py28
-rw-r--r--synapse/config/auth.py19
-rw-r--r--synapse/config/experimental.py225
-rw-r--r--synapse/federation/federation_server.py7
-rw-r--r--synapse/handlers/auth.py8
-rw-r--r--synapse/handlers/oidc.py2
-rw-r--r--synapse/handlers/pagination.py4
-rw-r--r--synapse/http/server.py20
-rw-r--r--synapse/media/oembed.py32
-rw-r--r--synapse/media/preview_html.py79
-rw-r--r--synapse/module_api/__init__.py34
-rw-r--r--synapse/rest/admin/__init__.py21
-rw-r--r--synapse/rest/admin/users.py8
-rw-r--r--synapse/rest/client/account.py24
-rw-r--r--synapse/rest/client/capabilities.py3
-rw-r--r--synapse/rest/client/devices.py11
-rw-r--r--synapse/rest/client/keys.py30
-rw-r--r--synapse/rest/client/login.py34
-rw-r--r--synapse/rest/client/login_token_request.py47
-rw-r--r--synapse/rest/client/logout.py3
-rw-r--r--synapse/rest/client/register.py72
-rw-r--r--synapse/rest/client/versions.py4
-rw-r--r--synapse/rest/synapse/client/__init__.py6
-rw-r--r--synapse/rest/synapse/client/jwks.py70
-rw-r--r--synapse/rest/well_known.py10
-rw-r--r--synapse/server.py7
-rw-r--r--synapse/storage/controllers/state.py2
-rw-r--r--synapse/storage/databases/main/devices.py4
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py67
-rw-r--r--synapse/types/__init__.py8
-rw-r--r--synapse/util/__init__.py4
-rw-r--r--synapse/util/async_helpers.py2
-rw-r--r--synapse/util/caches/lrucache.py6
37 files changed, 1706 insertions, 484 deletions
diff --git a/synapse/api/auth/__init__.py b/synapse/api/auth/__init__.py
new file mode 100644
index 0000000000..90cfe39d76
--- /dev/null
+++ b/synapse/api/auth/__init__.py
@@ -0,0 +1,175 @@
+# Copyright 2023 The Matrix.org Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Optional, Tuple
+
+from typing_extensions import Protocol
+
+from twisted.web.server import Request
+
+from synapse.appservice import ApplicationService
+from synapse.http.site import SynapseRequest
+from synapse.types import Requester
+
+# guests always get this device id.
+GUEST_DEVICE_ID = "guest_device"
+
+
+class Auth(Protocol):
+    """The interface that an auth provider must implement."""
+
+    async def check_user_in_room(
+        self,
+        room_id: str,
+        requester: Requester,
+        allow_departed_users: bool = False,
+    ) -> Tuple[str, Optional[str]]:
+        """Check if the user is in the room, or was at some point.
+        Args:
+            room_id: The room to check.
+
+            user_id: The user to check.
+
+            current_state: Optional map of the current state of the room.
+                If provided then that map is used to check whether they are a
+                member of the room. Otherwise the current membership is
+                loaded from the database.
+
+            allow_departed_users: if True, accept users that were previously
+                members but have now departed.
+
+        Raises:
+            AuthError if the user is/was not in the room.
+        Returns:
+            The current membership of the user in the room and the
+            membership event ID of the user.
+        """
+
+    async def get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool = False,
+        allow_expired: bool = False,
+    ) -> Requester:
+        """Get a registered user's ID.
+
+        Args:
+            request: An HTTP request with an access_token query parameter.
+            allow_guest: If False, will raise an AuthError if the user making the
+                request is a guest.
+            allow_expired: If True, allow the request through even if the account
+                is expired, or session token lifetime has ended. Note that
+                /login will deliver access tokens regardless of expiration.
+
+        Returns:
+            Resolves to the requester
+        Raises:
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid.
+            AuthError if access is denied for the user in the access token
+        """
+
+    async def validate_appservice_can_control_user_id(
+        self, app_service: ApplicationService, user_id: str
+    ) -> None:
+        """Validates that the app service is allowed to control
+        the given user.
+
+        Args:
+            app_service: The app service that controls the user
+            user_id: The author MXID that the app service is controlling
+
+        Raises:
+            AuthError: If the application service is not allowed to control the user
+                (user namespace regex does not match, wrong homeserver, etc)
+                or if the user has not been registered yet.
+        """
+
+    async def get_user_by_access_token(
+        self,
+        token: str,
+        allow_expired: bool = False,
+    ) -> Requester:
+        """Validate access token and get user_id from it
+
+        Args:
+            token: The access token to get the user by
+            allow_expired: If False, raises an InvalidClientTokenError
+                if the token is expired
+
+        Raises:
+            InvalidClientTokenError if a user by that token exists, but the token is
+                expired
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid
+        """
+
+    async def is_server_admin(self, requester: Requester) -> bool:
+        """Check if the given user is a local server admin.
+
+        Args:
+            requester: user to check
+
+        Returns:
+            True if the user is an admin
+        """
+
+    async def check_can_change_room_list(
+        self, room_id: str, requester: Requester
+    ) -> bool:
+        """Determine whether the user is allowed to edit the room's entry in the
+        published room list.
+
+        Args:
+            room_id
+            user
+        """
+
+    @staticmethod
+    def has_access_token(request: Request) -> bool:
+        """Checks if the request has an access_token.
+
+        Returns:
+            False if no access_token was given, True otherwise.
+        """
+
+    @staticmethod
+    def get_access_token_from_request(request: Request) -> str:
+        """Extracts the access_token from the request.
+
+        Args:
+            request: The http request.
+        Returns:
+            The access_token
+        Raises:
+            MissingClientTokenError: If there isn't a single access_token in the
+                request
+        """
+
+    async def check_user_in_room_or_world_readable(
+        self, room_id: str, requester: Requester, allow_departed_users: bool = False
+    ) -> Tuple[str, Optional[str]]:
+        """Checks that the user is or was in the room or the room is world
+        readable. If it isn't then an exception is raised.
+
+        Args:
+            room_id: room to check
+            user_id: user to check
+            allow_departed_users: if True, accept users that were previously
+                members but have now departed
+
+        Returns:
+            Resolves to the current membership of the user in the room and the
+            membership event ID of the user. If the user is not in the room and
+            never has been, then `(Membership.JOIN, None)` is returned.
+        """
diff --git a/synapse/api/auth.py b/synapse/api/auth/base.py
index 66e869bc2d..9321d6f186 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth/base.py
@@ -1,4 +1,4 @@
-# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2023 The Matrix.org Foundation.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,7 +14,6 @@
 import logging
 from typing import TYPE_CHECKING, Optional, Tuple
 
-import pymacaroons
 from netaddr import IPAddress
 
 from twisted.web.server import Request
@@ -24,19 +23,11 @@ from synapse.api.constants import EventTypes, HistoryVisibility, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
-    InvalidClientTokenError,
     MissingClientTokenError,
     UnstableSpecAuthError,
 )
 from synapse.appservice import ApplicationService
-from synapse.http import get_request_user_agent
-from synapse.http.site import SynapseRequest
-from synapse.logging.opentracing import (
-    active_span,
-    force_tracing,
-    start_active_span,
-    trace,
-)
+from synapse.logging.opentracing import trace
 from synapse.types import Requester, create_requester
 from synapse.util.cancellation import cancellable
 
@@ -46,26 +37,13 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-# guests always get this device id.
-GUEST_DEVICE_ID = "guest_device"
-
-
-class Auth:
-    """
-    This class contains functions for authenticating users of our client-server API.
-    """
+class BaseAuth:
+    """Common base class for all auth implementations."""
 
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
-        self.clock = hs.get_clock()
         self.store = hs.get_datastores().main
-        self._account_validity_handler = hs.get_account_validity_handler()
         self._storage_controllers = hs.get_storage_controllers()
-        self._macaroon_generator = hs.get_macaroon_generator()
-
-        self._track_appservice_user_ips = hs.config.appservice.track_appservice_user_ips
-        self._track_puppeted_user_ips = hs.config.api.track_puppeted_user_ips
-        self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
 
     async def check_user_in_room(
         self,
@@ -119,139 +97,49 @@ class Auth:
             errcode=Codes.NOT_JOINED,
         )
 
-    @cancellable
-    async def get_user_by_req(
-        self,
-        request: SynapseRequest,
-        allow_guest: bool = False,
-        allow_expired: bool = False,
-    ) -> Requester:
-        """Get a registered user's ID.
+    @trace
+    async def check_user_in_room_or_world_readable(
+        self, room_id: str, requester: Requester, allow_departed_users: bool = False
+    ) -> Tuple[str, Optional[str]]:
+        """Checks that the user is or was in the room or the room is world
+        readable. If it isn't then an exception is raised.
 
         Args:
-            request: An HTTP request with an access_token query parameter.
-            allow_guest: If False, will raise an AuthError if the user making the
-                request is a guest.
-            allow_expired: If True, allow the request through even if the account
-                is expired, or session token lifetime has ended. Note that
-                /login will deliver access tokens regardless of expiration.
+            room_id: room to check
+            user_id: user to check
+            allow_departed_users: if True, accept users that were previously
+                members but have now departed
 
         Returns:
-            Resolves to the requester
-        Raises:
-            InvalidClientCredentialsError if no user by that token exists or the token
-                is invalid.
-            AuthError if access is denied for the user in the access token
+            Resolves to the current membership of the user in the room and the
+            membership event ID of the user. If the user is not in the room and
+            never has been, then `(Membership.JOIN, None)` is returned.
         """
-        parent_span = active_span()
-        with start_active_span("get_user_by_req"):
-            requester = await self._wrapped_get_user_by_req(
-                request, allow_guest, allow_expired
-            )
-
-            if parent_span:
-                if requester.authenticated_entity in self._force_tracing_for_users:
-                    # request tracing is enabled for this user, so we need to force it
-                    # tracing on for the parent span (which will be the servlet span).
-                    #
-                    # It's too late for the get_user_by_req span to inherit the setting,
-                    # so we also force it on for that.
-                    force_tracing()
-                    force_tracing(parent_span)
-                parent_span.set_tag(
-                    "authenticated_entity", requester.authenticated_entity
-                )
-                parent_span.set_tag("user_id", requester.user.to_string())
-                if requester.device_id is not None:
-                    parent_span.set_tag("device_id", requester.device_id)
-                if requester.app_service is not None:
-                    parent_span.set_tag("appservice_id", requester.app_service.id)
-            return requester
 
-    @cancellable
-    async def _wrapped_get_user_by_req(
-        self,
-        request: SynapseRequest,
-        allow_guest: bool,
-        allow_expired: bool,
-    ) -> Requester:
-        """Helper for get_user_by_req
-
-        Once get_user_by_req has set up the opentracing span, this does the actual work.
-        """
         try:
-            ip_addr = request.getClientAddress().host
-            user_agent = get_request_user_agent(request)
-
-            access_token = self.get_access_token_from_request(request)
-
-            # First check if it could be a request from an appservice
-            requester = await self._get_appservice_user(request)
-            if not requester:
-                # If not, it should be from a regular user
-                requester = await self.get_user_by_access_token(
-                    access_token, allow_expired=allow_expired
-                )
-
-                # Deny the request if the user account has expired.
-                # This check is only done for regular users, not appservice ones.
-                if not allow_expired:
-                    if await self._account_validity_handler.is_user_expired(
-                        requester.user.to_string()
-                    ):
-                        # Raise the error if either an account validity module has determined
-                        # the account has expired, or the legacy account validity
-                        # implementation is enabled and determined the account has expired
-                        raise AuthError(
-                            403,
-                            "User account has expired",
-                            errcode=Codes.EXPIRED_ACCOUNT,
-                        )
-
-            if ip_addr and (
-                not requester.app_service or self._track_appservice_user_ips
+            # check_user_in_room will return the most recent membership
+            # event for the user if:
+            #  * The user is a non-guest user, and was ever in the room
+            #  * The user is a guest user, and has joined the room
+            # else it will throw.
+            return await self.check_user_in_room(
+                room_id, requester, allow_departed_users=allow_departed_users
+            )
+        except AuthError:
+            visibility = await self._storage_controllers.state.get_current_state_event(
+                room_id, EventTypes.RoomHistoryVisibility, ""
+            )
+            if (
+                visibility
+                and visibility.content.get("history_visibility")
+                == HistoryVisibility.WORLD_READABLE
             ):
-                # XXX(quenting): I'm 95% confident that we could skip setting the
-                # device_id to "dummy-device" for appservices, and that the only impact
-                # would be some rows which whould not deduplicate in the 'user_ips'
-                # table during the transition
-                recorded_device_id = (
-                    "dummy-device"
-                    if requester.device_id is None and requester.app_service is not None
-                    else requester.device_id
-                )
-                await self.store.insert_client_ip(
-                    user_id=requester.authenticated_entity,
-                    access_token=access_token,
-                    ip=ip_addr,
-                    user_agent=user_agent,
-                    device_id=recorded_device_id,
-                )
-
-                # Track also the puppeted user client IP if enabled and the user is puppeting
-                if (
-                    requester.user.to_string() != requester.authenticated_entity
-                    and self._track_puppeted_user_ips
-                ):
-                    await self.store.insert_client_ip(
-                        user_id=requester.user.to_string(),
-                        access_token=access_token,
-                        ip=ip_addr,
-                        user_agent=user_agent,
-                        device_id=requester.device_id,
-                    )
-
-            if requester.is_guest and not allow_guest:
-                raise AuthError(
-                    403,
-                    "Guest access not allowed",
-                    errcode=Codes.GUEST_ACCESS_FORBIDDEN,
-                )
-
-            request.requester = requester
-            return requester
-        except KeyError:
-            raise MissingClientTokenError()
+                return Membership.JOIN, None
+            raise AuthError(
+                403,
+                "User %r not in room %s, and room previews are disabled"
+                % (requester.user, room_id),
+            )
 
     async def validate_appservice_can_control_user_id(
         self, app_service: ApplicationService, user_id: str
@@ -284,184 +172,16 @@ class Auth:
                 403, "Application service has not registered this user (%s)" % user_id
             )
 
-    @cancellable
-    async def _get_appservice_user(self, request: Request) -> Optional[Requester]:
-        """
-        Given a request, reads the request parameters to determine:
-        - whether it's an application service that's making this request
-        - what user the application service should be treated as controlling
-          (the user_id URI parameter allows an application service to masquerade
-          any applicable user in its namespace)
-        - what device the application service should be treated as controlling
-          (the device_id[^1] URI parameter allows an application service to masquerade
-          as any device that exists for the relevant user)
-
-        [^1] Unstable and provided by MSC3202.
-             Must use `org.matrix.msc3202.device_id` in place of `device_id` for now.
-
-        Returns:
-            the application service `Requester` of that request
-
-        Postconditions:
-        - The `app_service` field in the returned `Requester` is set
-        - The `user_id` field in the returned `Requester` is either the application
-          service sender or the controlled user set by the `user_id` URI parameter
-        - The returned application service is permitted to control the returned user ID.
-        - The returned device ID, if present, has been checked to be a valid device ID
-          for the returned user ID.
-        """
-        DEVICE_ID_ARG_NAME = b"org.matrix.msc3202.device_id"
-
-        app_service = self.store.get_app_service_by_token(
-            self.get_access_token_from_request(request)
-        )
-        if app_service is None:
-            return None
-
-        if app_service.ip_range_whitelist:
-            ip_address = IPAddress(request.getClientAddress().host)
-            if ip_address not in app_service.ip_range_whitelist:
-                return None
-
-        # This will always be set by the time Twisted calls us.
-        assert request.args is not None
-
-        if b"user_id" in request.args:
-            effective_user_id = request.args[b"user_id"][0].decode("utf8")
-            await self.validate_appservice_can_control_user_id(
-                app_service, effective_user_id
-            )
-        else:
-            effective_user_id = app_service.sender
-
-        effective_device_id: Optional[str] = None
-
-        if (
-            self.hs.config.experimental.msc3202_device_masquerading_enabled
-            and DEVICE_ID_ARG_NAME in request.args
-        ):
-            effective_device_id = request.args[DEVICE_ID_ARG_NAME][0].decode("utf8")
-            # We only just set this so it can't be None!
-            assert effective_device_id is not None
-            device_opt = await self.store.get_device(
-                effective_user_id, effective_device_id
-            )
-            if device_opt is None:
-                # For now, use 400 M_EXCLUSIVE if the device doesn't exist.
-                # This is an open thread of discussion on MSC3202 as of 2021-12-09.
-                raise AuthError(
-                    400,
-                    f"Application service trying to use a device that doesn't exist ('{effective_device_id}' for {effective_user_id})",
-                    Codes.EXCLUSIVE,
-                )
-
-        return create_requester(
-            effective_user_id, app_service=app_service, device_id=effective_device_id
-        )
-
-    async def get_user_by_access_token(
-        self,
-        token: str,
-        allow_expired: bool = False,
-    ) -> Requester:
-        """Validate access token and get user_id from it
-
-        Args:
-            token: The access token to get the user by
-            allow_expired: If False, raises an InvalidClientTokenError
-                if the token is expired
-
-        Raises:
-            InvalidClientTokenError if a user by that token exists, but the token is
-                expired
-            InvalidClientCredentialsError if no user by that token exists or the token
-                is invalid
-        """
-
-        # First look in the database to see if the access token is present
-        # as an opaque token.
-        user_info = await self.store.get_user_by_access_token(token)
-        if user_info:
-            valid_until_ms = user_info.valid_until_ms
-            if (
-                not allow_expired
-                and valid_until_ms is not None
-                and valid_until_ms < self.clock.time_msec()
-            ):
-                # there was a valid access token, but it has expired.
-                # soft-logout the user.
-                raise InvalidClientTokenError(
-                    msg="Access token has expired", soft_logout=True
-                )
-
-            # Mark the token as used. This is used to invalidate old refresh
-            # tokens after some time.
-            await self.store.mark_access_token_as_used(user_info.token_id)
-
-            requester = create_requester(
-                user_id=user_info.user_id,
-                access_token_id=user_info.token_id,
-                is_guest=user_info.is_guest,
-                shadow_banned=user_info.shadow_banned,
-                device_id=user_info.device_id,
-                authenticated_entity=user_info.token_owner,
-            )
-
-            return requester
-
-        # If the token isn't found in the database, then it could still be a
-        # macaroon for a guest, so we check that here.
-        try:
-            user_id = self._macaroon_generator.verify_guest_token(token)
-
-            # Guest access tokens are not stored in the database (there can
-            # only be one access token per guest, anyway).
-            #
-            # In order to prevent guest access tokens being used as regular
-            # user access tokens (and hence getting around the invalidation
-            # process), we look up the user id and check that it is indeed
-            # a guest user.
-            #
-            # It would of course be much easier to store guest access
-            # tokens in the database as well, but that would break existing
-            # guest tokens.
-            stored_user = await self.store.get_user_by_id(user_id)
-            if not stored_user:
-                raise InvalidClientTokenError("Unknown user_id %s" % user_id)
-            if not stored_user["is_guest"]:
-                raise InvalidClientTokenError(
-                    "Guest access token used for regular user"
-                )
-
-            return create_requester(
-                user_id=user_id,
-                is_guest=True,
-                # all guests get the same device id
-                device_id=GUEST_DEVICE_ID,
-                authenticated_entity=user_id,
-            )
-        except (
-            pymacaroons.exceptions.MacaroonException,
-            TypeError,
-            ValueError,
-        ) as e:
-            logger.warning(
-                "Invalid access token in auth: %s %s.",
-                type(e),
-                e,
-            )
-            raise InvalidClientTokenError("Invalid access token passed.")
-
     async def is_server_admin(self, requester: Requester) -> bool:
         """Check if the given user is a local server admin.
 
         Args:
-            requester: The user making the request, according to the access token.
+            requester: user to check
 
         Returns:
             True if the user is an admin
         """
-        return await self.store.is_server_admin(requester.user)
+        raise NotImplementedError()
 
     async def check_can_change_room_list(
         self, room_id: str, requester: Requester
@@ -470,8 +190,8 @@ class Auth:
         published room list.
 
         Args:
-            room_id: The room to check.
-            requester: The user making the request, according to the access token.
+            room_id
+            user
         """
 
         is_admin = await self.is_server_admin(requester)
@@ -518,7 +238,6 @@ class Auth:
         return bool(query_params) or bool(auth_headers)
 
     @staticmethod
-    @cancellable
     def get_access_token_from_request(request: Request) -> str:
         """Extracts the access_token from the request.
 
@@ -556,47 +275,77 @@ class Auth:
 
             return query_params[0].decode("ascii")
 
-    @trace
-    async def check_user_in_room_or_world_readable(
-        self, room_id: str, requester: Requester, allow_departed_users: bool = False
-    ) -> Tuple[str, Optional[str]]:
-        """Checks that the user is or was in the room or the room is world
-        readable. If it isn't then an exception is raised.
+    @cancellable
+    async def get_appservice_user(
+        self, request: Request, access_token: str
+    ) -> Optional[Requester]:
+        """
+        Given a request, reads the request parameters to determine:
+        - whether it's an application service that's making this request
+        - what user the application service should be treated as controlling
+          (the user_id URI parameter allows an application service to masquerade
+          any applicable user in its namespace)
+        - what device the application service should be treated as controlling
+          (the device_id[^1] URI parameter allows an application service to masquerade
+          as any device that exists for the relevant user)
 
-        Args:
-            room_id: The room to check.
-            requester: The user making the request, according to the access token.
-            allow_departed_users: If True, accept users that were previously
-                members but have now departed.
+        [^1] Unstable and provided by MSC3202.
+             Must use `org.matrix.msc3202.device_id` in place of `device_id` for now.
 
         Returns:
-            Resolves to the current membership of the user in the room and the
-            membership event ID of the user. If the user is not in the room and
-            never has been, then `(Membership.JOIN, None)` is returned.
+            the application service `Requester` of that request
+
+        Postconditions:
+        - The `app_service` field in the returned `Requester` is set
+        - The `user_id` field in the returned `Requester` is either the application
+          service sender or the controlled user set by the `user_id` URI parameter
+        - The returned application service is permitted to control the returned user ID.
+        - The returned device ID, if present, has been checked to be a valid device ID
+          for the returned user ID.
         """
+        DEVICE_ID_ARG_NAME = b"org.matrix.msc3202.device_id"
 
-        try:
-            # check_user_in_room will return the most recent membership
-            # event for the user if:
-            #  * The user is a non-guest user, and was ever in the room
-            #  * The user is a guest user, and has joined the room
-            # else it will throw.
-            return await self.check_user_in_room(
-                room_id, requester, allow_departed_users=allow_departed_users
-            )
-        except AuthError:
-            visibility = await self._storage_controllers.state.get_current_state_event(
-                room_id, EventTypes.RoomHistoryVisibility, ""
+        app_service = self.store.get_app_service_by_token(access_token)
+        if app_service is None:
+            return None
+
+        if app_service.ip_range_whitelist:
+            ip_address = IPAddress(request.getClientAddress().host)
+            if ip_address not in app_service.ip_range_whitelist:
+                return None
+
+        # This will always be set by the time Twisted calls us.
+        assert request.args is not None
+
+        if b"user_id" in request.args:
+            effective_user_id = request.args[b"user_id"][0].decode("utf8")
+            await self.validate_appservice_can_control_user_id(
+                app_service, effective_user_id
             )
-            if (
-                visibility
-                and visibility.content.get("history_visibility")
-                == HistoryVisibility.WORLD_READABLE
-            ):
-                return Membership.JOIN, None
-            raise UnstableSpecAuthError(
-                403,
-                "User %s not in room %s, and room previews are disabled"
-                % (requester.user, room_id),
-                errcode=Codes.NOT_JOINED,
+        else:
+            effective_user_id = app_service.sender
+
+        effective_device_id: Optional[str] = None
+
+        if (
+            self.hs.config.experimental.msc3202_device_masquerading_enabled
+            and DEVICE_ID_ARG_NAME in request.args
+        ):
+            effective_device_id = request.args[DEVICE_ID_ARG_NAME][0].decode("utf8")
+            # We only just set this so it can't be None!
+            assert effective_device_id is not None
+            device_opt = await self.store.get_device(
+                effective_user_id, effective_device_id
             )
+            if device_opt is None:
+                # For now, use 400 M_EXCLUSIVE if the device doesn't exist.
+                # This is an open thread of discussion on MSC3202 as of 2021-12-09.
+                raise AuthError(
+                    400,
+                    f"Application service trying to use a device that doesn't exist ('{effective_device_id}' for {effective_user_id})",
+                    Codes.EXCLUSIVE,
+                )
+
+        return create_requester(
+            effective_user_id, app_service=app_service, device_id=effective_device_id
+        )
diff --git a/synapse/api/auth/internal.py b/synapse/api/auth/internal.py
new file mode 100644
index 0000000000..e2ae198b19
--- /dev/null
+++ b/synapse/api/auth/internal.py
@@ -0,0 +1,291 @@
+# Copyright 2023 The Matrix.org Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING
+
+import pymacaroons
+
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    InvalidClientTokenError,
+    MissingClientTokenError,
+)
+from synapse.http import get_request_user_agent
+from synapse.http.site import SynapseRequest
+from synapse.logging.opentracing import active_span, force_tracing, start_active_span
+from synapse.types import Requester, create_requester
+from synapse.util.cancellation import cancellable
+
+from . import GUEST_DEVICE_ID
+from .base import BaseAuth
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class InternalAuth(BaseAuth):
+    """
+    This class contains functions for authenticating users of our client-server API.
+    """
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+        self.clock = hs.get_clock()
+        self._account_validity_handler = hs.get_account_validity_handler()
+        self._macaroon_generator = hs.get_macaroon_generator()
+
+        self._track_appservice_user_ips = hs.config.appservice.track_appservice_user_ips
+        self._track_puppeted_user_ips = hs.config.api.track_puppeted_user_ips
+        self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users
+
+    @cancellable
+    async def get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool = False,
+        allow_expired: bool = False,
+    ) -> Requester:
+        """Get a registered user's ID.
+
+        Args:
+            request: An HTTP request with an access_token query parameter.
+            allow_guest: If False, will raise an AuthError if the user making the
+                request is a guest.
+            allow_expired: If True, allow the request through even if the account
+                is expired, or session token lifetime has ended. Note that
+                /login will deliver access tokens regardless of expiration.
+
+        Returns:
+            Resolves to the requester
+        Raises:
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid.
+            AuthError if access is denied for the user in the access token
+        """
+        parent_span = active_span()
+        with start_active_span("get_user_by_req"):
+            requester = await self._wrapped_get_user_by_req(
+                request, allow_guest, allow_expired
+            )
+
+            if parent_span:
+                if requester.authenticated_entity in self._force_tracing_for_users:
+                    # request tracing is enabled for this user, so we need to force it
+                    # tracing on for the parent span (which will be the servlet span).
+                    #
+                    # It's too late for the get_user_by_req span to inherit the setting,
+                    # so we also force it on for that.
+                    force_tracing()
+                    force_tracing(parent_span)
+                parent_span.set_tag(
+                    "authenticated_entity", requester.authenticated_entity
+                )
+                parent_span.set_tag("user_id", requester.user.to_string())
+                if requester.device_id is not None:
+                    parent_span.set_tag("device_id", requester.device_id)
+                if requester.app_service is not None:
+                    parent_span.set_tag("appservice_id", requester.app_service.id)
+            return requester
+
+    @cancellable
+    async def _wrapped_get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool,
+        allow_expired: bool,
+    ) -> Requester:
+        """Helper for get_user_by_req
+
+        Once get_user_by_req has set up the opentracing span, this does the actual work.
+        """
+        try:
+            ip_addr = request.getClientAddress().host
+            user_agent = get_request_user_agent(request)
+
+            access_token = self.get_access_token_from_request(request)
+
+            # First check if it could be a request from an appservice
+            requester = await self.get_appservice_user(request, access_token)
+            if not requester:
+                # If not, it should be from a regular user
+                requester = await self.get_user_by_access_token(
+                    access_token, allow_expired=allow_expired
+                )
+
+                # Deny the request if the user account has expired.
+                # This check is only done for regular users, not appservice ones.
+                if not allow_expired:
+                    if await self._account_validity_handler.is_user_expired(
+                        requester.user.to_string()
+                    ):
+                        # Raise the error if either an account validity module has determined
+                        # the account has expired, or the legacy account validity
+                        # implementation is enabled and determined the account has expired
+                        raise AuthError(
+                            403,
+                            "User account has expired",
+                            errcode=Codes.EXPIRED_ACCOUNT,
+                        )
+
+            if ip_addr and (
+                not requester.app_service or self._track_appservice_user_ips
+            ):
+                # XXX(quenting): I'm 95% confident that we could skip setting the
+                # device_id to "dummy-device" for appservices, and that the only impact
+                # would be some rows which whould not deduplicate in the 'user_ips'
+                # table during the transition
+                recorded_device_id = (
+                    "dummy-device"
+                    if requester.device_id is None and requester.app_service is not None
+                    else requester.device_id
+                )
+                await self.store.insert_client_ip(
+                    user_id=requester.authenticated_entity,
+                    access_token=access_token,
+                    ip=ip_addr,
+                    user_agent=user_agent,
+                    device_id=recorded_device_id,
+                )
+
+                # Track also the puppeted user client IP if enabled and the user is puppeting
+                if (
+                    requester.user.to_string() != requester.authenticated_entity
+                    and self._track_puppeted_user_ips
+                ):
+                    await self.store.insert_client_ip(
+                        user_id=requester.user.to_string(),
+                        access_token=access_token,
+                        ip=ip_addr,
+                        user_agent=user_agent,
+                        device_id=requester.device_id,
+                    )
+
+            if requester.is_guest and not allow_guest:
+                raise AuthError(
+                    403,
+                    "Guest access not allowed",
+                    errcode=Codes.GUEST_ACCESS_FORBIDDEN,
+                )
+
+            request.requester = requester
+            return requester
+        except KeyError:
+            raise MissingClientTokenError()
+
+    async def get_user_by_access_token(
+        self,
+        token: str,
+        allow_expired: bool = False,
+    ) -> Requester:
+        """Validate access token and get user_id from it
+
+        Args:
+            token: The access token to get the user by
+            allow_expired: If False, raises an InvalidClientTokenError
+                if the token is expired
+
+        Raises:
+            InvalidClientTokenError if a user by that token exists, but the token is
+                expired
+            InvalidClientCredentialsError if no user by that token exists or the token
+                is invalid
+        """
+
+        # First look in the database to see if the access token is present
+        # as an opaque token.
+        user_info = await self.store.get_user_by_access_token(token)
+        if user_info:
+            valid_until_ms = user_info.valid_until_ms
+            if (
+                not allow_expired
+                and valid_until_ms is not None
+                and valid_until_ms < self.clock.time_msec()
+            ):
+                # there was a valid access token, but it has expired.
+                # soft-logout the user.
+                raise InvalidClientTokenError(
+                    msg="Access token has expired", soft_logout=True
+                )
+
+            # Mark the token as used. This is used to invalidate old refresh
+            # tokens after some time.
+            await self.store.mark_access_token_as_used(user_info.token_id)
+
+            requester = create_requester(
+                user_id=user_info.user_id,
+                access_token_id=user_info.token_id,
+                is_guest=user_info.is_guest,
+                shadow_banned=user_info.shadow_banned,
+                device_id=user_info.device_id,
+                authenticated_entity=user_info.token_owner,
+            )
+
+            return requester
+
+        # If the token isn't found in the database, then it could still be a
+        # macaroon for a guest, so we check that here.
+        try:
+            user_id = self._macaroon_generator.verify_guest_token(token)
+
+            # Guest access tokens are not stored in the database (there can
+            # only be one access token per guest, anyway).
+            #
+            # In order to prevent guest access tokens being used as regular
+            # user access tokens (and hence getting around the invalidation
+            # process), we look up the user id and check that it is indeed
+            # a guest user.
+            #
+            # It would of course be much easier to store guest access
+            # tokens in the database as well, but that would break existing
+            # guest tokens.
+            stored_user = await self.store.get_user_by_id(user_id)
+            if not stored_user:
+                raise InvalidClientTokenError("Unknown user_id %s" % user_id)
+            if not stored_user["is_guest"]:
+                raise InvalidClientTokenError(
+                    "Guest access token used for regular user"
+                )
+
+            return create_requester(
+                user_id=user_id,
+                is_guest=True,
+                # all guests get the same device id
+                device_id=GUEST_DEVICE_ID,
+                authenticated_entity=user_id,
+            )
+        except (
+            pymacaroons.exceptions.MacaroonException,
+            TypeError,
+            ValueError,
+        ) as e:
+            logger.warning(
+                "Invalid access token in auth: %s %s.",
+                type(e),
+                e,
+            )
+            raise InvalidClientTokenError("Invalid access token passed.")
+
+    async def is_server_admin(self, requester: Requester) -> bool:
+        """Check if the given user is a local server admin.
+
+        Args:
+            requester: The user making the request, according to the access token.
+
+        Returns:
+            True if the user is an admin
+        """
+        return await self.store.is_server_admin(requester.user)
diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py
new file mode 100644
index 0000000000..bd4fc9c0ee
--- /dev/null
+++ b/synapse/api/auth/msc3861_delegated.py
@@ -0,0 +1,352 @@
+# Copyright 2023 The Matrix.org Foundation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+from urllib.parse import urlencode
+
+from authlib.oauth2 import ClientAuth
+from authlib.oauth2.auth import encode_client_secret_basic, encode_client_secret_post
+from authlib.oauth2.rfc7523 import ClientSecretJWT, PrivateKeyJWT, private_key_jwt_sign
+from authlib.oauth2.rfc7662 import IntrospectionToken
+from authlib.oidc.discovery import OpenIDProviderMetadata, get_well_known_url
+
+from twisted.web.client import readBody
+from twisted.web.http_headers import Headers
+
+from synapse.api.auth.base import BaseAuth
+from synapse.api.errors import (
+    AuthError,
+    HttpResponseException,
+    InvalidClientTokenError,
+    OAuthInsufficientScopeError,
+    StoreError,
+    SynapseError,
+)
+from synapse.http.site import SynapseRequest
+from synapse.logging.context import make_deferred_yieldable
+from synapse.types import Requester, UserID, create_requester
+from synapse.util import json_decoder
+from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+# Scope as defined by MSC2967
+# https://github.com/matrix-org/matrix-spec-proposals/pull/2967
+SCOPE_MATRIX_API = "urn:matrix:org.matrix.msc2967.client:api:*"
+SCOPE_MATRIX_GUEST = "urn:matrix:org.matrix.msc2967.client:api:guest"
+SCOPE_MATRIX_DEVICE_PREFIX = "urn:matrix:org.matrix.msc2967.client:device:"
+
+# Scope which allows access to the Synapse admin API
+SCOPE_SYNAPSE_ADMIN = "urn:synapse:admin:*"
+
+
+def scope_to_list(scope: str) -> List[str]:
+    """Convert a scope string to a list of scope tokens"""
+    return scope.strip().split(" ")
+
+
+class PrivateKeyJWTWithKid(PrivateKeyJWT):  # type: ignore[misc]
+    """An implementation of the private_key_jwt client auth method that includes a kid header.
+
+    This is needed because some providers (Keycloak) require the kid header to figure
+    out which key to use to verify the signature.
+    """
+
+    def sign(self, auth: Any, token_endpoint: str) -> bytes:
+        return private_key_jwt_sign(
+            auth.client_secret,
+            client_id=auth.client_id,
+            token_endpoint=token_endpoint,
+            claims=self.claims,
+            header={"kid": auth.client_secret["kid"]},
+        )
+
+
+class MSC3861DelegatedAuth(BaseAuth):
+    AUTH_METHODS = {
+        "client_secret_post": encode_client_secret_post,
+        "client_secret_basic": encode_client_secret_basic,
+        "client_secret_jwt": ClientSecretJWT(),
+        "private_key_jwt": PrivateKeyJWTWithKid(),
+    }
+
+    EXTERNAL_ID_PROVIDER = "oauth-delegated"
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self._config = hs.config.experimental.msc3861
+        auth_method = MSC3861DelegatedAuth.AUTH_METHODS.get(
+            self._config.client_auth_method.value, None
+        )
+        # Those assertions are already checked when parsing the config
+        assert self._config.enabled, "OAuth delegation is not enabled"
+        assert self._config.issuer, "No issuer provided"
+        assert self._config.client_id, "No client_id provided"
+        assert auth_method is not None, "Invalid client_auth_method provided"
+
+        self._http_client = hs.get_proxied_http_client()
+        self._hostname = hs.hostname
+        self._admin_token = self._config.admin_token
+
+        self._issuer_metadata = RetryOnExceptionCachedCall(self._load_metadata)
+
+        if isinstance(auth_method, PrivateKeyJWTWithKid):
+            # Use the JWK as the client secret when using the private_key_jwt method
+            assert self._config.jwk, "No JWK provided"
+            self._client_auth = ClientAuth(
+                self._config.client_id, self._config.jwk, auth_method
+            )
+        else:
+            # Else use the client secret
+            assert self._config.client_secret, "No client_secret provided"
+            self._client_auth = ClientAuth(
+                self._config.client_id, self._config.client_secret, auth_method
+            )
+
+    async def _load_metadata(self) -> OpenIDProviderMetadata:
+        if self._config.issuer_metadata is not None:
+            return OpenIDProviderMetadata(**self._config.issuer_metadata)
+        url = get_well_known_url(self._config.issuer, external=True)
+        response = await self._http_client.get_json(url)
+        metadata = OpenIDProviderMetadata(**response)
+        # metadata.validate_introspection_endpoint()
+        return metadata
+
+    async def _introspect_token(self, token: str) -> IntrospectionToken:
+        """
+        Send a token to the introspection endpoint and returns the introspection response
+
+        Parameters:
+            token: The token to introspect
+
+        Raises:
+            HttpResponseException: If the introspection endpoint returns a non-2xx response
+            ValueError: If the introspection endpoint returns an invalid JSON response
+            JSONDecodeError: If the introspection endpoint returns a non-JSON response
+            Exception: If the HTTP request fails
+
+        Returns:
+            The introspection response
+        """
+        metadata = await self._issuer_metadata.get()
+        introspection_endpoint = metadata.get("introspection_endpoint")
+        raw_headers: Dict[str, str] = {
+            "Content-Type": "application/x-www-form-urlencoded",
+            "User-Agent": str(self._http_client.user_agent, "utf-8"),
+            "Accept": "application/json",
+        }
+
+        args = {"token": token, "token_type_hint": "access_token"}
+        body = urlencode(args, True)
+
+        # Fill the body/headers with credentials
+        uri, raw_headers, body = self._client_auth.prepare(
+            method="POST", uri=introspection_endpoint, headers=raw_headers, body=body
+        )
+        headers = Headers({k: [v] for (k, v) in raw_headers.items()})
+
+        # Do the actual request
+        # We're not using the SimpleHttpClient util methods as we don't want to
+        # check the HTTP status code, and we do the body encoding ourselves.
+        response = await self._http_client.request(
+            method="POST",
+            uri=uri,
+            data=body.encode("utf-8"),
+            headers=headers,
+        )
+
+        resp_body = await make_deferred_yieldable(readBody(response))
+
+        if response.code < 200 or response.code >= 300:
+            raise HttpResponseException(
+                response.code,
+                response.phrase.decode("ascii", errors="replace"),
+                resp_body,
+            )
+
+        resp = json_decoder.decode(resp_body.decode("utf-8"))
+
+        if not isinstance(resp, dict):
+            raise ValueError(
+                "The introspection endpoint returned an invalid JSON response."
+            )
+
+        return IntrospectionToken(**resp)
+
+    async def is_server_admin(self, requester: Requester) -> bool:
+        return "urn:synapse:admin:*" in requester.scope
+
+    async def get_user_by_req(
+        self,
+        request: SynapseRequest,
+        allow_guest: bool = False,
+        allow_expired: bool = False,
+    ) -> Requester:
+        access_token = self.get_access_token_from_request(request)
+
+        requester = await self.get_appservice_user(request, access_token)
+        if not requester:
+            # TODO: we probably want to assert the allow_guest inside this call
+            # so that we don't provision the user if they don't have enough permission:
+            requester = await self.get_user_by_access_token(access_token, allow_expired)
+
+        if not allow_guest and requester.is_guest:
+            raise OAuthInsufficientScopeError([SCOPE_MATRIX_API])
+
+        request.requester = requester
+
+        return requester
+
+    async def get_user_by_access_token(
+        self,
+        token: str,
+        allow_expired: bool = False,
+    ) -> Requester:
+        if self._admin_token is not None and token == self._admin_token:
+            # XXX: This is a temporary solution so that the admin API can be called by
+            # the OIDC provider. This will be removed once we have OIDC client
+            # credentials grant support in matrix-authentication-service.
+            logging.info("Admin toked used")
+            # XXX: that user doesn't exist and won't be provisioned.
+            # This is mostly fine for admin calls, but we should also think about doing
+            # requesters without a user_id.
+            admin_user = UserID("__oidc_admin", self._hostname)
+            return create_requester(
+                user_id=admin_user,
+                scope=["urn:synapse:admin:*"],
+            )
+
+        try:
+            introspection_result = await self._introspect_token(token)
+        except Exception:
+            logger.exception("Failed to introspect token")
+            raise SynapseError(503, "Unable to introspect the access token")
+
+        logger.info(f"Introspection result: {introspection_result!r}")
+
+        # TODO: introspection verification should be more extensive, especially:
+        #   - verify the audience
+        if not introspection_result.get("active"):
+            raise InvalidClientTokenError("Token is not active")
+
+        # Let's look at the scope
+        scope: List[str] = scope_to_list(introspection_result.get("scope", ""))
+
+        # Determine type of user based on presence of particular scopes
+        has_user_scope = SCOPE_MATRIX_API in scope
+        has_guest_scope = SCOPE_MATRIX_GUEST in scope
+
+        if not has_user_scope and not has_guest_scope:
+            raise InvalidClientTokenError("No scope in token granting user rights")
+
+        # Match via the sub claim
+        sub: Optional[str] = introspection_result.get("sub")
+        if sub is None:
+            raise InvalidClientTokenError(
+                "Invalid sub claim in the introspection result"
+            )
+
+        user_id_str = await self.store.get_user_by_external_id(
+            MSC3861DelegatedAuth.EXTERNAL_ID_PROVIDER, sub
+        )
+        if user_id_str is None:
+            # If we could not find a user via the external_id, it either does not exist,
+            # or the external_id was never recorded
+
+            # TODO: claim mapping should be configurable
+            username: Optional[str] = introspection_result.get("username")
+            if username is None or not isinstance(username, str):
+                raise AuthError(
+                    500,
+                    "Invalid username claim in the introspection result",
+                )
+            user_id = UserID(username, self._hostname)
+
+            # First try to find a user from the username claim
+            user_info = await self.store.get_userinfo_by_id(user_id=user_id.to_string())
+            if user_info is None:
+                # If the user does not exist, we should create it on the fly
+                # TODO: we could use SCIM to provision users ahead of time and listen
+                # for SCIM SET events if those ever become standard:
+                # https://datatracker.ietf.org/doc/html/draft-hunt-scim-notify-00
+
+                # TODO: claim mapping should be configurable
+                # If present, use the name claim as the displayname
+                name: Optional[str] = introspection_result.get("name")
+
+                await self.store.register_user(
+                    user_id=user_id.to_string(), create_profile_with_displayname=name
+                )
+
+            # And record the sub as external_id
+            await self.store.record_user_external_id(
+                MSC3861DelegatedAuth.EXTERNAL_ID_PROVIDER, sub, user_id.to_string()
+            )
+        else:
+            user_id = UserID.from_string(user_id_str)
+
+        # Find device_ids in scope
+        # We only allow a single device_id in the scope, so we find them all in the
+        # scope list, and raise if there are more than one. The OIDC server should be
+        # the one enforcing valid scopes, so we raise a 500 if we find an invalid scope.
+        device_ids = [
+            tok[len(SCOPE_MATRIX_DEVICE_PREFIX) :]
+            for tok in scope
+            if tok.startswith(SCOPE_MATRIX_DEVICE_PREFIX)
+        ]
+
+        if len(device_ids) > 1:
+            raise AuthError(
+                500,
+                "Multiple device IDs in scope",
+            )
+
+        device_id = device_ids[0] if device_ids else None
+        if device_id is not None:
+            # Sanity check the device_id
+            if len(device_id) > 255 or len(device_id) < 1:
+                raise AuthError(
+                    500,
+                    "Invalid device ID in scope",
+                )
+
+            # Create the device on the fly if it does not exist
+            try:
+                await self.store.get_device(
+                    user_id=user_id.to_string(), device_id=device_id
+                )
+            except StoreError:
+                await self.store.store_device(
+                    user_id=user_id.to_string(),
+                    device_id=device_id,
+                    initial_device_display_name="OIDC-native client",
+                )
+
+        # TODO: there is a few things missing in the requester here, which still need
+        # to be figured out, like:
+        #   - impersonation, with the `authenticated_entity`, which is used for
+        #     rate-limiting, MAU limits, etc.
+        #   - shadow-banning, with the `shadow_banned` flag
+        #   - a proper solution for appservices, which still needs to be figured out in
+        #     the context of MSC3861
+        return create_requester(
+            user_id=user_id,
+            device_id=device_id,
+            scope=scope,
+            is_guest=(has_guest_scope and not has_user_scope),
+        )
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 8c7c94b045..af894243f8 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -119,14 +119,20 @@ class Codes(str, Enum):
 
 
 class CodeMessageException(RuntimeError):
-    """An exception with integer code and message string attributes.
+    """An exception with integer code, a message string attributes and optional headers.
 
     Attributes:
         code: HTTP error code
         msg: string describing the error
+        headers: optional response headers to send
     """
 
-    def __init__(self, code: Union[int, HTTPStatus], msg: str):
+    def __init__(
+        self,
+        code: Union[int, HTTPStatus],
+        msg: str,
+        headers: Optional[Dict[str, str]] = None,
+    ):
         super().__init__("%d: %s" % (code, msg))
 
         # Some calls to this method pass instances of http.HTTPStatus for `code`.
@@ -137,6 +143,7 @@ class CodeMessageException(RuntimeError):
         # To eliminate this behaviour, we convert them to their integer equivalents here.
         self.code = int(code)
         self.msg = msg
+        self.headers = headers
 
 
 class RedirectException(CodeMessageException):
@@ -182,6 +189,7 @@ class SynapseError(CodeMessageException):
         msg: str,
         errcode: str = Codes.UNKNOWN,
         additional_fields: Optional[Dict] = None,
+        headers: Optional[Dict[str, str]] = None,
     ):
         """Constructs a synapse error.
 
@@ -190,7 +198,7 @@ class SynapseError(CodeMessageException):
             msg: The human-readable error message.
             errcode: The matrix error code e.g 'M_FORBIDDEN'
         """
-        super().__init__(code, msg)
+        super().__init__(code, msg, headers)
         self.errcode = errcode
         if additional_fields is None:
             self._additional_fields: Dict = {}
@@ -335,6 +343,20 @@ class AuthError(SynapseError):
         super().__init__(code, msg, errcode, additional_fields)
 
 
+class OAuthInsufficientScopeError(SynapseError):
+    """An error raised when the caller does not have sufficient scope to perform the requested action"""
+
+    def __init__(
+        self,
+        required_scopes: List[str],
+    ):
+        headers = {
+            "WWW-Authenticate": 'Bearer error="insufficient_scope", scope="%s"'
+            % (" ".join(required_scopes))
+        }
+        super().__init__(401, "Insufficient scope", Codes.FORBIDDEN, None, headers)
+
+
 class UnstableSpecAuthError(AuthError):
     """An error raised when a new error code is being proposed to replace a previous one.
     This error will return a "org.matrix.unstable.errcode" property with the new error code,
diff --git a/synapse/config/auth.py b/synapse/config/auth.py
index 35774962c0..c7ab428f28 100644
--- a/synapse/config/auth.py
+++ b/synapse/config/auth.py
@@ -29,7 +29,14 @@ class AuthConfig(Config):
         if password_config is None:
             password_config = {}
 
-        passwords_enabled = password_config.get("enabled", True)
+        # The default value of password_config.enabled is True, unless msc3861 is enabled.
+        msc3861_enabled = (
+            config.get("experimental_features", {})
+            .get("msc3861", {})
+            .get("enabled", False)
+        )
+        passwords_enabled = password_config.get("enabled", not msc3861_enabled)
+
         # 'only_for_reauth' allows users who have previously set a password to use it,
         # even though passwords would otherwise be disabled.
         passwords_for_reauth_only = passwords_enabled == "only_for_reauth"
@@ -53,3 +60,13 @@ class AuthConfig(Config):
         self.ui_auth_session_timeout = self.parse_duration(
             ui_auth.get("session_timeout", 0)
         )
+
+        # Logging in with an existing session.
+        login_via_existing = config.get("login_via_existing_session", {})
+        self.login_via_existing_enabled = login_via_existing.get("enabled", False)
+        self.login_via_existing_require_ui_auth = login_via_existing.get(
+            "require_ui_auth", True
+        )
+        self.login_via_existing_token_timeout = self.parse_duration(
+            login_via_existing.get("token_timeout", "5m")
+        )
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index d769b7f668..a9e002cf08 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -12,15 +12,216 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Any, Optional
+import enum
+from typing import TYPE_CHECKING, Any, Optional
 
 import attr
+import attr.validators
 
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.config import ConfigError
-from synapse.config._base import Config
+from synapse.config._base import Config, RootConfig
 from synapse.types import JsonDict
 
+# Determine whether authlib is installed.
+try:
+    import authlib  # noqa: F401
+
+    HAS_AUTHLIB = True
+except ImportError:
+    HAS_AUTHLIB = False
+
+if TYPE_CHECKING:
+    # Only import this if we're type checking, as it might not be installed at runtime.
+    from authlib.jose.rfc7517 import JsonWebKey
+
+
+class ClientAuthMethod(enum.Enum):
+    """List of supported client auth methods."""
+
+    CLIENT_SECRET_POST = "client_secret_post"
+    CLIENT_SECRET_BASIC = "client_secret_basic"
+    CLIENT_SECRET_JWT = "client_secret_jwt"
+    PRIVATE_KEY_JWT = "private_key_jwt"
+
+
+def _parse_jwks(jwks: Optional[JsonDict]) -> Optional["JsonWebKey"]:
+    """A helper function to parse a JWK dict into a JsonWebKey."""
+
+    if jwks is None:
+        return None
+
+    from authlib.jose.rfc7517 import JsonWebKey
+
+    return JsonWebKey.import_key(jwks)
+
+
+@attr.s(slots=True, frozen=True)
+class MSC3861:
+    """Configuration for MSC3861: Matrix architecture change to delegate authentication via OIDC"""
+
+    enabled: bool = attr.ib(default=False, validator=attr.validators.instance_of(bool))
+    """Whether to enable MSC3861 auth delegation."""
+
+    @enabled.validator
+    def _check_enabled(self, attribute: attr.Attribute, value: bool) -> None:
+        # Only allow enabling MSC3861 if authlib is installed
+        if value and not HAS_AUTHLIB:
+            raise ConfigError(
+                "MSC3861 is enabled but authlib is not installed. "
+                "Please install authlib to use MSC3861.",
+                ("experimental", "msc3861", "enabled"),
+            )
+
+    issuer: str = attr.ib(default="", validator=attr.validators.instance_of(str))
+    """The URL of the OIDC Provider."""
+
+    issuer_metadata: Optional[JsonDict] = attr.ib(default=None)
+    """The issuer metadata to use, otherwise discovered from /.well-known/openid-configuration as per MSC2965."""
+
+    client_id: str = attr.ib(
+        default="",
+        validator=attr.validators.instance_of(str),
+    )
+    """The client ID to use when calling the introspection endpoint."""
+
+    client_auth_method: ClientAuthMethod = attr.ib(
+        default=ClientAuthMethod.CLIENT_SECRET_POST, converter=ClientAuthMethod
+    )
+    """The auth method used when calling the introspection endpoint."""
+
+    client_secret: Optional[str] = attr.ib(
+        default=None,
+        validator=attr.validators.optional(attr.validators.instance_of(str)),
+    )
+    """
+    The client secret to use when calling the introspection endpoint,
+    when using any of the client_secret_* client auth methods.
+    """
+
+    jwk: Optional["JsonWebKey"] = attr.ib(default=None, converter=_parse_jwks)
+    """
+    The JWKS to use when calling the introspection endpoint,
+    when using the private_key_jwt client auth method.
+    """
+
+    @client_auth_method.validator
+    def _check_client_auth_method(
+        self, attribute: attr.Attribute, value: ClientAuthMethod
+    ) -> None:
+        # Check that the right client credentials are provided for the client auth method.
+        if not self.enabled:
+            return
+
+        if value == ClientAuthMethod.PRIVATE_KEY_JWT and self.jwk is None:
+            raise ConfigError(
+                "A JWKS must be provided when using the private_key_jwt client auth method",
+                ("experimental", "msc3861", "client_auth_method"),
+            )
+
+        if (
+            value
+            in (
+                ClientAuthMethod.CLIENT_SECRET_POST,
+                ClientAuthMethod.CLIENT_SECRET_BASIC,
+                ClientAuthMethod.CLIENT_SECRET_JWT,
+            )
+            and self.client_secret is None
+        ):
+            raise ConfigError(
+                f"A client secret must be provided when using the {value} client auth method",
+                ("experimental", "msc3861", "client_auth_method"),
+            )
+
+    account_management_url: Optional[str] = attr.ib(
+        default=None,
+        validator=attr.validators.optional(attr.validators.instance_of(str)),
+    )
+    """The URL of the My Account page on the OIDC Provider as per MSC2965."""
+
+    admin_token: Optional[str] = attr.ib(
+        default=None,
+        validator=attr.validators.optional(attr.validators.instance_of(str)),
+    )
+    """
+    A token that should be considered as an admin token.
+    This is used by the OIDC provider, to make admin calls to Synapse.
+    """
+
+    def check_config_conflicts(self, root: RootConfig) -> None:
+        """Checks for any configuration conflicts with other parts of Synapse.
+
+        Raises:
+            ConfigError: If there are any configuration conflicts.
+        """
+
+        if not self.enabled:
+            return
+
+        if (
+            root.auth.password_enabled_for_reauth
+            or root.auth.password_enabled_for_login
+        ):
+            raise ConfigError(
+                "Password auth cannot be enabled when OAuth delegation is enabled",
+                ("password_config", "enabled"),
+            )
+
+        if root.registration.enable_registration:
+            raise ConfigError(
+                "Registration cannot be enabled when OAuth delegation is enabled",
+                ("enable_registration",),
+            )
+
+        if (
+            root.oidc.oidc_enabled
+            or root.saml2.saml2_enabled
+            or root.cas.cas_enabled
+            or root.jwt.jwt_enabled
+        ):
+            raise ConfigError("SSO cannot be enabled when OAuth delegation is enabled")
+
+        if bool(root.authproviders.password_providers):
+            raise ConfigError(
+                "Password auth providers cannot be enabled when OAuth delegation is enabled"
+            )
+
+        if root.captcha.enable_registration_captcha:
+            raise ConfigError(
+                "CAPTCHA cannot be enabled when OAuth delegation is enabled",
+                ("captcha", "enable_registration_captcha"),
+            )
+
+        if root.auth.login_via_existing_enabled:
+            raise ConfigError(
+                "Login via existing session cannot be enabled when OAuth delegation is enabled",
+                ("login_via_existing_session", "enabled"),
+            )
+
+        if root.registration.refresh_token_lifetime:
+            raise ConfigError(
+                "refresh_token_lifetime cannot be set when OAuth delegation is enabled",
+                ("refresh_token_lifetime",),
+            )
+
+        if root.registration.nonrefreshable_access_token_lifetime:
+            raise ConfigError(
+                "nonrefreshable_access_token_lifetime cannot be set when OAuth delegation is enabled",
+                ("nonrefreshable_access_token_lifetime",),
+            )
+
+        if root.registration.session_lifetime:
+            raise ConfigError(
+                "session_lifetime cannot be set when OAuth delegation is enabled",
+                ("session_lifetime",),
+            )
+
+        if not root.experimental.msc3970_enabled:
+            raise ConfigError(
+                "experimental_features.msc3970_enabled must be 'true' when OAuth delegation is enabled",
+                ("experimental_features", "msc3970_enabled"),
+            )
+
 
 @attr.s(auto_attribs=True, frozen=True, slots=True)
 class MSC3866Config:
@@ -118,13 +319,6 @@ class ExperimentalConfig(Config):
         # MSC3881: Remotely toggle push notifications for another client
         self.msc3881_enabled: bool = experimental.get("msc3881_enabled", False)
 
-        # MSC3882: Allow an existing session to sign in a new session
-        self.msc3882_enabled: bool = experimental.get("msc3882_enabled", False)
-        self.msc3882_ui_auth: bool = experimental.get("msc3882_ui_auth", True)
-        self.msc3882_token_timeout = self.parse_duration(
-            experimental.get("msc3882_token_timeout", "5m")
-        )
-
         # MSC3874: Filtering /messages with rel_types / not_rel_types.
         self.msc3874_enabled: bool = experimental.get("msc3874_enabled", False)
 
@@ -182,8 +376,19 @@ class ExperimentalConfig(Config):
             "msc3981_recurse_relations", False
         )
 
+        # MSC3861: Matrix architecture change to delegate authentication via OIDC
+        try:
+            self.msc3861 = MSC3861(**experimental.get("msc3861", {}))
+        except ValueError as exc:
+            raise ConfigError(
+                "Invalid MSC3861 configuration", ("experimental", "msc3861")
+            ) from exc
+
         # MSC3970: Scope transaction IDs to devices
-        self.msc3970_enabled = experimental.get("msc3970_enabled", False)
+        self.msc3970_enabled = experimental.get("msc3970_enabled", self.msc3861.enabled)
+
+        # Check that none of the other config options conflict with MSC3861 when enabled
+        self.msc3861.check_config_conflicts(self.root)
 
         # MSC4009: E.164 Matrix IDs
         self.msc4009_e164_mxids = experimental.get("msc4009_e164_mxids", False)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index f4ca70a698..149351dda0 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -515,7 +515,7 @@ class FederationServer(FederationBase):
                     logger.error(
                         "Failed to handle PDU %s",
                         event_id,
-                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                        exc_info=(f.type, f.value, f.getTracebackObject()),
                     )
                     return {"error": str(e)}
 
@@ -1247,7 +1247,7 @@ class FederationServer(FederationBase):
                     logger.error(
                         "Failed to handle PDU %s",
                         event.event_id,
-                        exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                        exc_info=(f.type, f.value, f.getTracebackObject()),
                     )
 
                 received_ts = await self.store.remove_received_event_from_staging(
@@ -1291,9 +1291,6 @@ class FederationServer(FederationBase):
                 return
             lock = new_lock
 
-    def __str__(self) -> str:
-        return "<ReplicationLayer(%s)>" % self.server_name
-
     async def exchange_third_party_invite(
         self, sender_user_id: str, target_user_id: str, room_id: str, signed: Dict
     ) -> None:
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d001f2fb2f..4f986d90cb 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -274,6 +274,8 @@ class AuthHandler:
         # response.
         self._extra_attributes: Dict[str, SsoLoginExtraAttributes] = {}
 
+        self.msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
+
     async def validate_user_via_ui_auth(
         self,
         requester: Requester,
@@ -322,8 +324,12 @@ class AuthHandler:
 
             LimitExceededError if the ratelimiter's failed request count for this
                 user is too high to proceed
-
         """
+        if self.msc3861_oauth_delegation_enabled:
+            raise SynapseError(
+                HTTPStatus.INTERNAL_SERVER_ERROR, "UIA shouldn't be used with MSC3861"
+            )
+
         if not requester.access_token_id:
             raise ValueError("Cannot validate a user without an access token")
         if can_skip_ui_auth and self._ui_auth_session_timeout:
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index e7e0b5e049..24b68e0301 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -1354,7 +1354,7 @@ class OidcProvider:
         finish_request(request)
 
 
-class LogoutToken(JWTClaims):
+class LogoutToken(JWTClaims):  # type: ignore[misc]
     """
     Holds and verify claims of a logout token, as per
     https://openid.net/specs/openid-connect-backchannel-1_0.html#LogoutToken
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 63b35c8d62..d5257acb7d 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -360,7 +360,7 @@ class PaginationHandler:
         except Exception:
             f = Failure()
             logger.error(
-                "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())  # type: ignore
+                "[purge] failed", exc_info=(f.type, f.value, f.getTracebackObject())
             )
             self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
             self._purges_by_id[purge_id].error = f.getErrorMessage()
@@ -689,7 +689,7 @@ class PaginationHandler:
             f = Failure()
             logger.error(
                 "failed",
-                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore
+                exc_info=(f.type, f.value, f.getTracebackObject()),
             )
             self._delete_by_id[delete_id].status = DeleteStatus.STATUS_FAILED
             self._delete_by_id[delete_id].error = f.getErrorMessage()
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 101dc2e747..933172c873 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -108,9 +108,12 @@ def return_json_error(
 
     if f.check(SynapseError):
         # mypy doesn't understand that f.check asserts the type.
-        exc: SynapseError = f.value  # type: ignore
+        exc: SynapseError = f.value
         error_code = exc.code
         error_dict = exc.error_dict(config)
+        if exc.headers is not None:
+            for header, value in exc.headers.items():
+                request.setHeader(header, value)
         logger.info("%s SynapseError: %s - %s", request, error_code, exc.msg)
     elif f.check(CancelledError):
         error_code = HTTP_STATUS_REQUEST_CANCELLED
@@ -121,7 +124,7 @@ def return_json_error(
                 "Got cancellation before client disconnection from %r: %r",
                 request.request_metrics.name,
                 request,
-                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+                exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     else:
         error_code = 500
@@ -131,7 +134,7 @@ def return_json_error(
             "Failed handle request via %r: %r",
             request.request_metrics.name,
             request,
-            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+            exc_info=(f.type, f.value, f.getTracebackObject()),
         )
 
     # Only respond with an error response if we haven't already started writing,
@@ -169,9 +172,12 @@ def return_html_error(
     """
     if f.check(CodeMessageException):
         # mypy doesn't understand that f.check asserts the type.
-        cme: CodeMessageException = f.value  # type: ignore
+        cme: CodeMessageException = f.value
         code = cme.code
         msg = cme.msg
+        if cme.headers is not None:
+            for header, value in cme.headers.items():
+                request.setHeader(header, value)
 
         if isinstance(cme, RedirectException):
             logger.info("%s redirect to %s", request, cme.location)
@@ -183,7 +189,7 @@ def return_html_error(
             logger.error(
                 "Failed handle request %r",
                 request,
-                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+                exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     elif f.check(CancelledError):
         code = HTTP_STATUS_REQUEST_CANCELLED
@@ -193,7 +199,7 @@ def return_html_error(
             logger.error(
                 "Got cancellation before client disconnection when handling request %r",
                 request,
-                exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+                exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     else:
         code = HTTPStatus.INTERNAL_SERVER_ERROR
@@ -202,7 +208,7 @@ def return_html_error(
         logger.error(
             "Failed handle request %r",
             request,
-            exc_info=(f.type, f.value, f.getTracebackObject()),  # type: ignore[arg-type]
+            exc_info=(f.type, f.value, f.getTracebackObject()),
         )
 
     if isinstance(error_template, str):
diff --git a/synapse/media/oembed.py b/synapse/media/oembed.py
index c0eaf04be5..5ad9eec80b 100644
--- a/synapse/media/oembed.py
+++ b/synapse/media/oembed.py
@@ -14,7 +14,7 @@
 import html
 import logging
 import urllib.parse
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, List, Optional, cast
 
 import attr
 
@@ -98,7 +98,7 @@ class OEmbedProvider:
         # No match.
         return None
 
-    def autodiscover_from_html(self, tree: "etree.Element") -> Optional[str]:
+    def autodiscover_from_html(self, tree: "etree._Element") -> Optional[str]:
         """
         Search an HTML document for oEmbed autodiscovery information.
 
@@ -109,18 +109,22 @@ class OEmbedProvider:
             The URL to use for oEmbed information, or None if no URL was found.
         """
         # Search for link elements with the proper rel and type attributes.
-        for tag in tree.xpath(
-            "//link[@rel='alternate'][@type='application/json+oembed']"
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        for tag in cast(
+            List["etree._Element"],
+            tree.xpath("//link[@rel='alternate'][@type='application/json+oembed']"),
         ):
             if "href" in tag.attrib:
-                return tag.attrib["href"]
+                return cast(str, tag.attrib["href"])
 
         # Some providers (e.g. Flickr) use alternative instead of alternate.
-        for tag in tree.xpath(
-            "//link[@rel='alternative'][@type='application/json+oembed']"
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        for tag in cast(
+            List["etree._Element"],
+            tree.xpath("//link[@rel='alternative'][@type='application/json+oembed']"),
         ):
             if "href" in tag.attrib:
-                return tag.attrib["href"]
+                return cast(str, tag.attrib["href"])
 
         return None
 
@@ -212,11 +216,12 @@ class OEmbedProvider:
         return OEmbedResult(open_graph_response, author_name, cache_age)
 
 
-def _fetch_urls(tree: "etree.Element", tag_name: str) -> List[str]:
+def _fetch_urls(tree: "etree._Element", tag_name: str) -> List[str]:
     results = []
-    for tag in tree.xpath("//*/" + tag_name):
+    # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+    for tag in cast(List["etree._Element"], tree.xpath("//*/" + tag_name)):
         if "src" in tag.attrib:
-            results.append(tag.attrib["src"])
+            results.append(cast(str, tag.attrib["src"]))
     return results
 
 
@@ -244,11 +249,12 @@ def calc_description_and_urls(open_graph_response: JsonDict, html_body: str) ->
     parser = etree.HTMLParser(recover=True, encoding="utf-8")
 
     # Attempt to parse the body. If this fails, log and return no metadata.
-    tree = etree.fromstring(html_body, parser)
+    # TODO Develop of lxml-stubs has this correct.
+    tree = etree.fromstring(html_body, parser)  # type: ignore[arg-type]
 
     # The data was successfully parsed, but no tree was found.
     if tree is None:
-        return
+        return  # type: ignore[unreachable]
 
     # Attempt to find interesting URLs (images, videos, embeds).
     if "og:image" not in open_graph_response:
diff --git a/synapse/media/preview_html.py b/synapse/media/preview_html.py
index 516d0434f0..1bc7ccb7f3 100644
--- a/synapse/media/preview_html.py
+++ b/synapse/media/preview_html.py
@@ -24,6 +24,7 @@ from typing import (
     Optional,
     Set,
     Union,
+    cast,
 )
 
 if TYPE_CHECKING:
@@ -115,7 +116,7 @@ def _get_html_media_encodings(
 
 def decode_body(
     body: bytes, uri: str, content_type: Optional[str] = None
-) -> Optional["etree.Element"]:
+) -> Optional["etree._Element"]:
     """
     This uses lxml to parse the HTML document.
 
@@ -152,11 +153,12 @@ def decode_body(
 
     # Attempt to parse the body. Returns None if the body was successfully
     # parsed, but no tree was found.
-    return etree.fromstring(body, parser)
+    # TODO Develop of lxml-stubs has this correct.
+    return etree.fromstring(body, parser)  # type: ignore[arg-type]
 
 
 def _get_meta_tags(
-    tree: "etree.Element",
+    tree: "etree._Element",
     property: str,
     prefix: str,
     property_mapper: Optional[Callable[[str], Optional[str]]] = None,
@@ -175,9 +177,15 @@ def _get_meta_tags(
     Returns:
         A map of tag name to value.
     """
+    # This actually returns Dict[str, str], but the caller sets this as a variable
+    # which is Dict[str, Optional[str]].
     results: Dict[str, Optional[str]] = {}
-    for tag in tree.xpath(
-        f"//*/meta[starts-with(@{property}, '{prefix}:')][@content][not(@content='')]"
+    # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+    for tag in cast(
+        List["etree._Element"],
+        tree.xpath(
+            f"//*/meta[starts-with(@{property}, '{prefix}:')][@content][not(@content='')]"
+        ),
     ):
         # if we've got more than 50 tags, someone is taking the piss
         if len(results) >= 50:
@@ -187,14 +195,15 @@ def _get_meta_tags(
             )
             return {}
 
-        key = tag.attrib[property]
+        key = cast(str, tag.attrib[property])
         if property_mapper:
-            key = property_mapper(key)
+            new_key = property_mapper(key)
             # None is a special value used to ignore a value.
-            if key is None:
+            if new_key is None:
                 continue
+            key = new_key
 
-        results[key] = tag.attrib["content"]
+        results[key] = cast(str, tag.attrib["content"])
 
     return results
 
@@ -219,7 +228,7 @@ def _map_twitter_to_open_graph(key: str) -> Optional[str]:
     return "og" + key[7:]
 
 
-def parse_html_to_open_graph(tree: "etree.Element") -> Dict[str, Optional[str]]:
+def parse_html_to_open_graph(tree: "etree._Element") -> Dict[str, Optional[str]]:
     """
     Parse the HTML document into an Open Graph response.
 
@@ -276,24 +285,36 @@ def parse_html_to_open_graph(tree: "etree.Element") -> Dict[str, Optional[str]]:
 
     if "og:title" not in og:
         # Attempt to find a title from the title tag, or the biggest header on the page.
-        title = tree.xpath("((//title)[1] | (//h1)[1] | (//h2)[1] | (//h3)[1])/text()")
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        title = cast(
+            List["etree._ElementUnicodeResult"],
+            tree.xpath("((//title)[1] | (//h1)[1] | (//h2)[1] | (//h3)[1])/text()"),
+        )
         if title:
             og["og:title"] = title[0].strip()
         else:
             og["og:title"] = None
 
     if "og:image" not in og:
-        meta_image = tree.xpath(
-            "//*/meta[translate(@itemprop, 'IMAGE', 'image')='image'][not(@content='')]/@content[1]"
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        meta_image = cast(
+            List["etree._ElementUnicodeResult"],
+            tree.xpath(
+                "//*/meta[translate(@itemprop, 'IMAGE', 'image')='image'][not(@content='')]/@content[1]"
+            ),
         )
         # If a meta image is found, use it.
         if meta_image:
             og["og:image"] = meta_image[0]
         else:
             # Try to find images which are larger than 10px by 10px.
+            # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
             #
             # TODO: consider inlined CSS styles as well as width & height attribs
-            images = tree.xpath("//img[@src][number(@width)>10][number(@height)>10]")
+            images = cast(
+                List["etree._Element"],
+                tree.xpath("//img[@src][number(@width)>10][number(@height)>10]"),
+            )
             images = sorted(
                 images,
                 key=lambda i: (
@@ -302,20 +323,29 @@ def parse_html_to_open_graph(tree: "etree.Element") -> Dict[str, Optional[str]]:
             )
             # If no images were found, try to find *any* images.
             if not images:
-                images = tree.xpath("//img[@src][1]")
+                # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+                images = cast(List["etree._Element"], tree.xpath("//img[@src][1]"))
             if images:
-                og["og:image"] = images[0].attrib["src"]
+                og["og:image"] = cast(str, images[0].attrib["src"])
 
             # Finally, fallback to the favicon if nothing else.
             else:
-                favicons = tree.xpath("//link[@href][contains(@rel, 'icon')]/@href[1]")
+                # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+                favicons = cast(
+                    List["etree._ElementUnicodeResult"],
+                    tree.xpath("//link[@href][contains(@rel, 'icon')]/@href[1]"),
+                )
                 if favicons:
                     og["og:image"] = favicons[0]
 
     if "og:description" not in og:
         # Check the first meta description tag for content.
-        meta_description = tree.xpath(
-            "//*/meta[translate(@name, 'DESCRIPTION', 'description')='description'][not(@content='')]/@content[1]"
+        # Cast: the type returned by xpath depends on the xpath expression: mypy can't deduce this.
+        meta_description = cast(
+            List["etree._ElementUnicodeResult"],
+            tree.xpath(
+                "//*/meta[translate(@name, 'DESCRIPTION', 'description')='description'][not(@content='')]/@content[1]"
+            ),
         )
         # If a meta description is found with content, use it.
         if meta_description:
@@ -332,7 +362,7 @@ def parse_html_to_open_graph(tree: "etree.Element") -> Dict[str, Optional[str]]:
     return og
 
 
-def parse_html_description(tree: "etree.Element") -> Optional[str]:
+def parse_html_description(tree: "etree._Element") -> Optional[str]:
     """
     Calculate a text description based on an HTML document.
 
@@ -368,6 +398,9 @@ def parse_html_description(tree: "etree.Element") -> Optional[str]:
         "canvas",
         "img",
         "picture",
+        # etree.Comment is a function which creates an etree._Comment element.
+        # The "tag" attribute of an etree._Comment instance is confusingly the
+        # etree.Comment function instead of a string.
         etree.Comment,
     }
 
@@ -381,8 +414,8 @@ def parse_html_description(tree: "etree.Element") -> Optional[str]:
 
 
 def _iterate_over_text(
-    tree: Optional["etree.Element"],
-    tags_to_ignore: Set[Union[str, "etree.Comment"]],
+    tree: Optional["etree._Element"],
+    tags_to_ignore: Set[object],
     stack_limit: int = 1024,
 ) -> Generator[str, None, None]:
     """Iterate over the tree returning text nodes in a depth first fashion,
@@ -402,7 +435,7 @@ def _iterate_over_text(
 
     # This is a stack whose items are elements to iterate over *or* strings
     # to be returned.
-    elements: List[Union[str, "etree.Element"]] = [tree]
+    elements: List[Union[str, "etree._Element"]] = [tree]
     while elements:
         el = elements.pop()
 
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index 0e9f366cba..a8d6224a45 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -38,6 +38,7 @@ from twisted.web.resource import Resource
 
 from synapse.api import errors
 from synapse.api.errors import SynapseError
+from synapse.config import ConfigError
 from synapse.events import EventBase
 from synapse.events.presence_router import (
     GET_INTERESTED_USERS_CALLBACK,
@@ -121,6 +122,7 @@ from synapse.types import (
     JsonMapping,
     Requester,
     RoomAlias,
+    RoomID,
     StateMap,
     UserID,
     UserInfo,
@@ -252,6 +254,7 @@ class ModuleApi:
         self._device_handler = hs.get_device_handler()
         self.custom_template_dir = hs.config.server.custom_template_directory
         self._callbacks = hs.get_module_api_callbacks()
+        self.msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
 
         try:
             app_name = self._hs.config.email.email_app_name
@@ -419,6 +422,11 @@ class ModuleApi:
 
         Added in Synapse v1.46.0.
         """
+        if self.msc3861_oauth_delegation_enabled:
+            raise ConfigError(
+                "Cannot use password auth provider callbacks when OAuth delegation is enabled"
+            )
+
         return self._password_auth_provider.register_password_auth_provider_callbacks(
             check_3pid_auth=check_3pid_auth,
             on_logged_out=on_logged_out,
@@ -1563,6 +1571,32 @@ class ModuleApi:
             start_timestamp, end_timestamp
         )
 
+    async def get_canonical_room_alias(self, room_id: RoomID) -> Optional[RoomAlias]:
+        """
+        Retrieve the given room's current canonical alias.
+
+        A room may declare an alias as "canonical", meaning that it is the
+        preferred alias to use when referring to the room. This function
+        retrieves that alias from the room's state.
+
+        Added in Synapse v1.86.0.
+
+        Args:
+            room_id: The Room ID to find the alias of.
+
+        Returns:
+            None if the room ID does not exist, or if the room exists but has no canonical alias.
+            Otherwise, the parsed room alias.
+        """
+        room_alias_str = (
+            await self._storage_controllers.state.get_canonical_alias_for_room(
+                room_id.to_string()
+            )
+        )
+        if room_alias_str:
+            return RoomAlias.from_string(room_alias_str)
+        return None
+
     async def lookup_room_alias(self, room_alias: str) -> Tuple[str, List[str]]:
         """
         Get the room ID associated with a room alias.
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index c729364839..fe8177ed4d 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -257,9 +257,11 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     DeleteRoomStatusByRoomIdRestServlet(hs).register(http_server)
     JoinRoomAliasServlet(hs).register(http_server)
     VersionServlet(hs).register(http_server)
-    UserAdminServlet(hs).register(http_server)
+    if not hs.config.experimental.msc3861.enabled:
+        UserAdminServlet(hs).register(http_server)
     UserMembershipRestServlet(hs).register(http_server)
-    UserTokenRestServlet(hs).register(http_server)
+    if not hs.config.experimental.msc3861.enabled:
+        UserTokenRestServlet(hs).register(http_server)
     UserRestServletV2(hs).register(http_server)
     UsersRestServletV2(hs).register(http_server)
     UserMediaStatisticsRestServlet(hs).register(http_server)
@@ -274,9 +276,10 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     RoomEventContextServlet(hs).register(http_server)
     RateLimitRestServlet(hs).register(http_server)
     UsernameAvailableRestServlet(hs).register(http_server)
-    ListRegistrationTokensRestServlet(hs).register(http_server)
-    NewRegistrationTokenRestServlet(hs).register(http_server)
-    RegistrationTokenRestServlet(hs).register(http_server)
+    if not hs.config.experimental.msc3861.enabled:
+        ListRegistrationTokensRestServlet(hs).register(http_server)
+        NewRegistrationTokenRestServlet(hs).register(http_server)
+        RegistrationTokenRestServlet(hs).register(http_server)
     DestinationMembershipRestServlet(hs).register(http_server)
     DestinationResetConnectionRestServlet(hs).register(http_server)
     DestinationRestServlet(hs).register(http_server)
@@ -306,10 +309,12 @@ def register_servlets_for_client_rest_resource(
     # The following resources can only be run on the main process.
     if hs.config.worker.worker_app is None:
         DeactivateAccountRestServlet(hs).register(http_server)
-        ResetPasswordRestServlet(hs).register(http_server)
+        if not hs.config.experimental.msc3861.enabled:
+            ResetPasswordRestServlet(hs).register(http_server)
     SearchUsersRestServlet(hs).register(http_server)
-    UserRegisterServlet(hs).register(http_server)
-    AccountValidityRenewServlet(hs).register(http_server)
+    if not hs.config.experimental.msc3861.enabled:
+        UserRegisterServlet(hs).register(http_server)
+        AccountValidityRenewServlet(hs).register(http_server)
 
     # Load the media repo ones if we're using them. Otherwise load the servlets which
     # don't need a media repo (typically readonly admin APIs).
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 932333ae57..407fe9c804 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -71,6 +71,7 @@ class UsersRestServletV2(RestServlet):
         self.auth = hs.get_auth()
         self.admin_handler = hs.get_admin_handler()
         self._msc3866_enabled = hs.config.experimental.msc3866.enabled
+        self._msc3861_enabled = hs.config.experimental.msc3861.enabled
 
     async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         await assert_requester_is_admin(self.auth, request)
@@ -94,7 +95,14 @@ class UsersRestServletV2(RestServlet):
 
         user_id = parse_string(request, "user_id")
         name = parse_string(request, "name")
+
         guests = parse_boolean(request, "guests", default=True)
+        if self._msc3861_enabled and guests:
+            raise SynapseError(
+                HTTPStatus.BAD_REQUEST,
+                "The guests parameter is not supported when MSC3861 is enabled.",
+                errcode=Codes.INVALID_PARAM,
+            )
         deactivated = parse_boolean(request, "deactivated", default=False)
 
         # If support for MSC3866 is not enabled, apply no filtering based on the
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 3d0c55daa0..679ab9f266 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -27,6 +27,7 @@ from synapse.api.constants import LoginType
 from synapse.api.errors import (
     Codes,
     InteractiveAuthIncompleteError,
+    NotFoundError,
     SynapseError,
     ThreepidValidationError,
 )
@@ -600,6 +601,9 @@ class ThreepidRestServlet(RestServlet):
     # ThreePidBindRestServelet.PostBody with an `alias_generator` to handle
     # `threePidCreds` versus `three_pid_creds`.
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        if self.hs.config.experimental.msc3861.enabled:
+            raise NotFoundError(errcode=Codes.UNRECOGNIZED)
+
         if not self.hs.config.registration.enable_3pid_changes:
             raise SynapseError(
                 400, "3PID changes are disabled on this server", Codes.FORBIDDEN
@@ -890,19 +894,21 @@ class AccountStatusRestServlet(RestServlet):
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     if hs.config.worker.worker_app is None:
-        EmailPasswordRequestTokenRestServlet(hs).register(http_server)
-        PasswordRestServlet(hs).register(http_server)
-        DeactivateAccountRestServlet(hs).register(http_server)
-        EmailThreepidRequestTokenRestServlet(hs).register(http_server)
-        MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
-        AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
-        AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
+        if not hs.config.experimental.msc3861.enabled:
+            EmailPasswordRequestTokenRestServlet(hs).register(http_server)
+            DeactivateAccountRestServlet(hs).register(http_server)
+            PasswordRestServlet(hs).register(http_server)
+            EmailThreepidRequestTokenRestServlet(hs).register(http_server)
+            MsisdnThreepidRequestTokenRestServlet(hs).register(http_server)
+            AddThreepidEmailSubmitTokenServlet(hs).register(http_server)
+            AddThreepidMsisdnSubmitTokenServlet(hs).register(http_server)
     ThreepidRestServlet(hs).register(http_server)
     if hs.config.worker.worker_app is None:
-        ThreepidAddRestServlet(hs).register(http_server)
         ThreepidBindRestServlet(hs).register(http_server)
         ThreepidUnbindRestServlet(hs).register(http_server)
-        ThreepidDeleteRestServlet(hs).register(http_server)
+        if not hs.config.experimental.msc3861.enabled:
+            ThreepidAddRestServlet(hs).register(http_server)
+            ThreepidDeleteRestServlet(hs).register(http_server)
     WhoamiRestServlet(hs).register(http_server)
 
     if hs.config.worker.worker_app is None and hs.config.experimental.msc3720_enabled:
diff --git a/synapse/rest/client/capabilities.py b/synapse/rest/client/capabilities.py
index 0dbf8f6818..3154b9f77e 100644
--- a/synapse/rest/client/capabilities.py
+++ b/synapse/rest/client/capabilities.py
@@ -65,6 +65,9 @@ class CapabilitiesRestServlet(RestServlet):
                 "m.3pid_changes": {
                     "enabled": self.config.registration.enable_3pid_changes
                 },
+                "m.get_login_token": {
+                    "enabled": self.config.auth.login_via_existing_enabled,
+                },
             }
         }
 
diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index e97d0bf475..38dff9703f 100644
--- a/synapse/rest/client/devices.py
+++ b/synapse/rest/client/devices.py
@@ -19,7 +19,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple
 from pydantic import Extra, StrictStr
 
 from synapse.api import errors
-from synapse.api.errors import NotFoundError
+from synapse.api.errors import NotFoundError, UnrecognizedRequestError
 from synapse.handlers.device import DeviceHandler
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
@@ -135,6 +135,7 @@ class DeviceRestServlet(RestServlet):
         self.device_handler = handler
         self.auth_handler = hs.get_auth_handler()
         self._msc3852_enabled = hs.config.experimental.msc3852_enabled
+        self._msc3861_oauth_delegation_enabled = hs.config.experimental.msc3861.enabled
 
     async def on_GET(
         self, request: SynapseRequest, device_id: str
@@ -166,6 +167,9 @@ class DeviceRestServlet(RestServlet):
     async def on_DELETE(
         self, request: SynapseRequest, device_id: str
     ) -> Tuple[int, JsonDict]:
+        if self._msc3861_oauth_delegation_enabled:
+            raise UnrecognizedRequestError(code=404)
+
         requester = await self.auth.get_user_by_req(request)
 
         try:
@@ -344,7 +348,10 @@ class ClaimDehydratedDeviceServlet(RestServlet):
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
-    if hs.config.worker.worker_app is None:
+    if (
+        hs.config.worker.worker_app is None
+        and not hs.config.experimental.msc3861.enabled
+    ):
         DeleteDevicesRestServlet(hs).register(http_server)
     DevicesRestServlet(hs).register(http_server)
     if hs.config.worker.worker_app is None:
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index 413edd8a4d..70b8be1aa2 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -17,9 +17,10 @@
 import logging
 import re
 from collections import Counter
+from http import HTTPStatus
 from typing import TYPE_CHECKING, Any, Dict, Optional, Tuple
 
-from synapse.api.errors import InvalidAPICallError, SynapseError
+from synapse.api.errors import Codes, InvalidAPICallError, SynapseError
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
     RestServlet,
@@ -375,9 +376,29 @@ class SigningKeyUploadServlet(RestServlet):
         user_id = requester.user.to_string()
         body = parse_json_object_from_request(request)
 
-        if self.hs.config.experimental.msc3967_enabled:
-            if await self.e2e_keys_handler.is_cross_signing_set_up_for_user(user_id):
-                # If we already have a master key then cross signing is set up and we require UIA to reset
+        is_cross_signing_setup = (
+            await self.e2e_keys_handler.is_cross_signing_set_up_for_user(user_id)
+        )
+
+        # Before MSC3967 we required UIA both when setting up cross signing for the
+        # first time and when resetting the device signing key. With MSC3967 we only
+        # require UIA when resetting cross-signing, and not when setting up the first
+        # time. Because there is no UIA in MSC3861, for now we throw an error if the
+        # user tries to reset the device signing key when MSC3861 is enabled, but allow
+        # first-time setup.
+        if self.hs.config.experimental.msc3861.enabled:
+            # There is no way to reset the device signing key with MSC3861
+            if is_cross_signing_setup:
+                raise SynapseError(
+                    HTTPStatus.NOT_IMPLEMENTED,
+                    "Resetting cross signing keys is not yet supported with MSC3861",
+                    Codes.UNRECOGNIZED,
+                )
+            # But first-time setup is fine
+
+        elif self.hs.config.experimental.msc3967_enabled:
+            # If we already have a master key then cross signing is set up and we require UIA to reset
+            if is_cross_signing_setup:
                 await self.auth_handler.validate_user_via_ui_auth(
                     requester,
                     request,
@@ -387,6 +408,7 @@ class SigningKeyUploadServlet(RestServlet):
                     can_skip_ui_auth=False,
                 )
             # Otherwise we don't require UIA since we are setting up cross signing for first time
+
         else:
             # Previous behaviour is to always require UIA but allow it to be skipped
             await self.auth_handler.validate_user_via_ui_auth(
diff --git a/synapse/rest/client/login.py b/synapse/rest/client/login.py
index 6ca61ffbd0..6493b00bb8 100644
--- a/synapse/rest/client/login.py
+++ b/synapse/rest/client/login.py
@@ -104,6 +104,9 @@ class LoginRestServlet(RestServlet):
             and hs.config.experimental.msc3866.require_approval_for_new_accounts
         )
 
+        # Whether get login token is enabled.
+        self._get_login_token_enabled = hs.config.auth.login_via_existing_enabled
+
         self.auth = hs.get_auth()
 
         self.clock = hs.get_clock()
@@ -142,6 +145,9 @@ class LoginRestServlet(RestServlet):
             # to SSO.
             flows.append({"type": LoginRestServlet.CAS_TYPE})
 
+        # The login token flow requires m.login.token to be advertised.
+        support_login_token_flow = self._get_login_token_enabled
+
         if self.cas_enabled or self.saml2_enabled or self.oidc_enabled:
             flows.append(
                 {
@@ -153,14 +159,23 @@ class LoginRestServlet(RestServlet):
                 }
             )
 
-            # While it's valid for us to advertise this login type generally,
-            # synapse currently only gives out these tokens as part of the
-            # SSO login flow.
-            # Generally we don't want to advertise login flows that clients
-            # don't know how to implement, since they (currently) will always
-            # fall back to the fallback API if they don't understand one of the
-            # login flow types returned.
-            flows.append({"type": LoginRestServlet.TOKEN_TYPE})
+            # SSO requires a login token to be generated, so we need to advertise that flow
+            support_login_token_flow = True
+
+        # While it's valid for us to advertise this login type generally,
+        # synapse currently only gives out these tokens as part of the
+        # SSO login flow or as part of login via an existing session.
+        #
+        # Generally we don't want to advertise login flows that clients
+        # don't know how to implement, since they (currently) will always
+        # fall back to the fallback API if they don't understand one of the
+        # login flow types returned.
+        if support_login_token_flow:
+            tokenTypeFlow: Dict[str, Any] = {"type": LoginRestServlet.TOKEN_TYPE}
+            # If the login token flow is enabled advertise the get_login_token flag.
+            if self._get_login_token_enabled:
+                tokenTypeFlow["get_login_token"] = True
+            flows.append(tokenTypeFlow)
 
         flows.extend({"type": t} for t in self.auth_handler.get_supported_login_types())
 
@@ -633,6 +648,9 @@ class CasTicketServlet(RestServlet):
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+    if hs.config.experimental.msc3861.enabled:
+        return
+
     LoginRestServlet(hs).register(http_server)
     if (
         hs.config.worker.worker_app is None
diff --git a/synapse/rest/client/login_token_request.py b/synapse/rest/client/login_token_request.py
index 43ea21d5e6..b1629f94a5 100644
--- a/synapse/rest/client/login_token_request.py
+++ b/synapse/rest/client/login_token_request.py
@@ -15,6 +15,7 @@
 import logging
 from typing import TYPE_CHECKING, Tuple
 
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.http.server import HttpServer
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
 from synapse.http.site import SynapseRequest
@@ -33,7 +34,7 @@ class LoginTokenRequestServlet(RestServlet):
 
     Request:
 
-    POST /login/token HTTP/1.1
+    POST /login/get_token HTTP/1.1
     Content-Type: application/json
 
     {}
@@ -43,30 +44,45 @@ class LoginTokenRequestServlet(RestServlet):
     HTTP/1.1 200 OK
     {
         "login_token": "ABDEFGH",
-        "expires_in": 3600,
+        "expires_in_ms": 3600000,
     }
     """
 
-    PATTERNS = client_patterns(
-        "/org.matrix.msc3882/login/token$", releases=[], v1=False, unstable=True
-    )
+    PATTERNS = [
+        *client_patterns(
+            "/login/get_token$", releases=["v1"], v1=False, unstable=False
+        ),
+        # TODO: this is no longer needed once unstable MSC3882 does not need to be supported:
+        *client_patterns(
+            "/org.matrix.msc3882/login/token$", releases=[], v1=False, unstable=True
+        ),
+    ]
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
         self.auth = hs.get_auth()
-        self.store = hs.get_datastores().main
-        self.clock = hs.get_clock()
-        self.server_name = hs.config.server.server_name
+        self._main_store = hs.get_datastores().main
         self.auth_handler = hs.get_auth_handler()
-        self.token_timeout = hs.config.experimental.msc3882_token_timeout
-        self.ui_auth = hs.config.experimental.msc3882_ui_auth
+        self.token_timeout = hs.config.auth.login_via_existing_token_timeout
+        self._require_ui_auth = hs.config.auth.login_via_existing_require_ui_auth
+
+        # Ratelimit aggressively to a maxmimum of 1 request per minute.
+        #
+        # This endpoint can be used to spawn additional sessions and could be
+        # abused by a malicious client to create many sessions.
+        self._ratelimiter = Ratelimiter(
+            store=self._main_store,
+            clock=hs.get_clock(),
+            rate_hz=1 / 60,
+            burst_count=1,
+        )
 
     @interactive_auth_handler
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request)
         body = parse_json_object_from_request(request)
 
-        if self.ui_auth:
+        if self._require_ui_auth:
             await self.auth_handler.validate_user_via_ui_auth(
                 requester,
                 request,
@@ -75,9 +91,12 @@ class LoginTokenRequestServlet(RestServlet):
                 can_skip_ui_auth=False,  # Don't allow skipping of UI auth
             )
 
+        # Ensure that this endpoint isn't being used too often. (Ensure this is
+        # done *after* UI auth.)
+        await self._ratelimiter.ratelimit(None, requester.user.to_string().lower())
+
         login_token = await self.auth_handler.create_login_token_for_user_id(
             user_id=requester.user.to_string(),
-            auth_provider_id="org.matrix.msc3882.login_token_request",
             duration_ms=self.token_timeout,
         )
 
@@ -85,11 +104,13 @@ class LoginTokenRequestServlet(RestServlet):
             200,
             {
                 "login_token": login_token,
+                # TODO: this is no longer needed once unstable MSC3882 does not need to be supported:
                 "expires_in": self.token_timeout // 1000,
+                "expires_in_ms": self.token_timeout,
             },
         )
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
-    if hs.config.experimental.msc3882_enabled:
+    if hs.config.auth.login_via_existing_enabled:
         LoginTokenRequestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/logout.py b/synapse/rest/client/logout.py
index 6d34625ad5..94ad90942f 100644
--- a/synapse/rest/client/logout.py
+++ b/synapse/rest/client/logout.py
@@ -80,5 +80,8 @@ class LogoutAllRestServlet(RestServlet):
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+    if hs.config.experimental.msc3861.enabled:
+        return
+
     LogoutRestServlet(hs).register(http_server)
     LogoutAllRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/register.py b/synapse/rest/client/register.py
index 7f84a17e29..d59669f0b6 100644
--- a/synapse/rest/client/register.py
+++ b/synapse/rest/client/register.py
@@ -869,6 +869,74 @@ class RegisterRestServlet(RestServlet):
         return 200, result
 
 
+class RegisterAppServiceOnlyRestServlet(RestServlet):
+    """An alternative registration API endpoint that only allows ASes to register
+
+    This replaces the regular /register endpoint if MSC3861. There are two notable
+    differences with the regular /register endpoint:
+     - It only allows the `m.login.application_service` login type
+     - It does not create a device or access token for the just-registered user
+
+    Note that the exact behaviour of this endpoint is not yet finalised. It should be
+    just good enough to make most ASes work.
+    """
+
+    PATTERNS = client_patterns("/register$")
+    CATEGORY = "Registration/login requests"
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+
+        self.auth = hs.get_auth()
+        self.registration_handler = hs.get_registration_handler()
+        self.ratelimiter = hs.get_registration_ratelimiter()
+
+    @interactive_auth_handler
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        body = parse_json_object_from_request(request)
+
+        client_addr = request.getClientAddress().host
+
+        await self.ratelimiter.ratelimit(None, client_addr, update=False)
+
+        kind = parse_string(request, "kind", default="user")
+
+        if kind == "guest":
+            raise SynapseError(403, "Guest access is disabled")
+        elif kind != "user":
+            raise UnrecognizedRequestError(
+                f"Do not understand membership kind: {kind}",
+            )
+
+        # Pull out the provided username and do basic sanity checks early since
+        # the auth layer will store these in sessions.
+        desired_username = body.get("username")
+        if not isinstance(desired_username, str) or len(desired_username) > 512:
+            raise SynapseError(400, "Invalid username")
+
+        # Allow only ASes to use this API.
+        if body.get("type") != APP_SERVICE_REGISTRATION_TYPE:
+            raise SynapseError(403, "Non-application service registration type")
+
+        if not self.auth.has_access_token(request):
+            raise SynapseError(
+                400,
+                "Appservice token must be provided when using a type of m.login.application_service",
+            )
+
+        # XXX we should check that desired_username is valid. Currently
+        # we give appservices carte blanche for any insanity in mxids,
+        # because the IRC bridges rely on being able to register stupid
+        # IDs.
+
+        as_token = self.auth.get_access_token_from_request(request)
+
+        user_id = await self.registration_handler.appservice_register(
+            desired_username, as_token
+        )
+        return 200, {"user_id": user_id}
+
+
 def _calculate_registration_flows(
     config: HomeServerConfig, auth_handler: AuthHandler
 ) -> List[List[str]]:
@@ -955,6 +1023,10 @@ def _calculate_registration_flows(
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+    if hs.config.experimental.msc3861.enabled:
+        RegisterAppServiceOnlyRestServlet(hs).register(http_server)
+        return
+
     if hs.config.worker.worker_app is None:
         EmailRegisterRequestTokenRestServlet(hs).register(http_server)
         MsisdnRegisterRequestTokenRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py
index 32df054f56..547bf34df1 100644
--- a/synapse/rest/client/versions.py
+++ b/synapse/rest/client/versions.py
@@ -113,8 +113,8 @@ class VersionsRestServlet(RestServlet):
                     "fi.mau.msc2815": self.config.experimental.msc2815_enabled,
                     # Adds a ping endpoint for appservices to check HS->AS connection
                     "fi.mau.msc2659.stable": True,  # TODO: remove when "v1.7" is added above
-                    # Adds support for login token requests as per MSC3882
-                    "org.matrix.msc3882": self.config.experimental.msc3882_enabled,
+                    # TODO: this is no longer needed once unstable MSC3882 does not need to be supported:
+                    "org.matrix.msc3882": self.config.auth.login_via_existing_enabled,
                     # Adds support for remotely enabling/disabling pushers, as per MSC3881
                     "org.matrix.msc3881": self.config.experimental.msc3881_enabled,
                     # Adds support for filtering /messages by event relation.
diff --git a/synapse/rest/synapse/client/__init__.py b/synapse/rest/synapse/client/__init__.py
index e55924f597..57335fb913 100644
--- a/synapse/rest/synapse/client/__init__.py
+++ b/synapse/rest/synapse/client/__init__.py
@@ -46,6 +46,12 @@ def build_synapse_client_resource_tree(hs: "HomeServer") -> Mapping[str, Resourc
         "/_synapse/client/unsubscribe": UnsubscribeResource(hs),
     }
 
+    # Expose the JWKS endpoint if OAuth2 delegation is enabled
+    if hs.config.experimental.msc3861.enabled:
+        from synapse.rest.synapse.client.jwks import JwksResource
+
+        resources["/_synapse/jwks"] = JwksResource(hs)
+
     # provider-specific SSO bits. Only load these if they are enabled, since they
     # rely on optional dependencies.
     if hs.config.oidc.oidc_enabled:
diff --git a/synapse/rest/synapse/client/jwks.py b/synapse/rest/synapse/client/jwks.py
new file mode 100644
index 0000000000..7c0a1223fb
--- /dev/null
+++ b/synapse/rest/synapse/client/jwks.py
@@ -0,0 +1,70 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING, Tuple
+
+from synapse.http.server import DirectServeJsonResource
+from synapse.http.site import SynapseRequest
+from synapse.types import JsonDict
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class JwksResource(DirectServeJsonResource):
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(extract_context=True)
+
+        # Parameters that are allowed to be exposed in the public key.
+        # This is done manually, because authlib's private to public key conversion
+        # is unreliable depending on the version. Instead, we just serialize the private
+        # key and only keep the public parameters.
+        # List from https://www.iana.org/assignments/jose/jose.xhtml#web-key-parameters
+        public_parameters = {
+            "kty",
+            "use",
+            "key_ops",
+            "alg",
+            "kid",
+            "x5u",
+            "x5c",
+            "x5t",
+            "x5t#S256",
+            "crv",
+            "x",
+            "y",
+            "n",
+            "e",
+            "ext",
+        }
+
+        key = hs.config.experimental.msc3861.jwk
+
+        if key is not None:
+            private_key = key.as_dict()
+            public_key = {
+                k: v for k, v in private_key.items() if k in public_parameters
+            }
+            keys = [public_key]
+        else:
+            keys = []
+
+        self.res = {
+            "keys": keys,
+        }
+
+    async def _async_render_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        return 200, self.res
diff --git a/synapse/rest/well_known.py b/synapse/rest/well_known.py
index e2174fdfea..b8b4b5379b 100644
--- a/synapse/rest/well_known.py
+++ b/synapse/rest/well_known.py
@@ -44,6 +44,16 @@ class WellKnownBuilder:
                 "base_url": self._config.registration.default_identity_server
             }
 
+        # We use the MSC3861 values as they are used by multiple MSCs
+        if self._config.experimental.msc3861.enabled:
+            result["org.matrix.msc2965.authentication"] = {
+                "issuer": self._config.experimental.msc3861.issuer
+            }
+            if self._config.experimental.msc3861.account_management_url is not None:
+                result["org.matrix.msc2965.authentication"][
+                    "account"
+                ] = self._config.experimental.msc3861.account_management_url
+
         if self._config.server.extra_well_known_client_content:
             for (
                 key,
diff --git a/synapse/server.py b/synapse/server.py
index cce5fb66ff..0f36ef69cb 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -31,6 +31,7 @@ from twisted.web.iweb import IPolicyForHTTPS
 from twisted.web.resource import Resource
 
 from synapse.api.auth import Auth
+from synapse.api.auth.internal import InternalAuth
 from synapse.api.auth_blocking import AuthBlocking
 from synapse.api.filtering import Filtering
 from synapse.api.ratelimiting import Ratelimiter, RequestRatelimiter
@@ -427,7 +428,11 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_auth(self) -> Auth:
-        return Auth(self)
+        if self.config.experimental.msc3861.enabled:
+            from synapse.api.auth.msc3861_delegated import MSC3861DelegatedAuth
+
+            return MSC3861DelegatedAuth(self)
+        return InternalAuth(self)
 
     @cache_in_self
     def get_auth_blocking(self) -> AuthBlocking:
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 7089b0a1d8..233df7cce2 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -485,7 +485,7 @@ class StateStorageController:
         if not event:
             return None
 
-        return event.content.get("canonical_alias")
+        return event.content.get("alias")
 
     @trace
     @tag_args
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index a67fdb3c22..f677d048aa 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1941,6 +1941,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             user_id,
             stream_ids[-1],
         )
+        txn.call_after(
+            self._get_e2e_device_keys_for_federation_query_inner.invalidate,
+            (user_id,),
+        )
 
         min_stream_id = stream_ids[0]
 
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 4bc391f213..91ae9c457d 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -16,6 +16,7 @@
 import abc
 from typing import (
     TYPE_CHECKING,
+    Any,
     Collection,
     Dict,
     Iterable,
@@ -39,6 +40,7 @@ from synapse.appservice import (
     TransactionUnusedFallbackKeys,
 )
 from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.replication.tcp.streams._base import DeviceListsStream
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import (
     DatabasePool,
@@ -104,6 +106,23 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
             self.hs.config.federation.allow_device_name_lookup_over_federation
         )
 
+    def process_replication_rows(
+        self,
+        stream_name: str,
+        instance_name: str,
+        token: int,
+        rows: Iterable[Any],
+    ) -> None:
+        if stream_name == DeviceListsStream.NAME:
+            for row in rows:
+                assert isinstance(row, DeviceListsStream.DeviceListsStreamRow)
+                if row.entity.startswith("@"):
+                    self._get_e2e_device_keys_for_federation_query_inner.invalidate(
+                        (row.entity,)
+                    )
+
+        super().process_replication_rows(stream_name, instance_name, token, rows)
+
     async def get_e2e_device_keys_for_federation_query(
         self, user_id: str
     ) -> Tuple[int, List[JsonDict]]:
@@ -114,6 +133,50 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
         """
         now_stream_id = self.get_device_stream_token()
 
+        # We need to be careful with the caching here, as we need to always
+        # return *all* persisted devices, however there may be a lag between a
+        # new device being persisted and the cache being invalidated.
+        cached_results = (
+            self._get_e2e_device_keys_for_federation_query_inner.cache.get_immediate(
+                user_id, None
+            )
+        )
+        if cached_results is not None:
+            # Check that there have been no new devices added by another worker
+            # after the cache. This should be quick as there should be few rows
+            # with a higher stream ordering.
+            #
+            # Note that we invalidate based on the device stream, so we only
+            # have to check for potential invalidations after the
+            # `now_stream_id`.
+            sql = """
+                SELECT user_id FROM device_lists_stream
+                WHERE stream_id >= ? AND user_id = ?
+            """
+            rows = await self.db_pool.execute(
+                "get_e2e_device_keys_for_federation_query_check",
+                None,
+                sql,
+                now_stream_id,
+                user_id,
+            )
+            if not rows:
+                # No new rows, so cache is still valid.
+                return now_stream_id, cached_results
+
+            # There has, so let's invalidate the cache and run the query.
+            self._get_e2e_device_keys_for_federation_query_inner.invalidate((user_id,))
+
+        results = await self._get_e2e_device_keys_for_federation_query_inner(user_id)
+
+        return now_stream_id, results
+
+    @cached(iterable=True)
+    async def _get_e2e_device_keys_for_federation_query_inner(
+        self, user_id: str
+    ) -> List[JsonDict]:
+        """Get all devices (with any device keys) for a user"""
+
         devices = await self.get_e2e_device_keys_and_signatures([(user_id, None)])
 
         if devices:
@@ -134,9 +197,9 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
 
                 results.append(result)
 
-            return now_stream_id, results
+            return results
 
-        return now_stream_id, []
+        return []
 
     @trace
     @cancellable
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 42baf8ac6b..dfc95e8ebb 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -131,6 +131,7 @@ class Requester:
     user: "UserID"
     access_token_id: Optional[int]
     is_guest: bool
+    scope: Set[str]
     shadow_banned: bool
     device_id: Optional[str]
     app_service: Optional["ApplicationService"]
@@ -147,6 +148,7 @@ class Requester:
             "user_id": self.user.to_string(),
             "access_token_id": self.access_token_id,
             "is_guest": self.is_guest,
+            "scope": list(self.scope),
             "shadow_banned": self.shadow_banned,
             "device_id": self.device_id,
             "app_server_id": self.app_service.id if self.app_service else None,
@@ -175,6 +177,7 @@ class Requester:
             user=UserID.from_string(input["user_id"]),
             access_token_id=input["access_token_id"],
             is_guest=input["is_guest"],
+            scope=set(input["scope"]),
             shadow_banned=input["shadow_banned"],
             device_id=input["device_id"],
             app_service=appservice,
@@ -186,6 +189,7 @@ def create_requester(
     user_id: Union[str, "UserID"],
     access_token_id: Optional[int] = None,
     is_guest: bool = False,
+    scope: StrCollection = (),
     shadow_banned: bool = False,
     device_id: Optional[str] = None,
     app_service: Optional["ApplicationService"] = None,
@@ -199,6 +203,7 @@ def create_requester(
         access_token_id:  *ID* of the access token used for this
             request, or None if it came via the appservice API or similar
         is_guest:  True if the user making this request is a guest user
+        scope:  the scope of the access token used for this request, if any
         shadow_banned:  True if the user making this request is shadow-banned.
         device_id:  device_id which was set at authentication time
         app_service:  the AS requesting on behalf of the user
@@ -215,10 +220,13 @@ def create_requester(
     if authenticated_entity is None:
         authenticated_entity = user_id.to_string()
 
+    scope = set(scope)
+
     return Requester(
         user_id,
         access_token_id,
         is_guest,
+        scope,
         shadow_banned,
         device_id,
         app_service,
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 9ddd26ccaa..7ea0c4c36b 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -76,7 +76,7 @@ def unwrapFirstError(failure: Failure) -> Failure:
     # the subFailure's value, which will do a better job of preserving stacktraces.
     # (actually, you probably want to use yieldable_gather_results anyway)
     failure.trap(defer.FirstError)
-    return failure.value.subFailure  # type: ignore[union-attr]  # Issue in Twisted's annotations
+    return failure.value.subFailure
 
 
 P = ParamSpec("P")
@@ -178,7 +178,7 @@ def log_failure(
     """
 
     logger.error(
-        msg, exc_info=(failure.type, failure.value, failure.getTracebackObject())  # type: ignore[arg-type]
+        msg, exc_info=(failure.type, failure.value, failure.getTracebackObject())
     )
 
     if not consumeErrors:
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 01e3cd46f6..4041e49e71 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -138,7 +138,7 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
             for observer in observers:
                 # This is a little bit of magic to correctly propagate stack
                 # traces when we `await` on one of the observer deferreds.
-                f.value.__failure__ = f  # type: ignore[union-attr]
+                f.value.__failure__ = f
                 try:
                     observer.errback(f)
                 except Exception as e:
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 452d5d04c1..ed0da17227 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -93,10 +93,8 @@ VT = TypeVar("VT")
 # a general type var, distinct from either KT or VT
 T = TypeVar("T")
 
-P = TypeVar("P")
 
-
-class _TimedListNode(ListNode[P]):
+class _TimedListNode(ListNode[T]):
     """A `ListNode` that tracks last access time."""
 
     __slots__ = ["last_access_ts_secs"]
@@ -821,7 +819,7 @@ class AsyncLruCache(Generic[KT, VT]):
     utilize external cache systems that require await behaviour to be created.
     """
 
-    def __init__(self, *args, **kwargs):  # type: ignore
+    def __init__(self, *args: Any, **kwargs: Any):
         self._lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)
 
     async def get(