summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/16634.misc1
-rw-r--r--docs/admin_api/user_admin_api.md37
-rw-r--r--synapse/handlers/e2e_keys.py20
-rw-r--r--synapse/rest/admin/__init__.py2
-rw-r--r--synapse/rest/admin/users.py40
-rw-r--r--synapse/rest/client/keys.py16
-rw-r--r--synapse/storage/databases/main/end_to_end_keys.py84
-rw-r--r--synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql15
-rw-r--r--tests/handlers/test_e2e_keys.py47
-rw-r--r--tests/rest/admin/test_user.py56
-rw-r--r--tests/rest/client/test_keys.py188
-rw-r--r--tests/storage/databases/main/test_end_to_end_keys.py121
12 files changed, 613 insertions, 14 deletions
diff --git a/changelog.d/16634.misc b/changelog.d/16634.misc
new file mode 100644
index 0000000000..f81cf39691
--- /dev/null
+++ b/changelog.d/16634.misc
@@ -0,0 +1 @@
+Add an internal [Admin API endpoint](https://matrix-org.github.io/synapse/v1.97/usage/configuration/config_documentation.html#allow-replacing-master-cross-signing-key-without-user-interactive-auth) to temporarily grant the ability to update an existing cross-signing key without UIA.
diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md
index b91848dd27..66089c634b 100644
--- a/docs/admin_api/user_admin_api.md
+++ b/docs/admin_api/user_admin_api.md
@@ -773,6 +773,43 @@ Note: The token will expire if the *admin* user calls `/logout/all` from any
 of their devices, but the token will *not* expire if the target user does the
 same.
 
+## Allow replacing master cross-signing key without User-Interactive Auth
+
+This endpoint is not intended for server administrator usage;
+we describe it here for completeness.
+
+This API temporarily permits a user to replace their master cross-signing key
+without going through
+[user-interactive authentication](https://spec.matrix.org/v1.8/client-server-api/#user-interactive-authentication-api) (UIA).
+This is useful when Synapse has delegated its authentication to the
+[Matrix Authentication Service](https://github.com/matrix-org/matrix-authentication-service/);
+as Synapse cannot perform UIA is not possible in these circumstances.
+
+The API is
+
+```http request
+POST /_synapse/admin/v1/users/<user_id>/_allow_cross_signing_replacement_without_uia
+{}
+```
+
+If the user does not exist, or does exist but has no master cross-signing key,
+this will return with status code `404 Not Found`.
+
+Otherwise, a response body like the following is returned, with status `200 OK`:
+
+```json
+{
+    "updatable_without_uia_before_ms": 1234567890
+}
+```
+
+The response body is a JSON object with a single field:
+
+- `updatable_without_uia_before_ms`: integer. The timestamp in milliseconds
+  before which the user is permitted to replace their cross-signing key without
+  going through UIA.
+
+_Added in Synapse 1.97.0._
 
 ## User devices
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d06524495f..70fa931d17 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -1450,19 +1450,25 @@ class E2eKeysHandler:
 
         return desired_key_data
 
-    async def is_cross_signing_set_up_for_user(self, user_id: str) -> bool:
+    async def check_cross_signing_setup(self, user_id: str) -> Tuple[bool, bool]:
         """Checks if the user has cross-signing set up
 
         Args:
             user_id: The user to check
 
-        Returns:
-            True if the user has cross-signing set up, False otherwise
+        Returns: a 2-tuple of booleans
+            - whether the user has cross-signing set up, and
+            - whether the user's master cross-signing key may be replaced without UIA.
         """
-        existing_master_key = await self.store.get_e2e_cross_signing_key(
-            user_id, "master"
-        )
-        return existing_master_key is not None
+        (
+            exists,
+            ts_replacable_without_uia_before,
+        ) = await self.store.get_master_cross_signing_key_updatable_before(user_id)
+
+        if ts_replacable_without_uia_before is None:
+            return exists, False
+        else:
+            return exists, self.clock.time_msec() < ts_replacable_without_uia_before
 
 
 def _check_cross_signing_key(
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 9bd0d764f8..91edfd45d7 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -88,6 +88,7 @@ from synapse.rest.admin.users import (
     UserByThreePid,
     UserMembershipRestServlet,
     UserRegisterServlet,
+    UserReplaceMasterCrossSigningKeyRestServlet,
     UserRestServletV2,
     UsersRestServletV2,
     UserTokenRestServlet,
@@ -292,6 +293,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ListDestinationsRestServlet(hs).register(http_server)
     RoomMessagesRestServlet(hs).register(http_server)
     RoomTimestampToEventRestServlet(hs).register(http_server)
+    UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
     UserByExternalId(hs).register(http_server)
     UserByThreePid(hs).register(http_server)
 
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 73878dd99d..9900498fbe 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -1270,6 +1270,46 @@ class AccountDataRestServlet(RestServlet):
         }
 
 
+class UserReplaceMasterCrossSigningKeyRestServlet(RestServlet):
+    """Allow a given user to replace their master cross-signing key without UIA.
+
+    This replacement is permitted for a limited period (currently 10 minutes).
+
+    While this is exposed via the admin API, this is intended for use by the
+    Matrix Authentication Service rather than server admins.
+    """
+
+    PATTERNS = admin_patterns(
+        "/users/(?P<user_id>[^/]*)/_allow_cross_signing_replacement_without_uia"
+    )
+    REPLACEMENT_PERIOD_MS = 10 * 60 * 1000  # 10 minutes
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+
+    async def on_POST(
+        self,
+        request: SynapseRequest,
+        user_id: str,
+    ) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self._auth, request)
+
+        if user_id is None:
+            raise NotFoundError("User not found")
+
+        timestamp = (
+            await self._store.allow_master_cross_signing_key_replacement_without_uia(
+                user_id, self.REPLACEMENT_PERIOD_MS
+            )
+        )
+
+        if timestamp is None:
+            raise NotFoundError("User has no master cross-signing key")
+
+        return HTTPStatus.OK, {"updatable_without_uia_before_ms": timestamp}
+
+
 class UserByExternalId(RestServlet):
     """Find a user based on an external ID from an auth provider"""
 
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index 70b8be1aa2..add8045439 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -376,9 +376,10 @@ class SigningKeyUploadServlet(RestServlet):
         user_id = requester.user.to_string()
         body = parse_json_object_from_request(request)
 
-        is_cross_signing_setup = (
-            await self.e2e_keys_handler.is_cross_signing_set_up_for_user(user_id)
-        )
+        (
+            is_cross_signing_setup,
+            master_key_updatable_without_uia,
+        ) = await self.e2e_keys_handler.check_cross_signing_setup(user_id)
 
         # Before MSC3967 we required UIA both when setting up cross signing for the
         # first time and when resetting the device signing key. With MSC3967 we only
@@ -386,9 +387,14 @@ class SigningKeyUploadServlet(RestServlet):
         # time. Because there is no UIA in MSC3861, for now we throw an error if the
         # user tries to reset the device signing key when MSC3861 is enabled, but allow
         # first-time setup.
+        #
+        # XXX: We now have a get-out clause by which MAS can temporarily mark the master
+        # key as replaceable. It should do its own equivalent of user interactive auth
+        # before doing so.
         if self.hs.config.experimental.msc3861.enabled:
-            # There is no way to reset the device signing key with MSC3861
-            if is_cross_signing_setup:
+            # The auth service has to explicitly mark the master key as replaceable
+            # without UIA to reset the device signing key with MSC3861.
+            if is_cross_signing_setup and not master_key_updatable_without_uia:
                 raise SynapseError(
                     HTTPStatus.NOT_IMPLEMENTED,
                     "Resetting cross signing keys is not yet supported with MSC3861",
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 8cb61eaee3..9e98729330 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
 
         return otk_rows
 
+    async def get_master_cross_signing_key_updatable_before(
+        self, user_id: str
+    ) -> Tuple[bool, Optional[int]]:
+        """Get time before which a master cross-signing key may be replaced without UIA.
+
+        (UIA means "User-Interactive Auth".)
+
+        There are three cases to distinguish:
+         (1) No master cross-signing key.
+         (2) The key exists, but there is no replace-without-UI timestamp in the DB.
+         (3) The key exists, and has such a timestamp recorded.
+
+        Returns: a 2-tuple of:
+          - a boolean: is there a master cross-signing key already?
+          - an optional timestamp, directly taken from the DB.
+
+        In terms of the cases above, these are:
+         (1) (False, None).
+         (2) (True, None).
+         (3) (True, <timestamp in ms>).
+
+        """
+
+        def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
+            # We want to distinguish between three cases:
+            txn.execute(
+                """
+                SELECT updatable_without_uia_before_ms
+                FROM e2e_cross_signing_keys
+                WHERE user_id = ? AND keytype = 'master'
+                ORDER BY stream_id DESC
+                LIMIT 1
+            """,
+                (user_id,),
+            )
+            row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
+            if row is None:
+                return False, None
+            return True, row[0]
+
+        return await self.db_pool.runInteraction(
+            "e2e_cross_signing_keys",
+            impl,
+        )
+
 
 class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def __init__(
@@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             ],
             desc="add_e2e_signing_key",
         )
+
+    async def allow_master_cross_signing_key_replacement_without_uia(
+        self, user_id: str, duration_ms: int
+    ) -> Optional[int]:
+        """Mark this user's latest master key as being replaceable without UIA.
+
+        Said replacement will only be permitted for a short time after calling this
+        function. That time period is controlled by the duration argument.
+
+        Returns:
+            None, if there is no such key.
+            Otherwise, the timestamp before which replacement is allowed without UIA.
+        """
+        timestamp = self._clock.time_msec() + duration_ms
+
+        def impl(txn: LoggingTransaction) -> Optional[int]:
+            txn.execute(
+                """
+                UPDATE e2e_cross_signing_keys
+                SET updatable_without_uia_before_ms = ?
+                WHERE stream_id = (
+                    SELECT stream_id
+                    FROM e2e_cross_signing_keys
+                    WHERE user_id = ? AND keytype = 'master'
+                    ORDER BY stream_id DESC
+                    LIMIT 1
+                )
+            """,
+                (timestamp, user_id),
+            )
+            if txn.rowcount == 0:
+                return None
+
+            return timestamp
+
+        return await self.db_pool.runInteraction(
+            "allow_master_cross_signing_key_replacement_without_uia",
+            impl,
+        )
diff --git a/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
new file mode 100644
index 0000000000..b74bdd71fa
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
@@ -0,0 +1,15 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ALTER TABLE e2e_cross_signing_keys ADD COLUMN updatable_without_uia_before_ms bigint DEFAULT NULL;
\ No newline at end of file
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 90b4da9ad5..07eb63f95e 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -1602,3 +1602,50 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
                 }
             },
         )
