summary refs log tree commit diff
path: root/tests/rest
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rest')
-rw-r--r--tests/rest/admin/test_user.py56
-rw-r--r--tests/rest/client/test_keys.py188
2 files changed, 242 insertions, 2 deletions
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 492adb6160..cf71bbb461 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -4854,3 +4854,59 @@ class UsersByThreePidTestCase(unittest.HomeserverTestCase):
             {"user_id": self.other_user},
             channel.json_body,
         )
+
+
+class AllowCrossSigningReplacementTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+    ]
+
+    @staticmethod
+    def url(user: str) -> str:
+        template = (
+            "/_synapse/admin/v1/users/{}/_allow_cross_signing_replacement_without_uia"
+        )
+        return template.format(urllib.parse.quote(user))
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+
+        self.admin_user = self.register_user("admin", "pass", admin=True)
+        self.admin_user_tok = self.login("admin", "pass")
+
+        self.other_user = self.register_user("user", "pass")
+
+    def test_error_cases(self) -> None:
+        fake_user = "@bums:other"
+        channel = self.make_request(
+            "POST", self.url(fake_user), access_token=self.admin_user_tok
+        )
+        # Fail: user doesn't exist
+        self.assertEqual(404, channel.code, msg=channel.json_body)
+
+        channel = self.make_request(
+            "POST", self.url(self.other_user), access_token=self.admin_user_tok
+        )
+        # Fail: user exists, but has no master cross-signing key
+        self.assertEqual(404, channel.code, msg=channel.json_body)
+
+    def test_success(self) -> None:
+        # Upload a master key.
+        dummy_key = {"keys": {"a": "b"}}
+        self.get_success(
+            self.store.set_e2e_cross_signing_key(self.other_user, "master", dummy_key)
+        )
+
+        channel = self.make_request(
+            "POST", self.url(self.other_user), access_token=self.admin_user_tok
+        )
+        # Success!
+        self.assertEqual(200, channel.code, msg=channel.json_body)
+
+        # Should now find that the key exists.
+        _, timestamp = self.get_success(
+            self.store.get_master_cross_signing_key_updatable_before(self.other_user)
+        )
+        assert timestamp is not None
+        self.assertGreater(timestamp, self.clock.time_msec())
diff --git a/tests/rest/client/test_keys.py b/tests/rest/client/test_keys.py
index 8ee5489057..9f81a695fa 100644
--- a/tests/rest/client/test_keys.py
+++ b/tests/rest/client/test_keys.py
@@ -11,8 +11,9 @@
 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 #  See the License for the specific language governing permissions and
 #  limitations under the License
-
+import urllib.parse
 from http import HTTPStatus
