From f2f2c7c1f05de87f43cc2d18d5dc9bd636b3ed0a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 15 Nov 2023 08:02:11 -0500 Subject: Use full GitHub links instead of bare issue numbers. (#16637) --- synapse/handlers/federation_event.py | 2 +- synapse/handlers/presence.py | 2 +- synapse/handlers/sync.py | 4 ++-- synapse/handlers/user_directory.py | 8 ++++---- 4 files changed, 8 insertions(+), 8 deletions(-) (limited to 'synapse/handlers') diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 0cc8e990d9..ba6b94a8b7 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -748,7 +748,7 @@ class FederationEventHandler: # fetching fresh state for the room if the missing event # can't be found, which slightly reduces our security. # it may also increase our DAG extremity count for the room, - # causing additional state resolution? See #1760. + # causing additional state resolution? See https://github.com/matrix-org/synapse/issues/1760. # However, fetching state doesn't hold the linearizer lock # apparently. # diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 202beee738..4137fd50b1 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1816,7 +1816,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): # the same token repeatedly. # # Hence this guard where we just return nothing so that the sync - # doesn't return. C.f. #5503. + # doesn't return. C.f. https://github.com/matrix-org/synapse/issues/5503. return [], max_token # Figure out which other users this user should explicitly receive diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 2f1bc5a015..bf0106c6e7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -399,7 +399,7 @@ class SyncHandler: # # If that happens, we mustn't cache it, so that when the client comes back # with the same cache token, we don't immediately return the same empty - # result, causing a tightloop. (#8518) + # result, causing a tightloop. (https://github.com/matrix-org/synapse/issues/8518) if result.next_batch == since_token: cache_context.should_cache = False @@ -1003,7 +1003,7 @@ class SyncHandler: # always make sure we LL ourselves so we know we're in the room # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209 # We only need apply this on full state syncs given we disabled - # LL for incr syncs in #3840. + # LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840. # We don't insert ourselves into `members_to_fetch`, because in some # rare cases (an empty event batch with a now_token after the user's # leave in a partial state room which another local user has diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 75717ba4f9..3c19ea56f8 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -184,8 +184,8 @@ class UserDirectoryHandler(StateDeltasHandler): """Called to update index of our local user profiles when they change irrespective of any rooms the user may be in. """ - # FIXME(#3714): We should probably do this in the same worker as all - # the other changes. + # FIXME(https://github.com/matrix-org/synapse/issues/3714): We should + # probably do this in the same worker as all the other changes. 
        if await self.store.should_include_local_user_in_dir(user_id):
            await self.store.update_profile_in_user_dir(
@@ -194,8 +194,8 @@ class UserDirectoryHandler(StateDeltasHandler):
 
     async def handle_local_user_deactivated(self, user_id: str) -> None:
         """Called when a user ID is deactivated"""
-        # FIXME(#3714): We should probably do this in the same worker as all
-        # the other changes.
+        # FIXME(https://github.com/matrix-org/synapse/issues/3714): We should
+        # probably do this in the same worker as all the other changes.
         await self.store.remove_from_user_dir(user_id)
 
     async def _unsafe_process(self) -> None:
--
cgit 1.5.1


From 43d1aa75e8cbf9d522b425d51d5ac1a742b59ffb Mon Sep 17 00:00:00 2001
From: David Robertson
Date: Wed, 15 Nov 2023 17:28:10 +0000
Subject: Add an Admin API to temporarily grant the ability to update an
 existing cross-signing key without UIA (#16634)

---
 changelog.d/16634.misc                             |   1 +
 docs/admin_api/user_admin_api.md                   |  37 ++++
 synapse/handlers/e2e_keys.py                       |  20 ++-
 synapse/rest/admin/__init__.py                     |   2 +
 synapse/rest/admin/users.py                        |  40 +++++
 synapse/rest/client/keys.py                        |  16 +-
 synapse/storage/databases/main/end_to_end_keys.py  |  84 +++++++++
 .../delta/83/05_cross_signing_key_update_grant.sql |  15 ++
 tests/handlers/test_e2e_keys.py                    |  47 ++++++
 tests/rest/admin/test_user.py                      |  56 ++++++
 tests/rest/client/test_keys.py                     | 188 ++++++++++++++++++++-
 .../storage/databases/main/test_end_to_end_keys.py | 121 +++++++++++++
 12 files changed, 613 insertions(+), 14 deletions(-)
 create mode 100644 changelog.d/16634.misc
 create mode 100644 synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
 create mode 100644 tests/storage/databases/main/test_end_to_end_keys.py
(limited to 'synapse/handlers')

diff --git a/changelog.d/16634.misc b/changelog.d/16634.misc
new file mode 100644
index 0000000000..f81cf39691
--- /dev/null
+++ b/changelog.d/16634.misc
@@ -0,0 +1 @@
+Add an internal [Admin API endpoint](https://matrix-org.github.io/synapse/v1.97/usage/configuration/config_documentation.html#allow-replacing-master-cross-signing-key-without-user-interactive-auth) to temporarily grant the ability to update an existing cross-signing key without UIA.
diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md
index b91848dd27..66089c634b 100644
--- a/docs/admin_api/user_admin_api.md
+++ b/docs/admin_api/user_admin_api.md
@@ -773,6 +773,43 @@ Note: The token will expire if the *admin* user calls `/logout/all` from any
 of their devices, but the token will *not* expire if the target user does the
 same.
 
+## Allow replacing master cross-signing key without User-Interactive Auth
+
+This endpoint is not intended for server administrator usage;
+we describe it here for completeness.
+
+This API temporarily permits a user to replace their master cross-signing key
+without going through
+[user-interactive authentication](https://spec.matrix.org/v1.8/client-server-api/#user-interactive-authentication-api) (UIA).
+This is useful when Synapse has delegated its authentication to the
+[Matrix Authentication Service](https://github.com/matrix-org/matrix-authentication-service/),
+as Synapse cannot perform UIA in these circumstances.
+
+The API is
+
+```http request
+POST /_synapse/admin/v1/users/<user_id>/_allow_cross_signing_replacement_without_uia
+{}
+```
+
+If the user does not exist, or does exist but has no master cross-signing key,
+this will return with status code `404 Not Found`.
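For illustration, a minimal sketch of driving this endpoint from Python. The homeserver
URL and admin access token below are hypothetical placeholders, not values taken from
this patch:

```python
# Sketch: call the grant endpoint the way an admin-API client (such as MAS)
# might. BASE_URL and ADMIN_TOKEN are invented placeholders.
import json
import urllib.error
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8008"  # hypothetical homeserver
ADMIN_TOKEN = "syt_admin_token"  # hypothetical admin access token


def allow_replacement_without_uia(user_id: str) -> dict:
    """POST to the endpoint; returns the parsed 200 response body."""
    url = (
        f"{BASE_URL}/_synapse/admin/v1/users/"
        f"{urllib.parse.quote(user_id)}/_allow_cross_signing_replacement_without_uia"
    )
    req = urllib.request.Request(
        url,
        data=b"{}",
        method="POST",
        headers={
            "Authorization": f"Bearer {ADMIN_TOKEN}",
            "Content-Type": "application/json",
        },
    )
    try:
        with urllib.request.urlopen(req) as resp:
            # On success the body carries "updatable_without_uia_before_ms".
            return json.load(resp)
    except urllib.error.HTTPError as e:
        if e.code == 404:
            raise LookupError("unknown user or no master cross-signing key") from e
        raise
```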
+
+Otherwise, a response body like the following is returned, with status `200 OK`:
+
+```json
+{
+    "updatable_without_uia_before_ms": 1234567890
+}
+```
+
+The response body is a JSON object with a single field:
+
+- `updatable_without_uia_before_ms`: integer. The timestamp in milliseconds
+  before which the user is permitted to replace their cross-signing key without
+  going through UIA.
+
+_Added in Synapse 1.97.0._
 
 ## User devices
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d06524495f..70fa931d17 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -1450,19 +1450,25 @@ class E2eKeysHandler:
 
         return desired_key_data
 
-    async def is_cross_signing_set_up_for_user(self, user_id: str) -> bool:
+    async def check_cross_signing_setup(self, user_id: str) -> Tuple[bool, bool]:
         """Checks if the user has cross-signing set up
 
         Args:
             user_id: The user to check
 
-        Returns:
-            True if the user has cross-signing set up, False otherwise
+        Returns: a 2-tuple of booleans
+          - whether the user has cross-signing set up, and
+          - whether the user's master cross-signing key may be replaced without UIA.
         """
-        existing_master_key = await self.store.get_e2e_cross_signing_key(
-            user_id, "master"
-        )
-        return existing_master_key is not None
+        (
+            exists,
+            ts_replaceable_without_uia_before,
+        ) = await self.store.get_master_cross_signing_key_updatable_before(user_id)
+
+        if ts_replaceable_without_uia_before is None:
+            return exists, False
+        else:
+            return exists, self.clock.time_msec() < ts_replaceable_without_uia_before
 
     def _check_cross_signing_key(
 
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 9bd0d764f8..91edfd45d7 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -88,6 +88,7 @@ from synapse.rest.admin.users import (
     UserByThreePid,
     UserMembershipRestServlet,
     UserRegisterServlet,
+    UserReplaceMasterCrossSigningKeyRestServlet,
     UserRestServletV2,
     UsersRestServletV2,
     UserTokenRestServlet,
@@ -292,6 +293,7 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     ListDestinationsRestServlet(hs).register(http_server)
     RoomMessagesRestServlet(hs).register(http_server)
     RoomTimestampToEventRestServlet(hs).register(http_server)
+    UserReplaceMasterCrossSigningKeyRestServlet(hs).register(http_server)
     UserByExternalId(hs).register(http_server)
     UserByThreePid(hs).register(http_server)
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 73878dd99d..9900498fbe 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -1270,6 +1270,46 @@ class AccountDataRestServlet(RestServlet):
         }
 
 
+class UserReplaceMasterCrossSigningKeyRestServlet(RestServlet):
+    """Allow a given user to replace their master cross-signing key without UIA.
+
+    This replacement is permitted for a limited period (currently 10 minutes).
+
+    While this is exposed via the admin API, this is intended for use by the
+    Matrix Authentication Service rather than server admins.
+    """
+
+    PATTERNS = admin_patterns(
+        "/users/(?P<user_id>[^/]*)/_allow_cross_signing_replacement_without_uia"
+    )
+    REPLACEMENT_PERIOD_MS = 10 * 60 * 1000  # 10 minutes
+
+    def __init__(self, hs: "HomeServer"):
+        self._auth = hs.get_auth()
+        self._store = hs.get_datastores().main
+
+    async def on_POST(
+        self,
+        request: SynapseRequest,
+        user_id: str,
+    ) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self._auth, request)
+
+        if user_id is None:
+            raise NotFoundError("User not found")
+
+        timestamp = (
+            await self._store.allow_master_cross_signing_key_replacement_without_uia(
+                user_id, self.REPLACEMENT_PERIOD_MS
+            )
+        )
+
+        if timestamp is None:
+            raise NotFoundError("User has no master cross-signing key")
+
+        return HTTPStatus.OK, {"updatable_without_uia_before_ms": timestamp}
+
+
 class UserByExternalId(RestServlet):
     """Find a user based on an external ID from an auth provider"""
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index 70b8be1aa2..add8045439 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -376,9 +376,10 @@ class SigningKeyUploadServlet(RestServlet):
         user_id = requester.user.to_string()
         body = parse_json_object_from_request(request)
 
-        is_cross_signing_setup = (
-            await self.e2e_keys_handler.is_cross_signing_set_up_for_user(user_id)
-        )
+        (
+            is_cross_signing_setup,
+            master_key_updatable_without_uia,
+        ) = await self.e2e_keys_handler.check_cross_signing_setup(user_id)
 
         # Before MSC3967 we required UIA both when setting up cross signing for the
         # first time and when resetting the device signing key. With MSC3967 we only
         # time. Because there is no UIA in MSC3861, for now we throw an error if the
         # user tries to reset the device signing key when MSC3861 is enabled, but allow
         # first-time setup.
+        #
+        # XXX: We now have a get-out clause by which MAS can temporarily mark the master
+        # key as replaceable. It should do its own equivalent of user interactive auth
+        # before doing so.
         if self.hs.config.experimental.msc3861.enabled:
-            # There is no way to reset the device signing key with MSC3861
-            if is_cross_signing_setup:
+            # The auth service has to explicitly mark the master key as replaceable
+            # without UIA to reset the device signing key with MSC3861.
+            if is_cross_signing_setup and not master_key_updatable_without_uia:
                 raise SynapseError(
                     HTTPStatus.NOT_IMPLEMENTED,
                     "Resetting cross signing keys is not yet supported with MSC3861",
diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py
index 8cb61eaee3..9e98729330 100644
--- a/synapse/storage/databases/main/end_to_end_keys.py
+++ b/synapse/storage/databases/main/end_to_end_keys.py
@@ -1383,6 +1383,51 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker
 
         return otk_rows
 
+    async def get_master_cross_signing_key_updatable_before(
+        self, user_id: str
+    ) -> Tuple[bool, Optional[int]]:
+        """Get time before which a master cross-signing key may be replaced without UIA.
+
+        (UIA means "User-Interactive Auth".)
+
+        There are three cases to distinguish:
+        (1) No master cross-signing key.
+        (2) The key exists, but there is no replace-without-UIA timestamp in the DB.
+        (3) The key exists, and has such a timestamp recorded.
+
+        Returns: a 2-tuple of:
+        - a boolean: is there a master cross-signing key already?
+        - an optional timestamp, directly taken from the DB.
+
+        In terms of the cases above, these are:
+        (1) (False, None).
+        (2) (True, None).
+        (3) (True, <timestamp>).
+
+        """
+
+        def impl(txn: LoggingTransaction) -> Tuple[bool, Optional[int]]:
+            # We want to distinguish between three cases:
+            txn.execute(
+                """
+                SELECT updatable_without_uia_before_ms
+                FROM e2e_cross_signing_keys
+                WHERE user_id = ? AND keytype = 'master'
+                ORDER BY stream_id DESC
+                LIMIT 1
+                """,
+                (user_id,),
+            )
+            row = cast(Optional[Tuple[Optional[int]]], txn.fetchone())
+            if row is None:
+                return False, None
+            return True, row[0]
+
+        return await self.db_pool.runInteraction(
+            "e2e_cross_signing_keys",
+            impl,
+        )
 
 class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
     def __init__(
@@ -1630,3 +1675,42 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
             ],
             desc="add_e2e_signing_key",
         )
+
+    async def allow_master_cross_signing_key_replacement_without_uia(
+        self, user_id: str, duration_ms: int
+    ) -> Optional[int]:
+        """Mark this user's latest master key as being replaceable without UIA.
+
+        Said replacement will only be permitted for a short time after calling this
+        function. That time period is controlled by the duration argument.
+
+        Returns:
+            None, if there is no such key.
+            Otherwise, the timestamp before which replacement is allowed without UIA.
+        """
+        timestamp = self._clock.time_msec() + duration_ms
+
+        def impl(txn: LoggingTransaction) -> Optional[int]:
+            txn.execute(
+                """
+                UPDATE e2e_cross_signing_keys
+                SET updatable_without_uia_before_ms = ?
+                WHERE stream_id = (
+                    SELECT stream_id
+                    FROM e2e_cross_signing_keys
+                    WHERE user_id = ? AND keytype = 'master'
+                    ORDER BY stream_id DESC
+                    LIMIT 1
+                )
+                """,
+                (timestamp, user_id),
+            )
+            if txn.rowcount == 0:
+                return None
+
+            return timestamp
+
+        return await self.db_pool.runInteraction(
+            "allow_master_cross_signing_key_replacement_without_uia",
+            impl,
+        )
diff --git a/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
new file mode 100644
index 0000000000..b74bdd71fa
--- /dev/null
+++ b/synapse/storage/schema/main/delta/83/05_cross_signing_key_update_grant.sql
@@ -0,0 +1,15 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ALTER TABLE e2e_cross_signing_keys ADD COLUMN updatable_without_uia_before_ms bigint DEFAULT NULL;
\ No newline at end of file
diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py
index 90b4da9ad5..07eb63f95e 100644
--- a/tests/handlers/test_e2e_keys.py
+++ b/tests/handlers/test_e2e_keys.py
@@ -1602,3 +1602,50 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase):
             }
         },
     )
+
+    def test_check_cross_signing_setup(self) -> None:
+        # First check what happens with no master key.
+        alice = "@alice:test"
+        exists, replaceable_without_uia = self.get_success(
+            self.handler.check_cross_signing_setup(alice)
+        )
+        self.assertIs(exists, False)
+        self.assertIs(replaceable_without_uia, False)
+
+        # Upload a master key but don't specify a replacement timestamp.
+ dummy_key = {"keys": {"a": "b"}} + self.get_success( + self.store.set_e2e_cross_signing_key("@alice:test", "master", dummy_key) + ) + + # Should now find the key exists. + exists, replaceable_without_uia = self.get_success( + self.handler.check_cross_signing_setup(alice) + ) + self.assertIs(exists, True) + self.assertIs(replaceable_without_uia, False) + + # Set an expiry timestamp in the future. + self.get_success( + self.store.allow_master_cross_signing_key_replacement_without_uia( + alice, + 1000, + ) + ) + + # Should now be allowed to replace the key without UIA. + exists, replaceable_without_uia = self.get_success( + self.handler.check_cross_signing_setup(alice) + ) + self.assertIs(exists, True) + self.assertIs(replaceable_without_uia, True) + + # Wait 2 seconds, so that the timestamp is in the past. + self.reactor.advance(2.0) + + # Should no longer be allowed to replace the key without UIA. + exists, replaceable_without_uia = self.get_success( + self.handler.check_cross_signing_setup(alice) + ) + self.assertIs(exists, True) + self.assertIs(replaceable_without_uia, False) diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 492adb6160..cf71bbb461 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -4854,3 +4854,59 @@ class UsersByThreePidTestCase(unittest.HomeserverTestCase): {"user_id": self.other_user}, channel.json_body, ) + + +class AllowCrossSigningReplacementTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + ] + + @staticmethod + def url(user: str) -> str: + template = ( + "/_synapse/admin/v1/users/{}/_allow_cross_signing_replacement_without_uia" + ) + return template.format(urllib.parse.quote(user)) + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + self.other_user = self.register_user("user", "pass") + + def test_error_cases(self) -> None: + fake_user = "@bums:other" + channel = self.make_request( + "POST", self.url(fake_user), access_token=self.admin_user_tok + ) + # Fail: user doesn't exist + self.assertEqual(404, channel.code, msg=channel.json_body) + + channel = self.make_request( + "POST", self.url(self.other_user), access_token=self.admin_user_tok + ) + # Fail: user exists, but has no master cross-signing key + self.assertEqual(404, channel.code, msg=channel.json_body) + + def test_success(self) -> None: + # Upload a master key. + dummy_key = {"keys": {"a": "b"}} + self.get_success( + self.store.set_e2e_cross_signing_key(self.other_user, "master", dummy_key) + ) + + channel = self.make_request( + "POST", self.url(self.other_user), access_token=self.admin_user_tok + ) + # Success! + self.assertEqual(200, channel.code, msg=channel.json_body) + + # Should now find that the key exists. + _, timestamp = self.get_success( + self.store.get_master_cross_signing_key_updatable_before(self.other_user) + ) + assert timestamp is not None + self.assertGreater(timestamp, self.clock.time_msec()) diff --git a/tests/rest/client/test_keys.py b/tests/rest/client/test_keys.py index 8ee5489057..9f81a695fa 100644 --- a/tests/rest/client/test_keys.py +++ b/tests/rest/client/test_keys.py @@ -11,8 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and # limitations under the License - +import urllib.parse from http import HTTPStatus +from unittest.mock import patch from signedjson.key import ( encode_verify_key_base64, @@ -24,12 +25,19 @@ from signedjson.sign import sign_json from synapse.api.errors import Codes from synapse.rest import admin from synapse.rest.client import keys, login -from synapse.types import JsonDict +from synapse.types import JsonDict, Requester, create_requester from tests import unittest from tests.http.server._base import make_request_with_cancellation_test from tests.unittest import override_config +try: + import authlib # noqa: F401 + + HAS_AUTHLIB = True +except ImportError: + HAS_AUTHLIB = False + class KeyQueryTestCase(unittest.HomeserverTestCase): servlets = [ @@ -259,3 +267,179 @@ class KeyQueryTestCase(unittest.HomeserverTestCase): alice_token, ) self.assertEqual(channel.code, HTTPStatus.OK, channel.result) + + +class SigningKeyUploadServletTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + keys.register_servlets, + ] + + OIDC_ADMIN_TOKEN = "_oidc_admin_token" + + @unittest.skip_unless(HAS_AUTHLIB, "requires authlib") + @override_config( + { + "enable_registration": False, + "experimental_features": { + "msc3861": { + "enabled": True, + "issuer": "https://issuer", + "account_management_url": "https://my-account.issuer", + "client_id": "id", + "client_auth_method": "client_secret_post", + "client_secret": "secret", + "admin_token": OIDC_ADMIN_TOKEN, + }, + }, + } + ) + def test_master_cross_signing_key_replacement_msc3861(self) -> None: + # Provision a user like MAS would, cribbing from + # https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L224-L229 + alice = "@alice:test" + channel = self.make_request( + "PUT", + f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}", + access_token=self.OIDC_ADMIN_TOKEN, + content={}, + ) + self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body) + + # Provision a device like MAS would, cribbing from + # https://github.com/matrix-org/matrix-authentication-service/blob/08d46a79a4adb22819ac9d55e15f8375dfe2c5c7/crates/matrix-synapse/src/lib.rs#L260-L262 + alice_device = "alice_device" + channel = self.make_request( + "POST", + f"/_synapse/admin/v2/users/{urllib.parse.quote(alice)}/devices", + access_token=self.OIDC_ADMIN_TOKEN, + content={"device_id": alice_device}, + ) + self.assertEqual(channel.code, HTTPStatus.CREATED, channel.json_body) + + # Prepare a mock MAS access token. 
+ alice_token = "alice_token_1234_oidcwhatyoudidthere" + + async def mocked_get_user_by_access_token( + token: str, allow_expired: bool = False + ) -> Requester: + self.assertEqual(token, alice_token) + return create_requester( + user_id=alice, + device_id=alice_device, + scope=[], + is_guest=False, + ) + + patch_get_user_by_access_token = patch.object( + self.hs.get_auth(), + "get_user_by_access_token", + wraps=mocked_get_user_by_access_token, + ) + + # Copied from E2eKeysHandlerTestCase + master_pubkey = "nqOvzeuGWT/sRx3h7+MHoInYj3Uk2LD/unI9kDYcHwk" + master_pubkey2 = "fHZ3NPiKxoLQm5OoZbKa99SYxprOjNs4TwJUKP+twCM" + master_pubkey3 = "85T7JXPFBAySB/jwby4S3lBPTqY3+Zg53nYuGmu1ggY" + + master_key: JsonDict = { + "user_id": alice, + "usage": ["master"], + "keys": {"ed25519:" + master_pubkey: master_pubkey}, + } + master_key2: JsonDict = { + "user_id": alice, + "usage": ["master"], + "keys": {"ed25519:" + master_pubkey2: master_pubkey2}, + } + master_key3: JsonDict = { + "user_id": alice, + "usage": ["master"], + "keys": {"ed25519:" + master_pubkey3: master_pubkey3}, + } + + with patch_get_user_by_access_token: + # Upload an initial cross-signing key. + channel = self.make_request( + "POST", + "/_matrix/client/v3/keys/device_signing/upload", + access_token=alice_token, + content={ + "master_key": master_key, + }, + ) + self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + + # Should not be able to upload another master key. + channel = self.make_request( + "POST", + "/_matrix/client/v3/keys/device_signing/upload", + access_token=alice_token, + content={ + "master_key": master_key2, + }, + ) + self.assertEqual( + channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body + ) + + # Pretend that MAS did UIA and allowed us to replace the master key. + channel = self.make_request( + "POST", + f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia", + access_token=self.OIDC_ADMIN_TOKEN, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + + with patch_get_user_by_access_token: + # Should now be able to upload master key2. + channel = self.make_request( + "POST", + "/_matrix/client/v3/keys/device_signing/upload", + access_token=alice_token, + content={ + "master_key": master_key2, + }, + ) + self.assertEqual(channel.code, HTTPStatus.OK, channel.json_body) + + # Even though we're still in the grace period, we shouldn't be able to + # upload master key 3 immediately after uploading key 2. + channel = self.make_request( + "POST", + "/_matrix/client/v3/keys/device_signing/upload", + access_token=alice_token, + content={ + "master_key": master_key3, + }, + ) + self.assertEqual( + channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body + ) + + # Pretend that MAS did UIA and allowed us to replace the master key. + channel = self.make_request( + "POST", + f"/_synapse/admin/v1/users/{urllib.parse.quote(alice)}/_allow_cross_signing_replacement_without_uia", + access_token=self.OIDC_ADMIN_TOKEN, + ) + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + timestamp_ms = channel.json_body["updatable_without_uia_before_ms"] + + # Advance to 1 second after the replacement period ends. + self.reactor.advance(timestamp_ms - self.clock.time_msec() + 1000) + + with patch_get_user_by_access_token: + # We should not be able to upload master key3 because the replacement has + # expired. 
+            channel = self.make_request(
+                "POST",
+                "/_matrix/client/v3/keys/device_signing/upload",
+                access_token=alice_token,
+                content={
+                    "master_key": master_key3,
+                },
+            )
+            self.assertEqual(
+                channel.code, HTTPStatus.NOT_IMPLEMENTED, channel.json_body
+            )
diff --git a/tests/storage/databases/main/test_end_to_end_keys.py b/tests/storage/databases/main/test_end_to_end_keys.py
new file mode 100644
index 0000000000..23e6f82c75
--- /dev/null
+++ b/tests/storage/databases/main/test_end_to_end_keys.py
@@ -0,0 +1,121 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import List, Optional, Tuple
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.server import HomeServer
+from synapse.storage._base import db_to_json
+from synapse.storage.database import LoggingTransaction
+from synapse.types import JsonDict
+from synapse.util import Clock
+
+from tests.unittest import HomeserverTestCase
+
+
+class EndToEndKeyWorkerStoreTestCase(HomeserverTestCase):
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+
+    def test_get_master_cross_signing_key_updatable_before(self) -> None:
+        # Should return False, None when there is no master key.
+        alice = "@alice:test"
+        exists, timestamp = self.get_success(
+            self.store.get_master_cross_signing_key_updatable_before(alice)
+        )
+        self.assertIs(exists, False)
+        self.assertIsNone(timestamp)
+
+        # Upload a master key.
+        dummy_key = {"keys": {"a": "b"}}
+        self.get_success(
+            self.store.set_e2e_cross_signing_key(alice, "master", dummy_key)
+        )
+
+        # Should now find that the key exists.
+        exists, timestamp = self.get_success(
+            self.store.get_master_cross_signing_key_updatable_before(alice)
+        )
+        self.assertIs(exists, True)
+        self.assertIsNone(timestamp)
+
+        # Write an updatable_without_uia_before_ms timestamp.
+        written_timestamp = self.get_success(
+            self.store.allow_master_cross_signing_key_replacement_without_uia(
+                alice, 1000
+            )
+        )
+
+        # Should now find the key exists, with the timestamp we just wrote.
+        exists, timestamp = self.get_success(
+            self.store.get_master_cross_signing_key_updatable_before(alice)
+        )
+        self.assertIs(exists, True)
+        self.assertEqual(timestamp, written_timestamp)
+
+    def test_master_replacement_only_applies_to_latest_master_key(
+        self,
+    ) -> None:
+        """We shouldn't allow updates w/o UIA to old master keys or other key types."""
+        alice = "@alice:test"
+        # Upload two master keys, with a key of a different type in between.
+        key1 = {"keys": {"a": "b"}}
+        key2 = {"keys": {"c": "d"}}
+        key3 = {"keys": {"e": "f"}}
+        self.get_success(self.store.set_e2e_cross_signing_key(alice, "master", key1))
+        self.get_success(self.store.set_e2e_cross_signing_key(alice, "other", key2))
+        self.get_success(self.store.set_e2e_cross_signing_key(alice, "master", key3))
+
+        # Third key should be the current one.
+ key = self.get_success( + self.store.get_e2e_cross_signing_key(alice, "master", alice) + ) + self.assertEqual(key, key3) + + timestamp = self.get_success( + self.store.allow_master_cross_signing_key_replacement_without_uia( + alice, 1000 + ) + ) + assert timestamp is not None + + def check_timestamp_column( + txn: LoggingTransaction, + ) -> List[Tuple[JsonDict, Optional[int]]]: + """Fetch all rows for Alice's keys.""" + txn.execute( + """ + SELECT keydata, updatable_without_uia_before_ms + FROM e2e_cross_signing_keys + WHERE user_id = ? + ORDER BY stream_id ASC; + """, + (alice,), + ) + return [(db_to_json(keydata), ts) for keydata, ts in txn.fetchall()] + + values = self.get_success( + self.store.db_pool.runInteraction( + "check_timestamp_column", + check_timestamp_column, + ) + ) + self.assertEqual( + values, + [ + (key1, None), + (key2, None), + (key3, timestamp), + ], + ) -- cgit 1.5.1 From 1b238e88371516bfedb62d010e156820ab164b94 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Nov 2023 14:25:35 +0000 Subject: Speed up persisting large number of outliers (#16649) Recalculating the roots tuple every iteration could be very expensive, so instead let's do a topological sort. --- changelog.d/16649.misc | 1 + synapse/handlers/federation_event.py | 18 ++++----- synapse/util/iterutils.py | 51 ++++++++++++++++++++++++ tests/util/test_itertools.py | 76 +++++++++++++++++++++++++++++++++++- 4 files changed, 134 insertions(+), 12 deletions(-) create mode 100644 changelog.d/16649.misc (limited to 'synapse/handlers') diff --git a/changelog.d/16649.misc b/changelog.d/16649.misc new file mode 100644 index 0000000000..cebd6aaee5 --- /dev/null +++ b/changelog.d/16649.misc @@ -0,0 +1 @@ +Speed up persisting large number of outliers. diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index ba6b94a8b7..f4c17894aa 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -88,7 +88,7 @@ from synapse.types import ( ) from synapse.types.state import StateFilter from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter, partition +from synapse.util.iterutils import batch_iter, partition, sorted_topologically_batched from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import shortstr @@ -1669,14 +1669,13 @@ class FederationEventHandler: # XXX: it might be possible to kick this process off in parallel with fetching # the events. - while event_map: - # build a list of events whose auth events are not in the queue. - roots = tuple( - ev - for ev in event_map.values() - if not any(aid in event_map for aid in ev.auth_event_ids()) - ) + # We need to persist an event's auth events before the event. + auth_graph = { + ev: [event_map[e_id] for e_id in ev.auth_event_ids() if e_id in event_map] + for ev in event_map.values() + } + for roots in sorted_topologically_batched(event_map.values(), auth_graph): if not roots: # if *none* of the remaining events are ready, that means # we have a loop. 
This either means a bug in our logic, or that
 
             await self._auth_and_persist_outliers_inner(room_id, roots)
 
-            for ev in roots:
-                del event_map[ev.event_id]
-
     async def _auth_and_persist_outliers_inner(
         self, room_id: str, fetched_events: Collection[EventBase]
     ) -> None:
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index a0efb96d3b..f4c0194af0 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -135,3 +135,54 @@ def sorted_topologically(
             degree_map[edge] -= 1
             if degree_map[edge] == 0:
                 heapq.heappush(zero_degree, edge)
+
+
+def sorted_topologically_batched(
+    nodes: Iterable[T],
+    graph: Mapping[T, Collection[T]],
+) -> Generator[Collection[T], None, None]:
+    r"""Walk the graph topologically, returning batches of nodes where all the
+    nodes a given node references have been previously returned.
+
+    For example, given the following graph:
+
+        A
+       / \
+      B   C
+       \ /
+        D
+
+    This function will return: `[[A], [B, C], [D]]`.
+
+    This function is useful for e.g. batch persisting events in an auth chain,
+    where we can only persist an event if all its auth events have already been
+    persisted.
+    """
+
+    degree_map = {node: 0 for node in nodes}
+    reverse_graph: Dict[T, Set[T]] = {}
+
+    for node, edges in graph.items():
+        if node not in degree_map:
+            continue
+
+        for edge in set(edges):
+            if edge in degree_map:
+                degree_map[node] += 1
+
+            reverse_graph.setdefault(edge, set()).add(node)
+        reverse_graph.setdefault(node, set())
+
+    zero_degree = [node for node, degree in degree_map.items() if degree == 0]
+
+    while zero_degree:
+        new_zero_degree = []
+        for node in zero_degree:
+            for edge in reverse_graph.get(node, []):
+                if edge in degree_map:
+                    degree_map[edge] -= 1
+                    if degree_map[edge] == 0:
+                        new_zero_degree.append(edge)
+
+        yield zero_degree
+        zero_degree = new_zero_degree
diff --git a/tests/util/test_itertools.py b/tests/util/test_itertools.py
index 406c16cdcf..fabb05c7e4 100644
--- a/tests/util/test_itertools.py
+++ b/tests/util/test_itertools.py
@@ -13,7 +13,11 @@
 # limitations under the License.
 from typing import Dict, Iterable, List, Sequence
 
-from synapse.util.iterutils import chunk_seq, sorted_topologically
+from synapse.util.iterutils import (
+    chunk_seq,
+    sorted_topologically,
+    sorted_topologically_batched,
+)
 
 from tests.unittest import TestCase
 
@@ -107,3 +111,73 @@ class SortTopologically(TestCase):
         graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
 
         self.assertEqual(list(sorted_topologically([4, 3, 2, 1], graph)), [1, 2, 3, 4])
+
+
+class SortTopologicallyBatched(TestCase):
+    "Test cases for `sorted_topologically_batched`"
+
+    def test_empty(self) -> None:
+        "Test that an empty graph works correctly"
+
+        graph: Dict[int, List[int]] = {}
+        self.assertEqual(list(sorted_topologically_batched([], graph)), [])
+
+    def test_handle_empty_graph(self) -> None:
+        "Test that a graph where a node doesn't have an entry is treated as empty"
+
+        graph: Dict[int, List[int]] = {}
+
+        # For disconnected nodes the output is simply sorted.
+        self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
+
+    def test_disconnected(self) -> None:
+        "Test that a graph with no edges works"
+
+        graph: Dict[int, List[int]] = {1: [], 2: []}
+
+        # For disconnected nodes the output is simply sorted.
+        self.assertEqual(list(sorted_topologically_batched([1, 2], graph)), [[1, 2]])
+
+    def test_linear(self) -> None:
+        "Test that a simple `4 -> 3 -> 2 -> 1` graph works"
+
+        graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
+
+        self.assertEqual(
+            list(sorted_topologically_batched([4, 3, 2, 1], graph)),
+            [[1], [2], [3], [4]],
+        )
+
+    def test_subset(self) -> None:
+        "Test that only sorting a subset of the graph works"
+        graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3]}
+
+        self.assertEqual(list(sorted_topologically_batched([4, 3], graph)), [[3], [4]])
+
+    def test_fork(self) -> None:
+        "Test that a forked graph works"
+        graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [1], 4: [2, 3]}
+
+        # Valid orderings are `[1, 3, 2, 4]` or `[1, 2, 3, 4]`, but we should
+        # always get the same one.
+        self.assertEqual(
+            list(sorted_topologically_batched([4, 3, 2, 1], graph)), [[1], [2, 3], [4]]
+        )
+
+    def test_duplicates(self) -> None:
+        "Test that a graph with duplicate edges works"
+        graph: Dict[int, List[int]] = {1: [], 2: [1, 1], 3: [2, 2], 4: [3]}
+
+        self.assertEqual(
+            list(sorted_topologically_batched([4, 3, 2, 1], graph)),
+            [[1], [2], [3], [4]],
+        )
+
+    def test_multiple_paths(self) -> None:
+        "Test that a graph with multiple paths between two nodes works"
+        graph: Dict[int, List[int]] = {1: [], 2: [1], 3: [2], 4: [3, 2, 1]}
+
+        self.assertEqual(
+            list(sorted_topologically_batched([4, 3, 2, 1], graph)),
+            [[1], [2], [3], [4]],
+        )
--
cgit 1.5.1


From 3e8531d3baf205733693f9ae8b43aa0b4c82b744 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 16 Nov 2023 15:19:35 +0000
Subject: Speed up deleting device messages (#16643)

Keeping track of a lower bound of stream ID where we've deleted everything
below makes the queries much faster. Otherwise, every time we scan for rows
to delete we'd re-scan across all the rows that have previously been deleted
(until the next table VACUUM).

---
 changelog.d/16643.misc                        |   1 +
 synapse/handlers/device.py                    |   8 +-
 synapse/storage/databases/main/deviceinbox.py | 106 ++++++++++++++++++++------
 synapse/util/task_scheduler.py                |   2 +-
 4 files changed, 88 insertions(+), 29 deletions(-)
 create mode 100644 changelog.d/16643.misc
(limited to 'synapse/handlers')

diff --git a/changelog.d/16643.misc b/changelog.d/16643.misc
new file mode 100644
index 0000000000..cc0cf0901f
--- /dev/null
+++ b/changelog.d/16643.misc
@@ -0,0 +1 @@
+Speed up deleting of device messages when deleting a device.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 93472d0117..1af6d77545 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -396,15 +396,17 @@ class DeviceWorkerHandler:
         up_to_stream_id = task.params["up_to_stream_id"]
 
         # Delete the messages in batches to avoid too much DB load.
+        from_stream_id = None
         while True:
-            res = await self.store.delete_messages_for_device(
+            from_stream_id, _ = await self.store.delete_messages_for_device_between(
                 user_id=user_id,
                 device_id=device_id,
-                up_to_stream_id=up_to_stream_id,
+                from_stream_id=from_stream_id,
+                to_stream_id=up_to_stream_id,
                 limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
             )
 
-            if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+            if from_stream_id is None:
                 return TaskStatus.COMPLETE, None, None
 
             await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 3e7425d4a6..02dddd1da4 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         user_id: str,
         device_id: Optional[str],
         up_to_stream_id: int,
-        limit: Optional[int] = None,
     ) -> int:
         """
         Args:
             user_id: The recipient user_id.
             device_id: The recipient device_id.
             up_to_stream_id: Where to delete messages up to.
-            limit: maximum number of messages to delete
 
         Returns:
             The number of messages deleted.
@@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
             log_kv({"message": "No changes in cache since last check"})
             return 0
 
-        def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
-            limit_statement = "" if limit is None else f"LIMIT {limit}"
-            sql = f"""
-            DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
-                SELECT MAX(stream_id) FROM (
-                    SELECT stream_id FROM device_inbox
-                    WHERE user_id = ? AND device_id = ? AND stream_id <= ?
-                    ORDER BY stream_id
-                    {limit_statement}
-                ) AS q1
-            )
-            """
-            txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
-            return txn.rowcount
-
-        count = await self.db_pool.runInteraction(
-            "delete_messages_for_device", delete_messages_for_device_txn
-        )
+        from_stream_id = None
+        count = 0
+        while True:
+            from_stream_id, loop_count = await self.delete_messages_for_device_between(
+                user_id,
+                device_id,
+                from_stream_id=from_stream_id,
+                to_stream_id=up_to_stream_id,
+                limit=1000,
+            )
+            count += loop_count
+            if from_stream_id is None:
+                break
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
 
-        # In this case we don't know if we hit the limit or the delete is complete
-        # so let's not update the cache.
-        if count == limit:
-            return count
-
         # Update the cache, ensuring that we only ever increase the value
         updated_last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0
@@ -514,6 +502,74 @@
 
         return count
 
+    @trace
+    async def delete_messages_for_device_between(
+        self,
+        user_id: str,
+        device_id: Optional[str],
+        from_stream_id: Optional[int],
+        to_stream_id: int,
+        limit: int,
+    ) -> Tuple[Optional[int], int]:
+        """Delete N device messages between the stream IDs, returning the
+        highest stream ID deleted (or None if all messages in the range have
+        been deleted) and the number of messages deleted.
+
+        This is more efficient than `delete_messages_for_device` when called in
+        a loop to batch delete messages.
+        """
+
+        # Keeping track of a lower bound of stream ID where we've deleted
+        # everything below makes the queries much faster. Otherwise, every time
+        # we scan for rows to delete we'd re-scan across all the rows that have
+        # previously been deleted (until the next table VACUUM).
+ + if from_stream_id is None: + # Minimum device stream ID is 1. + from_stream_id = 0 + + def delete_messages_for_device_between_txn( + txn: LoggingTransaction, + ) -> Tuple[Optional[int], int]: + txn.execute( + """ + SELECT MAX(stream_id) FROM ( + SELECT stream_id FROM device_inbox + WHERE user_id = ? AND device_id = ? + AND ? < stream_id AND stream_id <= ? + ORDER BY stream_id + LIMIT ? + ) AS d + """, + (user_id, device_id, from_stream_id, to_stream_id, limit), + ) + row = txn.fetchone() + if row is None or row[0] is None: + return None, 0 + + (max_stream_id,) = row + + txn.execute( + """ + DELETE FROM device_inbox + WHERE user_id = ? AND device_id = ? + AND ? < stream_id AND stream_id <= ? + """, + (user_id, device_id, from_stream_id, max_stream_id), + ) + + num_deleted = txn.rowcount + if num_deleted < limit: + return None, num_deleted + + return max_stream_id, num_deleted + + return await self.db_pool.runInteraction( + "delete_messages_for_device_between", + delete_messages_for_device_between_txn, + db_autocommit=True, # We don't need to run in a transaction + ) + @trace async def get_new_device_msgs_for_remote( self, destination: str, last_stream_id: int, current_stream_id: int, limit: int diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index caf13b3474..29c561e555 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -193,7 +193,7 @@ class TaskScheduler: result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. This is exposed publically so it can + """Update some task associated values. This is exposed publicly so it can be used inside task functions, mainly to update the result and be able to resume a task at a specific step after a restart of synapse. -- cgit 1.5.1
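To make the batching behaviour of `sorted_topologically_batched` from the
outlier-persistence commit concrete, here is a small usage sketch. The node names and
graph are invented, standing in for events and their auth-event dependencies:

```python
# Sketch: batch-walk a toy auth-event DAG with sorted_topologically_batched.
# The string nodes stand in for events; graph[n] lists the nodes that must be
# returned before n, mirroring how auth events must be persisted first.
from synapse.util.iterutils import sorted_topologically_batched

#      create
#      /    \
#  power    join
#      \    /
#     message
graph = {
    "create": [],
    "power": ["create"],
    "join": ["create"],
    "message": ["power", "join"],
}

# Order within a batch is not guaranteed, so sort each batch for display.
batches = [sorted(batch) for batch in sorted_topologically_batched(graph.keys(), graph)]
print(batches)  # [['create'], ['join', 'power'], ['message']]
```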
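The advancing-lower-bound pattern from the final commit generalises to any batched range
deletion. A self-contained sketch against SQLite follows; the `messages` table and its
`id` column are invented stand-ins for Synapse's `device_inbox`:

```python
# Sketch: batched deletion with an advancing lower bound, as in
# delete_messages_for_device_between. Table and column names are hypothetical.
import sqlite3
from typing import Optional, Tuple


def delete_batch(
    conn: sqlite3.Connection,
    from_id: Optional[int],
    to_id: int,
    limit: int,
) -> Tuple[Optional[int], int]:
    """Delete up to `limit` rows with from_id < id <= to_id.

    Returns (new lower bound, rows deleted); the lower bound is None once
    the range is exhausted, mirroring the Synapse helper above.
    """
    if from_id is None:
        from_id = 0
    row = conn.execute(
        """
        SELECT MAX(id) FROM (
            SELECT id FROM messages
            WHERE ? < id AND id <= ?
            ORDER BY id
            LIMIT ?
        )
        """,
        (from_id, to_id, limit),
    ).fetchone()
    if row is None or row[0] is None:
        return None, 0
    max_id = row[0]
    cur = conn.execute(
        "DELETE FROM messages WHERE ? < id AND id <= ?",
        (from_id, max_id),
    )
    # Fewer than `limit` deletions means the range is drained.
    if cur.rowcount < limit:
        return None, cur.rowcount
    return max_id, cur.rowcount


def purge(conn: sqlite3.Connection, up_to: int, limit: int = 1000) -> int:
    """Driver loop: each pass rescans only rows above the advancing bound."""
    deleted = 0
    from_id: Optional[int] = None
    while True:
        from_id, n = delete_batch(conn, from_id, up_to, limit)
        deleted += n
        if from_id is None:
            return deleted


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (id INTEGER PRIMARY KEY)")
conn.executemany("INSERT INTO messages VALUES (?)", [(i,) for i in range(1, 5001)])
print(purge(conn, up_to=4000))  # deletes ids 1..4000 in batches -> 4000
```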