+
+    def test_check_cross_signing_setup(self) -> None:
+        # First check what happens with no master key.
+        alice = "@alice:test"
+        exists, replaceable_without_uia = self.get_success(
+            self.handler.check_cross_signing_setup(alice)
+        )
+        self.assertIs(exists, False)
+        self.assertIs(replaceable_without_uia, False)
+
+        # Upload a master key but don't specify a replacement timestamp.
+        dummy_key = {"keys": {"a": "b"}}
+        self.get_success(
+            self.store.set_e2e_cross_signing_key("@alice:test", "master", dummy_key)
+        )
+
+        # Should now find the key exists.
+        exists, replaceable_without_uia = self.get_success(
+            self.handler.check_cross_signing_setup(alice)
+        )
+        self.assertIs(exists, True)
+        self.assertIs(replaceable_without_uia, False)
+
+        # Set an expiry timestamp in the future.
+        self.get_success(
+            self.store.allow_master_cross_signing_key_replacement_without_uia(
+                alice,
+                1000,
+            )
+        )
+
+        # Should now be allowed to replace the key without UIA.
+        exists, replaceable_without_uia = self.get_success(
+            self.handler.check_cross_signing_setup(alice)
+        )
+        self.assertIs(exists, True)
+        self.assertIs(replaceable_without_uia, True)
+
+        # Wait 2 seconds, so that the timestamp is in the past.
+        self.reactor.advance(2.0)
+
+        # Should no longer be allowed to replace the key without UIA.
+        exists, replaceable_without_uia = self.get_success(
+            self.handler.check_cross_signing_setup(alice)
+        )
+        self.assertIs(exists, True)
+        self.assertIs(replaceable_without_uia, False)
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 492adb6160..cf71bbb461 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -4854,3 +4854,59 @@ class UsersByThreePidTestCase(unittest.HomeserverTestCase):
             {"user_id": self.other_user},
             channel.json_body,
         )
