diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/api/auth.py | 19 | ||||
-rw-r--r-- | synapse/api/constants.py | 2 | ||||
-rw-r--r-- | synapse/api/errors.py | 1 | ||||
-rw-r--r-- | synapse/config/logger.py | 5 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 38 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 11 | ||||
-rw-r--r-- | synapse/handlers/device.py | 17 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 198 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 109 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 7 | ||||
-rw-r--r-- | synapse/replication/slave/storage/devices.py | 3 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/keys.py | 46 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 3 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/__init__.py | 6 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/devices.py | 95 | ||||
-rw-r--r-- | synapse/storage/data_stores/main/end_to_end_keys.py | 172 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/hidden_devices.sql | 18 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/signing_keys.sql | 55 | ||||
-rw-r--r-- | synapse/types.py | 24 |
19 files changed, 631 insertions, 198 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index cb50579fd2..cd347fbe1b 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -84,27 +84,10 @@ class Auth(object): ) auth_events = yield self.store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)} - self.check( + event_auth.check( room_version, event, auth_events=auth_events, do_sig_check=do_sig_check ) - def check(self, room_version, event, auth_events, do_sig_check=True): - """ Checks if this event is correctly authed. - - Args: - room_version (str): version of the room - event: the event being checked. - auth_events (dict: event-key -> event): the existing room state. - - - Returns: - True if the auth checks pass. - """ - with Measure(self.clock, "auth.check"): - event_auth.check( - room_version, event, auth_events, do_sig_check=do_sig_check - ) - @defer.inlineCallbacks def check_joined_room(self, room_id, user_id, current_state=None): """Check if the user is currently joined in the room diff --git a/synapse/api/constants.py b/synapse/api/constants.py index f29bce560c..60e99e4663 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -97,8 +97,6 @@ class EventTypes(object): class RejectedReason(object): AUTH_ERROR = "auth_error" - REPLACED = "replaced" - NOT_ANCESTOR = "not_ancestor" class RoomCreationPreset(object): diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 1bb2e86789..cca92c34ba 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -62,6 +62,7 @@ class Codes(object): INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION" WRONG_ROOM_KEYS_VERSION = "M_WRONG_ROOM_KEYS_VERSION" EXPIRED_ACCOUNT = "ORG_MATRIX_EXPIRED_ACCOUNT" + INVALID_SIGNATURE = "M_INVALID_SIGNATURE" USER_DEACTIVATED = "M_USER_DEACTIVATED" diff --git a/synapse/config/logger.py b/synapse/config/logger.py index d609ec111b..be92e33f93 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -68,9 +68,6 @@ handlers: filters: [context] loggers: - synapse: - level: INFO - synapse.storage.SQL: # beware: increasing this to DEBUG will make synapse log sensitive # information such as access tokens. @@ -79,6 +76,8 @@ loggers: root: level: INFO handlers: [file, console] + +disable_existing_loggers: false """ ) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6ee6216660..5b22a39b7f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -879,44 +879,6 @@ class FederationClient(FederationBase): ) @defer.inlineCallbacks - def query_auth(self, destination, room_id, event_id, local_auth): - """ - Params: - destination (str) - event_it (str) - local_auth (list) - """ - time_now = self._clock.time_msec() - - send_content = {"auth_chain": [e.get_pdu_json(time_now) for e in local_auth]} - - code, content = yield self.transport_layer.send_query_auth( - destination=destination, - room_id=room_id, - event_id=event_id, - content=send_content, - ) - - room_version = yield self.store.get_room_version(room_id) - format_ver = room_version_to_event_format(room_version) - - auth_chain = [event_from_pdu_json(e, format_ver) for e in content["auth_chain"]] - - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True, room_version=room_version - ) - - signed_auth.sort(key=lambda e: e.depth) - - ret = { - "auth_chain": signed_auth, - "rejects": content.get("rejects", []), - "missing": content.get("missing", []), - } - - return ret - - @defer.inlineCallbacks def get_missing_events( self, destination, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 482a101c09..7b18408144 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -383,17 +383,6 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_query_auth(self, destination, room_id, event_id, content): - path = _create_v1_path("/query_auth/%s/%s", room_id, event_id) - - content = yield self.client.post_json( - destination=destination, path=path, data=content - ) - - return content - - @defer.inlineCallbacks - @log_function def query_client_keys(self, destination, query_content, timeout): """Query the device keys for a list of user ids hosted on a remote server. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 71a8f33da3..5f23ee4488 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -439,6 +441,21 @@ class DeviceHandler(DeviceWorkerHandler): log_kv({"message": "sent device update to host", "host": host}) @defer.inlineCallbacks + def notify_user_signature_update(self, from_user_id, user_ids): + """Notify a user that they have made new signatures of other users. + + Args: + from_user_id (str): the user who made the signature + user_ids (list[str]): the users IDs that have new signatures + """ + + position = yield self.store.add_user_signature_change_to_streams( + from_user_id, user_ids + ) + + self.notifier.on_new_event("device_list_key", position, users=[from_user_id]) + + @defer.inlineCallbacks def on_federation_query_user_devices(self, user_id): stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) return {"user_id": user_id, "stream_id": stream_id, "devices": devices} diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 0a84d0e2b0..be675197f1 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2018-2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,13 +20,18 @@ import logging from six import iteritems from canonicaljson import encode_canonical_json, json +from signedjson.sign import SignatureVerifyException, verify_signed_json from twisted.internet import defer -from synapse.api.errors import CodeMessageException, SynapseError +from synapse.api.errors import CodeMessageException, Codes, SynapseError from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace -from synapse.types import UserID, get_domain_from_id +from synapse.types import ( + UserID, + get_domain_from_id, + get_verify_key_from_cross_signing_key, +) from synapse.util import unwrapFirstError from synapse.util.retryutils import NotRetryingDestination @@ -49,7 +55,7 @@ class E2eKeysHandler(object): @trace @defer.inlineCallbacks - def query_devices(self, query_body, timeout): + def query_devices(self, query_body, timeout, from_user_id): """ Handle a device key query from a client { @@ -67,6 +73,11 @@ class E2eKeysHandler(object): } } } + + Args: + from_user_id (str): the user making the query. This is used when + adding cross-signing signatures to limit what signatures users + can see. """ device_keys_query = query_body.get("device_keys", {}) @@ -125,6 +136,11 @@ class E2eKeysHandler(object): r = remote_queries_not_in_cache.setdefault(domain, {}) r[user_id] = remote_queries[user_id] + # Get cached cross-signing keys + cross_signing_keys = yield self.get_cross_signing_keys_from_cache( + device_keys_query, from_user_id + ) + # Now fetch any devices that we don't have in our cache @trace @defer.inlineCallbacks @@ -188,6 +204,14 @@ class E2eKeysHandler(object): if user_id in destination_query: results[user_id] = keys + for user_id, key in remote_result["master_keys"].items(): + if user_id in destination_query: + cross_signing_keys["master_keys"][user_id] = key + + for user_id, key in remote_result["self_signing_keys"].items(): + if user_id in destination_query: + cross_signing_keys["self_signing_keys"][user_id] = key + except Exception as e: failure = _exception_to_failure(e) failures[destination] = failure @@ -204,7 +228,61 @@ class E2eKeysHandler(object): ).addErrback(unwrapFirstError) ) - return {"device_keys": results, "failures": failures} + ret = {"device_keys": results, "failures": failures} + + ret.update(cross_signing_keys) + + return ret + + @defer.inlineCallbacks + def get_cross_signing_keys_from_cache(self, query, from_user_id): + """Get cross-signing keys for users from the database + + Args: + query (Iterable[string]) an iterable of user IDs. A dict whose keys + are user IDs satisfies this, so the query format used for + query_devices can be used here. + from_user_id (str): the user making the query. This is used when + adding cross-signing signatures to limit what signatures users + can see. + + Returns: + defer.Deferred[dict[str, dict[str, dict]]]: map from + (master|self_signing|user_signing) -> user_id -> key + """ + master_keys = {} + self_signing_keys = {} + user_signing_keys = {} + + for user_id in query: + # XXX: consider changing the store functions to allow querying + # multiple users simultaneously. + key = yield self.store.get_e2e_cross_signing_key( + user_id, "master", from_user_id + ) + if key: + master_keys[user_id] = key + + key = yield self.store.get_e2e_cross_signing_key( + user_id, "self_signing", from_user_id + ) + if key: + self_signing_keys[user_id] = key + + # users can see other users' master and self-signing keys, but can + # only see their own user-signing keys + if from_user_id == user_id: + key = yield self.store.get_e2e_cross_signing_key( + user_id, "user_signing", from_user_id + ) + if key: + user_signing_keys[user_id] = key + + return { + "master_keys": master_keys, + "self_signing_keys": self_signing_keys, + "user_signing_keys": user_signing_keys, + } @trace @defer.inlineCallbacks @@ -441,6 +519,116 @@ class E2eKeysHandler(object): log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys}) yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys) + @defer.inlineCallbacks + def upload_signing_keys_for_user(self, user_id, keys): + """Upload signing keys for cross-signing + + Args: + user_id (string): the user uploading the keys + keys (dict[string, dict]): the signing keys + """ + + # if a master key is uploaded, then check it. Otherwise, load the + # stored master key, to check signatures on other keys + if "master_key" in keys: + master_key = keys["master_key"] + + _check_cross_signing_key(master_key, user_id, "master") + else: + master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master") + + # if there is no master key, then we can't do anything, because all the + # other cross-signing keys need to be signed by the master key + if not master_key: + raise SynapseError(400, "No master key available", Codes.MISSING_PARAM) + + try: + master_key_id, master_verify_key = get_verify_key_from_cross_signing_key( + master_key + ) + except ValueError: + if "master_key" in keys: + # the invalid key came from the request + raise SynapseError(400, "Invalid master key", Codes.INVALID_PARAM) + else: + # the invalid key came from the database + logger.error("Invalid master key found for user %s", user_id) + raise SynapseError(500, "Invalid master key") + + # for the other cross-signing keys, make sure that they have valid + # signatures from the master key + if "self_signing_key" in keys: + self_signing_key = keys["self_signing_key"] + + _check_cross_signing_key( + self_signing_key, user_id, "self_signing", master_verify_key + ) + + if "user_signing_key" in keys: + user_signing_key = keys["user_signing_key"] + + _check_cross_signing_key( + user_signing_key, user_id, "user_signing", master_verify_key + ) + + # if everything checks out, then store the keys and send notifications + deviceids = [] + if "master_key" in keys: + yield self.store.set_e2e_cross_signing_key(user_id, "master", master_key) + deviceids.append(master_verify_key.version) + if "self_signing_key" in keys: + yield self.store.set_e2e_cross_signing_key( + user_id, "self_signing", self_signing_key + ) + try: + deviceids.append( + get_verify_key_from_cross_signing_key(self_signing_key)[1].version + ) + except ValueError: + raise SynapseError(400, "Invalid self-signing key", Codes.INVALID_PARAM) + if "user_signing_key" in keys: + yield self.store.set_e2e_cross_signing_key( + user_id, "user_signing", user_signing_key + ) + # the signature stream matches the semantics that we want for + # user-signing key updates: only the user themselves is notified of + # their own user-signing key updates + yield self.device_handler.notify_user_signature_update(user_id, [user_id]) + + # master key and self-signing key updates match the semantics of device + # list updates: all users who share an encrypted room are notified + if len(deviceids): + yield self.device_handler.notify_device_update(user_id, deviceids) + + return {} + + +def _check_cross_signing_key(key, user_id, key_type, signing_key=None): + """Check a cross-signing key uploaded by a user. Performs some basic sanity + checking, and ensures that it is signed, if a signature is required. + + Args: + key (dict): the key data to verify + user_id (str): the user whose key is being checked + key_type (str): the type of key that the key should be + signing_key (VerifyKey): (optional) the signing key that the key should + be signed with. If omitted, signatures will not be checked. + """ + if ( + key.get("user_id") != user_id + or key_type not in key.get("usage", []) + or len(key.get("keys", {})) != 1 + ): + raise SynapseError(400, ("Invalid %s key" % (key_type,)), Codes.INVALID_PARAM) + + if signing_key: + try: + verify_signed_json(key, user_id, signing_key) + except SignatureVerifyException: + raise SynapseError( + 400, ("Invalid signature on %s key" % key_type), Codes.INVALID_SIGNATURE + ) + def _exception_to_failure(e): if isinstance(e, CodeMessageException): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 50fc0fde2a..4b4c6c15f9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -30,6 +30,7 @@ from unpaddedbase64 import decode_base64 from twisted.internet import defer +from synapse import event_auth from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.api.errors import ( AuthError, @@ -1763,7 +1764,7 @@ class FederationHandler(BaseHandler): auth_for_e[(EventTypes.Create, "")] = create_event try: - self.auth.check(room_version, e, auth_events=auth_for_e) + event_auth.check(room_version, e, auth_events=auth_for_e) except SynapseError as err: # we may get SynapseErrors here as well as AuthErrors. For # instance, there are a couple of (ancient) events in some @@ -1919,7 +1920,7 @@ class FederationHandler(BaseHandler): } try: - self.auth.check(room_version, event, auth_events=current_auth_events) + event_auth.check(room_version, event, auth_events=current_auth_events) except AuthError as e: logger.warn("Soft-failing %r because %s", event, e) event.internal_metadata.soft_failed = True @@ -2018,7 +2019,7 @@ class FederationHandler(BaseHandler): ) try: - self.auth.check(room_version, event, auth_events=auth_events) + event_auth.check(room_version, event, auth_events=auth_events) except AuthError as e: logger.warn("Failed auth resolution for %r because %s", event, e) raise e @@ -2181,103 +2182,10 @@ class FederationHandler(BaseHandler): auth_events.update(new_state) - different_auth = event_auth_events.difference( - e.event_id for e in auth_events.values() - ) - yield self._update_context_for_auth_events( event, context, auth_events, event_key ) - if not different_auth: - # we're done - return - - logger.info( - "auth_events still refers to events which are not in the calculated auth " - "chain after state resolution: %s", - different_auth, - ) - - # Only do auth resolution if we have something new to say. - # We can't prove an auth failure. - do_resolution = False - - for e_id in different_auth: - if e_id in have_events: - if have_events[e_id] == RejectedReason.NOT_ANCESTOR: - do_resolution = True - break - - if not do_resolution: - logger.info( - "Skipping auth resolution due to lack of provable rejection reasons" - ) - return - - logger.info("Doing auth resolution") - - prev_state_ids = yield context.get_prev_state_ids(self.store) - - # 1. Get what we think is the auth chain. - auth_ids = yield self.auth.compute_auth_events(event, prev_state_ids) - local_auth_chain = yield self.store.get_auth_chain(auth_ids, include_given=True) - - try: - # 2. Get remote difference. - try: - result = yield self.federation_client.query_auth( - origin, event.room_id, event.event_id, local_auth_chain - ) - except RequestSendFailed as e: - # The other side isn't around or doesn't implement the - # endpoint, so lets just bail out. - logger.info("Failed to query auth from remote: %s", e) - return - - seen_remotes = yield self.store.have_seen_events( - [e.event_id for e in result["auth_chain"]] - ) - - # 3. Process any remote auth chain events we haven't seen. - for ev in result["auth_chain"]: - if ev.event_id in seen_remotes: - continue - - if ev.event_id == event.event_id: - continue - - try: - auth_ids = ev.auth_event_ids() - auth = { - (e.type, e.state_key): e - for e in result["auth_chain"] - if e.event_id in auth_ids or event.type == EventTypes.Create - } - ev.internal_metadata.outlier = True - - logger.debug( - "do_auth %s different_auth: %s", event.event_id, e.event_id - ) - - yield self._handle_new_event(origin, ev, auth_events=auth) - - if ev.event_id in event_auth_events: - auth_events[(ev.type, ev.state_key)] = ev - except AuthError: - pass - - except Exception: - # FIXME: - logger.exception("Failed to query auth chain") - - # 4. Look at rejects and their proofs. - # TODO. - - yield self._update_context_for_auth_events( - event, context, auth_events, event_key - ) - @defer.inlineCallbacks def _update_context_for_auth_events(self, event, context, auth_events, event_key): """Update the state_ids in an event context after auth event resolution, @@ -2444,15 +2352,6 @@ class FederationHandler(BaseHandler): reason_map[e.event_id] = reason - if reason == RejectedReason.AUTH_ERROR: - pass - elif reason == RejectedReason.REPLACED: - # TODO: Get proof - pass - elif reason == RejectedReason.NOT_ANCESTOR: - # TODO: Get proof. - pass - logger.debug("construct_auth_difference returning") return { diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 19bca6717f..d99160e9d7 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2018, 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -1124,6 +1124,11 @@ class SyncHandler(object): # weren't in the previous sync *or* they left and rejoined. users_that_have_changed.update(newly_joined_or_invited_users) + user_signatures_changed = yield self.store.get_users_whose_signatures_changed( + user_id, since_token.device_list_key + ) + users_that_have_changed.update(user_signatures_changed) + # Now find users that we no longer track for room_id in newly_left_rooms: left_users = yield self.state.get_current_users_in_room(room_id) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index f856c72d84..61557665a7 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -33,6 +33,9 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", device_list_max ) + self._user_signature_stream_cache = StreamChangeCache( + "UserSignatureStreamChangeCache", device_list_max + ) self._device_list_federation_stream_cache = StreamChangeCache( "DeviceListFederationStreamChangeCache", device_list_max ) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 2e680134a0..151a70d449 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -27,7 +28,7 @@ from synapse.http.servlet import ( from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.types import StreamToken -from ._base import client_patterns +from ._base import client_patterns, interactive_auth_handler logger = logging.getLogger(__name__) @@ -155,10 +156,11 @@ class KeyQueryServlet(RestServlet): @defer.inlineCallbacks def on_POST(self, request): - yield self.auth.get_user_by_req(request, allow_guest=True) + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + user_id = requester.user.to_string() timeout = parse_integer(request, "timeout", 10 * 1000) body = parse_json_object_from_request(request) - result = yield self.e2e_keys_handler.query_devices(body, timeout) + result = yield self.e2e_keys_handler.query_devices(body, timeout, user_id) return 200, result @@ -238,8 +240,46 @@ class OneTimeKeyServlet(RestServlet): return 200, result +class SigningKeyUploadServlet(RestServlet): + """ + POST /keys/device_signing/upload HTTP/1.1 + Content-Type: application/json + + { + } + """ + + PATTERNS = client_patterns("/keys/device_signing/upload$", releases=()) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(SigningKeyUploadServlet, self).__init__() + self.hs = hs + self.auth = hs.get_auth() + self.e2e_keys_handler = hs.get_e2e_keys_handler() + self.auth_handler = hs.get_auth_handler() + + @interactive_auth_handler + @defer.inlineCallbacks + def on_POST(self, request): + requester = yield self.auth.get_user_by_req(request) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) + + yield self.auth_handler.validate_user_via_ui_auth( + requester, body, self.hs.get_ip_from_request(request) + ) + + result = yield self.e2e_keys_handler.upload_signing_keys_for_user(user_id, body) + return (200, result) + + def register_servlets(hs, http_server): KeyUploadServlet(hs).register(http_server) KeyQueryServlet(hs).register(http_server) KeyChangesServlet(hs).register(http_server) OneTimeKeyServlet(hs).register(http_server) + SigningKeyUploadServlet(hs).register(http_server) diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index e42fba45a1..2f29c4a112 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2018,2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. + from synapse.storage.data_stores.main import DataStore # noqa: F401 diff --git a/synapse/storage/data_stores/main/__init__.py b/synapse/storage/data_stores/main/__init__.py index d29135588f..b185ba0b3e 100644 --- a/synapse/storage/data_stores/main/__init__.py +++ b/synapse/storage/data_stores/main/__init__.py @@ -141,6 +141,9 @@ class DataStore( self._device_list_id_gen = StreamIdGenerator( db_conn, "device_lists_stream", "stream_id" ) + self._cross_signing_id_gen = StreamIdGenerator( + db_conn, "e2e_cross_signing_keys", "stream_id" + ) self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id") self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id") @@ -212,6 +215,9 @@ class DataStore( self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", device_list_max ) + self._user_signature_stream_cache = StreamChangeCache( + "UserSignatureStreamChangeCache", device_list_max + ) self._device_list_federation_stream_cache = StreamChangeCache( "DeviceListFederationStreamChangeCache", device_list_max ) diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py index ac5239e50a..f7a3542348 100644 --- a/synapse/storage/data_stores/main/devices.py +++ b/synapse/storage/data_stores/main/devices.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -20,7 +22,7 @@ from canonicaljson import json from twisted.internet import defer -from synapse.api.errors import StoreError +from synapse.api.errors import Codes, StoreError from synapse.logging.opentracing import ( get_active_span_text_map, set_tag, @@ -47,7 +49,8 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( class DeviceWorkerStore(SQLBaseStore): def get_device(self, user_id, device_id): - """Retrieve a device. + """Retrieve a device. Only returns devices that are not marked as + hidden. Args: user_id (str): The ID of the user which owns the device @@ -59,14 +62,15 @@ class DeviceWorkerStore(SQLBaseStore): """ return self._simple_select_one( table="devices", - keyvalues={"user_id": user_id, "device_id": device_id}, + keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, retcols=("user_id", "device_id", "display_name"), desc="get_device", ) @defer.inlineCallbacks def get_devices_by_user(self, user_id): - """Retrieve all of a user's registered devices. + """Retrieve all of a user's registered devices. Only returns devices + that are not marked as hidden. Args: user_id (str): @@ -77,7 +81,7 @@ class DeviceWorkerStore(SQLBaseStore): """ devices = yield self._simple_select_list( table="devices", - keyvalues={"user_id": user_id}, + keyvalues={"user_id": user_id, "hidden": False}, retcols=("user_id", "device_id", "display_name"), desc="get_devices_by_user", ) @@ -324,6 +328,41 @@ class DeviceWorkerStore(SQLBaseStore): """ txn.execute(sql, (destination, stream_id)) + @defer.inlineCallbacks + def add_user_signature_change_to_streams(self, from_user_id, user_ids): + """Persist that a user has made new signatures + + Args: + from_user_id (str): the user who made the signatures + user_ids (list[str]): the users who were signed + """ + + with self._device_list_id_gen.get_next() as stream_id: + yield self.runInteraction( + "add_user_sig_change_to_streams", + self._add_user_signature_change_txn, + from_user_id, + user_ids, + stream_id, + ) + return stream_id + + def _add_user_signature_change_txn(self, txn, from_user_id, user_ids, stream_id): + txn.call_after( + self._user_signature_stream_cache.entity_has_changed, + from_user_id, + stream_id, + ) + self._simple_insert_txn( + txn, + "user_signature_stream", + values={ + "stream_id": stream_id, + "from_user_id": from_user_id, + "user_ids": json.dumps(user_ids), + }, + ) + def get_device_stream_token(self): return self._device_list_id_gen.get_current_token() @@ -469,6 +508,28 @@ class DeviceWorkerStore(SQLBaseStore): "get_users_whose_devices_changed", _get_users_whose_devices_changed_txn ) + @defer.inlineCallbacks + def get_users_whose_signatures_changed(self, user_id, from_key): + """Get the users who have new cross-signing signatures made by `user_id` since + `from_key`. + + Args: + user_id (str): the user who made the signatures + from_key (str): The device lists stream token + """ + from_key = int(from_key) + if self._user_signature_stream_cache.has_entity_changed(user_id, from_key): + sql = """ + SELECT DISTINCT user_ids FROM user_signature_stream + WHERE from_user_id = ? AND stream_id > ? + """ + rows = yield self._execute( + "get_users_whose_signatures_changed", None, sql, user_id, from_key + ) + return set(user for row in rows for user in json.loads(row[0])) + else: + return set() + def get_all_device_list_changes_for_remotes(self, from_key, to_key): """Return a list of `(stream_id, user_id, destination)` which is the combined list of changes to devices, and which destinations need to be @@ -592,6 +653,8 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): Returns: defer.Deferred: boolean whether the device was inserted or an existing device existed with that ID. + Raises: + StoreError: if the device is already in use """ key = (user_id, device_id) if self.device_id_exists_cache.get(key, None): @@ -604,12 +667,25 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): "user_id": user_id, "device_id": device_id, "display_name": initial_device_display_name, + "hidden": False, }, desc="store_device", or_ignore=True, ) + if not inserted: + # if the device already exists, check if it's a real device, or + # if the device ID is reserved by something else + hidden = yield self._simple_select_one_onecol( + "devices", + keyvalues={"user_id": user_id, "device_id": device_id}, + retcol="hidden", + ) + if hidden: + raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN) self.device_id_exists_cache.prefill(key, True) return inserted + except StoreError: + raise except Exception as e: logger.error( "store_device with device_id=%s(%r) user_id=%s(%r)" @@ -636,7 +712,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): """ yield self._simple_delete_one( table="devices", - keyvalues={"user_id": user_id, "device_id": device_id}, + keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, desc="delete_device", ) @@ -656,14 +732,15 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): table="devices", column="device_id", iterable=device_ids, - keyvalues={"user_id": user_id}, + keyvalues={"user_id": user_id, "hidden": False}, desc="delete_devices", ) for device_id in device_ids: self.device_id_exists_cache.invalidate((user_id, device_id)) def update_device(self, user_id, device_id, new_display_name=None): - """Update a device. + """Update a device. Only updates the device if it is not marked as + hidden. Args: user_id (str): The ID of the user which owns the device @@ -682,7 +759,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): return defer.succeed(None) return self._simple_update_one( table="devices", - keyvalues={"user_id": user_id, "device_id": device_id}, + keyvalues={"user_id": user_id, "device_id": device_id, "hidden": False}, updatevalues=updates, desc="update_device", ) diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py index 7e44f41046..5e4c86025e 100644 --- a/synapse/storage/data_stores/main/end_to_end_keys.py +++ b/synapse/storage/data_stores/main/end_to_end_keys.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,7 +16,7 @@ # limitations under the License. from six import iteritems -from canonicaljson import encode_canonical_json +from canonicaljson import encode_canonical_json, json from twisted.internet import defer @@ -101,7 +103,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore): " k.key_json" " FROM devices d" " %s JOIN e2e_device_keys_json k USING (user_id, device_id)" - " WHERE %s" + " WHERE %s AND NOT d.hidden" ) % ( "LEFT" if include_all_devices else "INNER", " OR ".join("(" + q + ")" for q in query_clauses), @@ -320,3 +322,169 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): return self.runInteraction( "delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn ) + + def _set_e2e_cross_signing_key_txn(self, txn, user_id, key_type, key): + """Set a user's cross-signing key. + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + user_id (str): the user to set the signing key for + key_type (str): the type of key that is being set: either 'master' + for a master key, 'self_signing' for a self-signing key, or + 'user_signing' for a user-signing key + key (dict): the key data + """ + # the cross-signing keys need to occupy the same namespace as devices, + # since signatures are identified by device ID. So add an entry to the + # device table to make sure that we don't have a collision with device + # IDs + + # the 'key' dict will look something like: + # { + # "user_id": "@alice:example.com", + # "usage": ["self_signing"], + # "keys": { + # "ed25519:base64+self+signing+public+key": "base64+self+signing+public+key", + # }, + # "signatures": { + # "@alice:example.com": { + # "ed25519:base64+master+public+key": "base64+signature" + # } + # } + # } + # The "keys" property must only have one entry, which will be the public + # key, so we just grab the first value in there + pubkey = next(iter(key["keys"].values())) + self._simple_insert_txn( + txn, + "devices", + values={ + "user_id": user_id, + "device_id": pubkey, + "display_name": key_type + " signing key", + "hidden": True, + }, + ) + + # and finally, store the key itself + with self._cross_signing_id_gen.get_next() as stream_id: + self._simple_insert_txn( + txn, + "e2e_cross_signing_keys", + values={ + "user_id": user_id, + "keytype": key_type, + "keydata": json.dumps(key), + "stream_id": stream_id, + }, + ) + + def set_e2e_cross_signing_key(self, user_id, key_type, key): + """Set a user's cross-signing key. + + Args: + user_id (str): the user to set the user-signing key for + key_type (str): the type of cross-signing key to set + key (dict): the key data + """ + return self.runInteraction( + "add_e2e_cross_signing_key", + self._set_e2e_cross_signing_key_txn, + user_id, + key_type, + key, + ) + + def _get_e2e_cross_signing_key_txn(self, txn, user_id, key_type, from_user_id=None): + """Returns a user's cross-signing key. + + Args: + txn (twisted.enterprise.adbapi.Connection): db connection + user_id (str): the user whose key is being requested + key_type (str): the type of key that is being set: either 'master' + for a master key, 'self_signing' for a self-signing key, or + 'user_signing' for a user-signing key + from_user_id (str): if specified, signatures made by this user on + the key will be included in the result + + Returns: + dict of the key data or None if not found + """ + sql = ( + "SELECT keydata " + " FROM e2e_cross_signing_keys " + " WHERE user_id = ? AND keytype = ? ORDER BY stream_id DESC LIMIT 1" + ) + txn.execute(sql, (user_id, key_type)) + row = txn.fetchone() + if not row: + return None + key = json.loads(row[0]) + + device_id = None + for k in key["keys"].values(): + device_id = k + + if from_user_id is not None: + sql = ( + "SELECT key_id, signature " + " FROM e2e_cross_signing_signatures " + " WHERE user_id = ? " + " AND target_user_id = ? " + " AND target_device_id = ? " + ) + txn.execute(sql, (from_user_id, user_id, device_id)) + row = txn.fetchone() + if row: + key.setdefault("signatures", {}).setdefault(from_user_id, {})[ + row[0] + ] = row[1] + + return key + + def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None): + """Returns a user's cross-signing key. + + Args: + user_id (str): the user whose self-signing key is being requested + key_type (str): the type of cross-signing key to get + from_user_id (str): if specified, signatures made by this user on + the self-signing key will be included in the result + + Returns: + dict of the key data or None if not found + """ + return self.runInteraction( + "get_e2e_cross_signing_key", + self._get_e2e_cross_signing_key_txn, + user_id, + key_type, + from_user_id, + ) + + def store_e2e_cross_signing_signatures(self, user_id, signatures): + """Stores cross-signing signatures. + + Args: + user_id (str): the user who made the signatures + signatures (iterable[(str, str, str, str)]): signatures to add - each + a tuple of (key_id, target_user_id, target_device_id, signature), + where key_id is the ID of the key (including the signature + algorithm) that made the signature, target_user_id and + target_device_id indicate the device being signed, and signature + is the signature of the device + """ + return self._simple_insert_many( + "e2e_cross_signing_signatures", + [ + { + "user_id": user_id, + "key_id": key_id, + "target_user_id": target_user_id, + "target_device_id": target_device_id, + "signature": signature, + } + for (key_id, target_user_id, target_device_id, signature) in signatures + ], + "add_e2e_signing_key", + ) diff --git a/synapse/storage/schema/delta/56/hidden_devices.sql b/synapse/storage/schema/delta/56/hidden_devices.sql new file mode 100644 index 0000000000..67f8b20297 --- /dev/null +++ b/synapse/storage/schema/delta/56/hidden_devices.sql @@ -0,0 +1,18 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- device list needs to know which ones are "real" devices, and which ones are +-- just used to avoid collisions +ALTER TABLE devices ADD COLUMN hidden BOOLEAN DEFAULT FALSE; diff --git a/synapse/storage/schema/delta/56/signing_keys.sql b/synapse/storage/schema/delta/56/signing_keys.sql new file mode 100644 index 0000000000..27a96123e3 --- /dev/null +++ b/synapse/storage/schema/delta/56/signing_keys.sql @@ -0,0 +1,55 @@ +/* Copyright 2019 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- cross-signing keys +CREATE TABLE IF NOT EXISTS e2e_cross_signing_keys ( + user_id TEXT NOT NULL, + -- the type of cross-signing key (master, user_signing, or self_signing) + keytype TEXT NOT NULL, + -- the full key information, as a json-encoded dict + keydata TEXT NOT NULL, + -- for keeping the keys in order, so that we can fetch the latest one + stream_id BIGINT NOT NULL +); + +CREATE UNIQUE INDEX e2e_cross_signing_keys_idx ON e2e_cross_signing_keys(user_id, keytype, stream_id); + +-- cross-signing signatures +CREATE TABLE IF NOT EXISTS e2e_cross_signing_signatures ( + -- user who did the signing + user_id TEXT NOT NULL, + -- key used to sign + key_id TEXT NOT NULL, + -- user who was signed + target_user_id TEXT NOT NULL, + -- device/key that was signed + target_device_id TEXT NOT NULL, + -- the actual signature + signature TEXT NOT NULL +); + +CREATE UNIQUE INDEX e2e_cross_signing_signatures_idx ON e2e_cross_signing_signatures(user_id, target_user_id, target_device_id); + +-- stream of user signature updates +CREATE TABLE IF NOT EXISTS user_signature_stream ( + -- uses the same stream ID as device list stream + stream_id BIGINT NOT NULL, + -- user who did the signing + from_user_id TEXT NOT NULL, + -- list of users who were signed, as a JSON array + user_ids TEXT NOT NULL +); + +CREATE UNIQUE INDEX user_signature_stream_idx ON user_signature_stream(stream_id); diff --git a/synapse/types.py b/synapse/types.py index 8f79797f17..aafc3ffe74 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,6 +18,8 @@ import string from collections import namedtuple import attr +from signedjson.key import decode_verify_key_bytes +from unpaddedbase64 import decode_base64 from synapse.api.errors import SynapseError @@ -476,3 +479,24 @@ class ReadReceipt(object): user_id = attr.ib() event_ids = attr.ib() data = attr.ib() + + +def get_verify_key_from_cross_signing_key(key_info): + """Get the key ID and signedjson verify key from a cross-signing key dict + + Args: + key_info (dict): a cross-signing key dict, which must have a "keys" + property that has exactly one item in it + + Returns: + (str, VerifyKey): the key ID and verify key for the cross-signing key + """ + # make sure that exactly one key is provided + if "keys" not in key_info: + raise ValueError("Invalid key") + keys = key_info["keys"] + if len(keys) != 1: + raise ValueError("Invalid key") + # and return that one key + for key_id, key_data in keys.items(): + return (key_id, decode_verify_key_bytes(key_id, decode_base64(key_data))) |