+from unittest.mock import patch
 
 from signedjson.key import (
     encode_verify_key_base64,
@@ -24,12 +25,19 @@ from signedjson.sign import sign_json
 from synapse.api.errors import Codes
 from synapse.rest import admin
 from synapse.rest.client import keys, login
-from synapse.types import JsonDict
+from synapse.types import JsonDict, Requester, create_requester
 
 from tests import unittest
 from tests.http.server._base import make_request_with_cancellation_test
 from tests.unittest import override_config
 
+try:
+    import authlib  # noqa: F401
+
+    HAS_AUTHLIB = True
+except ImportError:
+    HAS_AUTHLIB = False
+
 
 class KeyQueryTestCase(unittest.HomeserverTestCase):
     servlets = [
@@ -259,3 +267,179 @@ class KeyQueryTestCase(unittest.HomeserverTestCase):
             alice_token,
         )
         self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
+
+
+class SigningKeyUploadServletTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        admin.register_servlets,
+        keys.register_servlets,
+    ]
+
+    OIDC_ADMIN_TOKEN = "_oidc_admin_token"
+
+    @unittest.skip_unless(HAS_AUTHLIB, "requires authlib")
+    @override_config(
+        {
+            "enable_registration": False,
+            "experimental_features": {
+                "msc3861": {
+                    "enabled": True,
+                    "issuer": "https://issuer",
+                    "account_management_url": "https://my-account.issuer",
+                    "client_id": "id",
+                    "client_auth_method": "client_secret_post",
+                    "client_secret": "secret",
+                    "admin_token": OIDC_ADMIN_TOKEN,
+                },
+            },
+        }
+    )
+    def test_master_cross_signing_key_replacement_msc3861(self) -> None:
+        # Provision a user like MAS would, cribbing from
+        # https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L224-L229
+        alice = "@alice:test"
+        channel = self.make_request(
+            "PUT",
+            f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}",
+            access_token=self.OIDC_ADMIN_TOKEN,
+            content={},
+        )
+        self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body)
+
+        # Provision a device like MAS would, cribbing from
+        # https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L260-L262
+        alice_device = "alice_device"
+        channel = self.make_request(
+            "POST",
+            f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}/devices",
+            access_token=self.OIDC_ADMIN_TOKEN,
+            content={"device_id": alice_device},
+        )
+        self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body)
+
+        # Prepare a mock MAS access token.
+        alice_token = "alice_token_1234_oidcwhatyoudidthere"
+
+        async def mocked_get_user_by_access_token(
+            token: str, allow_expired: bool = False
+        ) -> Requester:
+            self.assertEqual(token, alice_token)
+            return create_requester(
+                user_id=alice,
+                device_id=alice_device,
+                scope=[],
+                is_guest=False,
+            )
+
+        patch_get_user_by_access_token = patch.object(
+            self.hs.get_auth(),
+            "get_user_by_access_token",
+            wraps=mocked_get_user_by_access_token,
+        )
+
+        # Copied from E2eKeysHandlerTestCase
+        master_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk"
+        master_pubkey2 = "fHZ3NPiKxoLQm5OoZbKa99SYxprOjNs4TwJUKP+twCM"
+        master_pubkey3 = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY"
+
+        master_key: JsonDict = {
+            "user_id": alice,
+            "usage": ["master"],
+            "keys": {"ed25519:" + master_pubkey: master_pubkey},
+        }
+        master_key2: JsonDict = {
+            "user_id": alice,
+            "usage": ["master"],
+            "keys": {"ed25519:" + master_pubkey2: master_pubkey2},
+        }
+        master_key3: JsonDict = {
+            "user_id": alice,
+            "usage": ["master"],
+            "keys": {"ed25519:" + master_pubkey3: master_pubkey3},
+        }
+
+        with patch_get_user_by_access_token:
+            # Upload an initial cross-signing key.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key,
+                },
+            )
+            self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
+
+            # Should not be able to upload another master key.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key2,
+                },
+            )
+            self.assertEqual(
+                channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
+            )
+
+        # Pretend that MAS did UIA and allowed us to replace the master key.
+        channel = self.make_request(
+            "POST",
+            f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia",
+            access_token=self.OIDC_ADMIN_TOKEN,
+        )
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+
+        with patch_get_user_by_access_token:
+            # Should now be able to upload master key2.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key2,
+                },
+            )
+            self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body)
+
+            # Even though we're still in the grace period, we shouldn't be able to
+            # upload master key 3 immediately after uploading key 2.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key3,
+                },
+            )
+            self.assertEqual(
+                channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
+            )
+
+        # Pretend that MAS did UIA and allowed us to replace the master key.
+        channel = self.make_request(
+            "POST",
+            f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia",
+            access_token=self.OIDC_ADMIN_TOKEN,
+        )
+        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
+        timestamp_ms = channel.json_body["updatable_without_uia_before_ms"]
+
+        # Advance to 1 second after the replacement period ends.
+        self.reactor.advance(timestamp_ms - self.clock.time_msec() + 1000)
+
+        with patch_get_user_by_access_token:
+            # We should not be able to upload master key3 because the replacement has
+            # expired.
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key3,
+                },
+            )
+            self.assertEqual(
+                channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
+            )