+
+
+class AllowCrossSigningReplacementTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+    ]
+
+    @staticmethod
+    def url(user: str) -> str:
+        template = (
+            "/_synapse/admin/v1/users/{}/_allow_cross_signing_replacement_without_uia"
+        )
+        return template.format(urllib.parse.quote(user))
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+        self.other_user = self.register_user("user", "pass")
+
+    def test_error_cases(self) -> None:
+        fake_user = "@bums:other"
+        channel = self.make_request(
+            "POST", self.url(fake_user), access_token=self.admin_user_tok
+        )
+        # Fail: user doesn't exist
+        self.assertEqual(404, channel.code, msg=channel.json_body)
+
+        channel = self.make_request(
+            "POST", self.url(self.other_user), access_token=self.admin_user_tok
+        )
+        # Fail: user exists, but has no master cross-signing key
+        self.assertEqual(404, channel.code, msg=channel.json_body)
+
+    def test_success(self) -> None:
+        # Upload a master key.
+        dummy_key = {"keys": {"a": "b"}}
+        self.get_success(
+            self.store.set_e2e_cross_signing_key(self.other_user, "master", dummy_key)
+        )
+
+        channel = self.make_request(
+            "POST", self.url(self.other_user), access_token=self.admin_user_tok
+        )
+        # Success!
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+
+        # Should now find that the key exists.
+        _, timestamp = self.get_success(
+            self.store.get_master_cross_signing_key_updatable_before(self.other_user)
+        )
+        assert timestamp is not None
+        self.assertGreater(timestamp, self.clock.time_msec())
diff --git a/tests/rest/client/test_keys.py b/tests/rest/client/test_keys.py
index 8ee5489057..9f81a695fa 100644
--- a/tests/rest/client/test_keys.py
+++ b/tests/rest/client/test_keys.py
@@ -11,8 +11,9 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License
-
+import urllib.parse
 from http import HTTPStatus
+from unittest.mock import patch
 
 from signedjson.key import (
     encode_verify_key_base64,
@@ -24,12 +25,19 @@ from signedjson.sign import sign_json
 from synapse.api.errors import Codes
 from synapse.rest import admin
 from synapse.rest.client import keys, login
-from synapse.types import JsonDict
+from synapse.types import JsonDict, Requester, create_requester
 
 from tests import unittest
 from tests.http.server._base import make_request_with_cancellation_test
 from tests.unittest import override_config
 
+try:
+    import authlib  # noqa: F401
+
+    HAS_AUTHLIB = True
+except ImportError:
+    HAS_AUTHLIB = False
+
 
 class KeyQueryTestCase(unittest.HomeserverTestCase):
     servlets = [
@@ -259,3 +267,179 @@ class KeyQueryTestCase(unittest.HomeserverTestCase):
             alice_token,
         )
         self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
+
+
+class SigningKeyUploadServletTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        keys.register_servlets,
+    ]
+
+    OIDC_ADMIN_TOKEN = "_oidc_admin_token"
+
+    @unittest.skip_unless(HAS_AUTHLIB, "requires authlib")
+    @override_config(
+        {
+            "enable_registration": False,
+            "experimental_features": {
+                "msc3861": {
+                    "enabled": True,
+                    "issuer": "https://issuer",
+                    "account_management_url": "https://my-account.issuer",
+                    "client_id": "id",
+                    "client_auth_method": "client_secret_post",
+                    "client_secret": "secret",
+                    "admin_token": OIDC_ADMIN_TOKEN,
+                },
+            },
+        }
+    )
+    def test_master_cross_signing_key_replacement_msc3861(self) -> None:
+        # Provision a user like MAS would, cribbing from
+        # https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L224-L229
+        alice = "@alice:test"
+        channel = self.make_request(
+            "PUT",
+            f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}",
+            access_token=self.OIDC_ADMIN_TOKEN,
+            content={},
+        )
+        self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body)
+
+        # Provision a device like MAS would, cribbing from
+        # https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L260-L262
+        alice_device = "alice_device"
+        channel = self.make_request(
+            "POST",
+            f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}/devices",
+            access_token=self.OIDC_ADMIN_TOKEN,
+            content={"device_id": alice_device},
+        )
+        self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body)
+
+        # Prepare a mock MAS access token.
+        alice_token = "alice_token_1234_oidcwhatyoudidthere"
+
+        async def mocked_get_user_by_access_token(
+            token: str, allow_expired: bool = False
+        ) -> Requester:
+            self.assertEqual(token, alice_token)
+            return create_requester(
+                user_id=alice,
+                device_id=alice_device,
+                scope=[],
+                is_guest=False,
+            )
+
+        patch_get_user_by_access_token = patch.object(
+            self.hs.get_auth(),
+            "get_user_by_access_token",
+            wraps=mocked_get_user_by_access_token,
+        )
+
+        # Copied from E2eKeysHandlerTestCase
+        master_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
+        master_pubkey2 = "fHZ3NPiKxoLQm5OoZbKa99SYxprOjNs4TwJUKP+twCM"
+        master_pubkey3 = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
+
+        master_key: JsonDict = {
+            "user_id": alice,
+            "usage": ["master"],
+            "keys": {"ed25519:" + master_pubkey: master_pubkey},
+        }
+        master_key2: JsonDict = {
+            "user_id": alice,
+            "usage": ["master"],
+            "keys": {"ed25519:" + master_pubkey2: master_pubkey2},
+        }
+        master_key3: JsonDict = {
+            "user_id": alice,
+            "usage": ["master"],
+            "keys": {"ed25519:" + master_pubkey3: master_pubkey3},
+        }
+
+        with patch_get_user_by_access_token:
+            # Upload an initial cross-signing key.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key,
+                },
+            )
+            self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
+
+            # Should not be able to upload another master key.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key2,
+                },
+            )
+            self.assertEqual(
+                channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
+            )
+
+        # Pretend that MAS did UIA and allowed us to replace the master key.
+        channel = self.make_request(
+            "POST",
+            f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia",
+            access_token=self.OIDC_ADMIN_TOKEN,
+        )
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+
+        with patch_get_user_by_access_token:
+            # Should now be able to upload master key2.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key2,
+                },
+            )
+            self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
+
+            # Even though we're still in the grace period, we shouldn't be able to
+            # upload master key 3 immediately after uploading key 2.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key3,
+                },
+            )
+            self.assertEqual(
+                channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
+            )
+
+        # Pretend that MAS did UIA and allowed us to replace the master key.
+        channel = self.make_request(
+            "POST",
+            f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia",
+            access_token=self.OIDC_ADMIN_TOKEN,
+        )
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        timestamp_ms = channel.json_body["updatable_without_uia_before_ms"]
+
+        # Advance to 1 second after the replacement period ends.
+        self.reactor.advance(timestamp_ms - self.clock.time_msec() + 1000)
+
+        with patch_get_user_by_access_token:
+            # We should not be able to upload master key3 because the replacement has
+            # expired.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key3,
+                },
+            )
+            self.assertEqual(
+                channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
+            )
diff --git a/tests/storage/databases/main/test_end_to_end_keys.py b/tests/storage/databases/main/test_end_to_end_keys.py
new file mode 100644
index 0000000000..23e6f82c75
--- /dev/null
+++ b/tests/storage/databases/main/test_end_to_end_keys.py
@@ -0,0 +1,121 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import List, Optional, Tuple
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.server import HomeServer
+from synapse.storage._base import db_to_json
+from synapse.storage.database import LoggingTransaction
+from synapse.types import JsonDict
+from synapse.util import Clock
+
+from tests.unittest import HomeserverTestCase
+
+
+class EndToEndKeyWorkerStoreTestCase(HomeserverTestCase):
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+
+    def test_get_master_cross_signing_key_updatable_before(self) -> None:
+        # Should return False, None when there is no master key.
+        alice = "@alice:test"
+        exists, timestamp = self.get_success(
+            self.store.get_master_cross_signing_key_updatable_before(alice)
+        )
+        self.assertIs(exists, False)
+        self.assertIsNone(timestamp)
+
+        # Upload a master key.
+        dummy_key = {"keys": {"a": "b"}}
+        self.get_success(
+            self.store.set_e2e_cross_signing_key(alice, "master", dummy_key)
+        )
+
+        # Should now find that the key exists.
+        exists, timestamp = self.get_success(
+            self.store.get_master_cross_signing_key_updatable_before(alice)
+        )
+        self.assertIs(exists, True)
+        self.assertIsNone(timestamp)
+
+        # Write an updateable_before timestamp.
+        written_timestamp = self.get_success(
+            self.store.allow_master_cross_signing_key_replacement_without_uia(
+                alice, 1000
+            )
+        )
+
+        # Should now find that the key exists.
+        exists, timestamp = self.get_success(
+            self.store.get_master_cross_signing_key_updatable_before(alice)
+        )
+        self.assertIs(exists, True)
+        self.assertEqual(timestamp, written_timestamp)
+
+    def test_master_replacement_only_applies_to_latest_master_key(
+        self,
+    ) -> None:
+        """We shouldn't allow updates w/o UIA to old master keys or other key types."""
+        alice = "@alice:test"
+        # Upload two master keys.
+        key1 = {"keys": {"a": "b"}}
+        key2 = {"keys": {"c": "d"}}
+        key3 = {"keys": {"e": "f"}}
+        self.get_success(self.store.set_e2e_cross_signing_key(alice, "master", key1))
+        self.get_success(self.store.set_e2e_cross_signing_key(alice, "other", key2))
+        self.get_success(self.store.set_e2e_cross_signing_key(alice, "master", key3))
+
+        # Third key should be the current one.
+        key = self.get_success(
+            self.store.get_e2e_cross_signing_key(alice, "master", alice)
+        )
+        self.assertEqual(key, key3)
+
+        timestamp = self.get_success(
+            self.store.allow_master_cross_signing_key_replacement_without_uia(
+                alice, 1000
+            )
+        )
+        assert timestamp is not None
+
+        def check_timestamp_column(
+            txn: LoggingTransaction,
+        ) -> List[Tuple[JsonDict, Optional[int]]]:
+            """Fetch all rows for Alice's keys."""
+            txn.execute(
+                """
+                SELECT keydata, updatable_without_uia_before_ms
+                FROM e2e_cross_signing_keys
+                WHERE user_id = ?
+                ORDER BY stream_id ASC;
+            """,
+                (alice,),
+            )
+            return [(db_to_json(keydata), ts) for keydata, ts in txn.fetchall()]
+
+        values = self.get_success(
+            self.store.db_pool.runInteraction(
+                "check_timestamp_column",
+                check_timestamp_column,
+            )
+        )
+        self.assertEqual(
+            values,
+            [
+                (key1, None),
+                (key2, None),
+                (key3, timestamp),
+            ],
+        )