diff options
Diffstat (limited to 'synapse/handlers/device.py')
-rw-r--r-- | synapse/handlers/device.py | 236 |
1 files changed, 211 insertions, 25 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 71a8f33da3..230d170258 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -1,5 +1,7 @@ # -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd +# Copyright 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,6 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import Any, Dict, Optional from six import iteritems, itervalues @@ -24,9 +27,15 @@ from synapse.api.errors import ( FederationDeniedError, HttpResponseException, RequestSendFailed, + SynapseError, ) from synapse.logging.opentracing import log_kv, set_tag, trace -from synapse.types import RoomStreamToken, get_domain_from_id +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import ( + RoomStreamToken, + get_domain_from_id, + get_verify_key_from_cross_signing_key, +) from synapse.util import stringutils from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache @@ -37,6 +46,8 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) +MAX_DEVICE_DISPLAY_NAME_LEN = 100 + class DeviceWorkerHandler(BaseHandler): def __init__(self, hs): @@ -44,6 +55,7 @@ class DeviceWorkerHandler(BaseHandler): self.hs = hs self.state = hs.get_state_handler() + self.state_store = hs.get_storage().state self._auth_handler = hs.get_auth_handler() @trace @@ -119,8 +131,14 @@ class DeviceWorkerHandler(BaseHandler): users_who_share_room = yield self.store.get_users_who_share_room_with_user( user_id ) + + tracked_users = set(users_who_share_room) + + # Always tell the user about their own devices + tracked_users.add(user_id) + changed = yield self.store.get_users_whose_devices_changed( - from_token.device_list_key, users_who_share_room + from_token.device_list_key, tracked_users ) # Then work out if any users have since joined @@ -176,7 +194,7 @@ class DeviceWorkerHandler(BaseHandler): continue # mapping from event_id -> state_dict - prev_state_ids = yield self.store.get_state_ids_for_events(event_ids) + prev_state_ids = yield self.state_store.get_state_ids_for_events(event_ids) # Check if we've joined the room? If so we just blindly add all the users to # the "possibly changed" users. @@ -222,6 +240,22 @@ class DeviceWorkerHandler(BaseHandler): return result + @defer.inlineCallbacks + def on_federation_query_user_devices(self, user_id): + stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) + master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master") + self_signing_key = yield self.store.get_e2e_cross_signing_key( + user_id, "self_signing" + ) + + return { + "user_id": user_id, + "stream_id": stream_id, + "devices": devices, + "master_key": master_key, + "self_signing_key": self_signing_key, + } + class DeviceHandler(DeviceWorkerHandler): def __init__(self, hs): @@ -236,9 +270,6 @@ class DeviceHandler(DeviceWorkerHandler): federation_registry.register_edu_handler( "m.device_list_update", self.device_list_updater.incoming_device_list_update ) - federation_registry.register_query_handler( - "user_devices", self.on_federation_query_user_devices - ) hs.get_distributor().observe("user_left_room", self.user_left_room) @@ -313,8 +344,10 @@ class DeviceHandler(DeviceWorkerHandler): else: raise - yield self._auth_handler.delete_access_tokens_for_user( - user_id, device_id=device_id + yield defer.ensureDeferred( + self._auth_handler.delete_access_tokens_for_user( + user_id, device_id=device_id + ) ) yield self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id) @@ -366,8 +399,10 @@ class DeviceHandler(DeviceWorkerHandler): # Delete access tokens and e2e keys for each device. Not optimised as it is not # considered as part of a critical path. for device_id in device_ids: - yield self._auth_handler.delete_access_tokens_for_user( - user_id, device_id=device_id + yield defer.ensureDeferred( + self._auth_handler.delete_access_tokens_for_user( + user_id, device_id=device_id + ) ) yield self.store.delete_e2e_keys_by_device( user_id=user_id, device_id=device_id @@ -388,9 +423,18 @@ class DeviceHandler(DeviceWorkerHandler): defer.Deferred: """ + # Reject a new displayname which is too long. + new_display_name = content.get("display_name") + if new_display_name and len(new_display_name) > MAX_DEVICE_DISPLAY_NAME_LEN: + raise SynapseError( + 400, + "Device display name is too long (max %i)" + % (MAX_DEVICE_DISPLAY_NAME_LEN,), + ) + try: yield self.store.update_device( - user_id, device_id, new_display_name=content.get("display_name") + user_id, device_id, new_display_name=new_display_name ) yield self.notify_device_update(user_id, [device_id]) except errors.StoreError as e: @@ -428,7 +472,11 @@ class DeviceHandler(DeviceWorkerHandler): room_ids = yield self.store.get_rooms_for_user(user_id) - yield self.notifier.on_new_event("device_list_key", position, rooms=room_ids) + # specify the user ID too since the user should always get their own device list + # updates, even if they aren't in any rooms. + yield self.notifier.on_new_event( + "device_list_key", position, users=[user_id], rooms=room_ids + ) if hosts: logger.info( @@ -439,9 +487,19 @@ class DeviceHandler(DeviceWorkerHandler): log_kv({"message": "sent device update to host", "host": host}) @defer.inlineCallbacks - def on_federation_query_user_devices(self, user_id): - stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id) - return {"user_id": user_id, "stream_id": stream_id, "devices": devices} + def notify_user_signature_update(self, from_user_id, user_ids): + """Notify a user that they have made new signatures of other users. + + Args: + from_user_id (str): the user who made the signature + user_ids (list[str]): the users IDs that have new signatures + """ + + position = yield self.store.add_user_signature_change_to_streams( + from_user_id, user_ids + ) + + self.notifier.on_new_event("device_list_key", position, users=[from_user_id]) @defer.inlineCallbacks def user_left_room(self, user, room_id): @@ -483,6 +541,15 @@ class DeviceListUpdater(object): iterable=True, ) + # Attempt to resync out of sync device lists every 30s. + self._resync_retry_in_progress = False + self.clock.looping_call( + run_as_background_process, + 30 * 1000, + func=self._maybe_retry_device_resync, + desc="_maybe_retry_device_resync", + ) + @trace @defer.inlineCallbacks def incoming_device_list_update(self, origin, edu_content): @@ -569,7 +636,13 @@ class DeviceListUpdater(object): # happens if we've missed updates. resync = yield self._need_to_do_resync(user_id, pending_updates) - logger.debug("Need to re-sync devices for %r? %r", user_id, resync) + if logger.isEnabledFor(logging.INFO): + logger.info( + "Received device list update for %s, requiring resync: %s. Devices: %s", + user_id, + resync, + ", ".join(u[0] for u in pending_updates), + ) if resync: yield self.user_device_resync(user_id) @@ -621,25 +694,83 @@ class DeviceListUpdater(object): return False @defer.inlineCallbacks - def user_device_resync(self, user_id): + def _maybe_retry_device_resync(self): + """Retry to resync device lists that are out of sync, except if another retry is + in progress. + """ + if self._resync_retry_in_progress: + return + + try: + # Prevent another call of this function to retry resyncing device lists so + # we don't send too many requests. + self._resync_retry_in_progress = True + # Get all of the users that need resyncing. + need_resync = yield self.store.get_user_ids_requiring_device_list_resync() + # Iterate over the set of user IDs. + for user_id in need_resync: + try: + # Try to resync the current user's devices list. + result = yield self.user_device_resync( + user_id=user_id, mark_failed_as_stale=False, + ) + + # user_device_resync only returns a result if it managed to + # successfully resync and update the database. Updating the table + # of users requiring resync isn't necessary here as + # user_device_resync already does it (through + # self.store.update_remote_device_list_cache). + if result: + logger.debug( + "Successfully resynced the device list for %s", user_id, + ) + except Exception as e: + # If there was an issue resyncing this user, e.g. if the remote + # server sent a malformed result, just log the error instead of + # aborting all the subsequent resyncs. + logger.debug( + "Could not resync the device list for %s: %s", user_id, e, + ) + finally: + # Allow future calls to retry resyncinc out of sync device lists. + self._resync_retry_in_progress = False + + @defer.inlineCallbacks + def user_device_resync(self, user_id, mark_failed_as_stale=True): """Fetches all devices for a user and updates the device cache with them. Args: user_id (str): The user's id whose device_list will be updated. + mark_failed_as_stale (bool): Whether to mark the user's device list as stale + if the attempt to resync failed. Returns: Deferred[dict]: a dict with device info as under the "devices" in the result of this request: https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid """ + logger.debug("Attempting to resync the device list for %s", user_id) log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. origin = get_domain_from_id(user_id) try: result = yield self.federation.query_user_devices(origin, user_id) - except (NotRetryingDestination, RequestSendFailed, HttpResponseException): - # TODO: Remember that we are now out of sync and try again - # later - logger.warn("Failed to handle device list update for %s", user_id) + except NotRetryingDestination: + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + + return + except (RequestSendFailed, HttpResponseException) as e: + logger.warning( + "Failed to handle device list update for %s: %s", user_id, e, + ) + + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + # We abort on exceptions rather than accepting the update # as otherwise synapse will 'forget' that its device list # is out of date. If we bail then we will retry the resync @@ -653,18 +784,29 @@ class DeviceListUpdater(object): logger.info(e) return except Exception as e: - # TODO: Remember that we are now out of sync and try again - # later set_tag("error", True) log_kv( {"message": "Exception raised by federation request", "exception": e} ) logger.exception("Failed to handle device list update for %s", user_id) + + if mark_failed_as_stale: + # Mark the remote user's device list as stale so we know we need to retry + # it later. + yield self.store.mark_remote_user_device_cache_as_stale(user_id) + return log_kv({"result": result}) stream_id = result["stream_id"] devices = result["devices"] + # Get the master key and the self-signing key for this user if provided in the + # response (None if not in the response). + # The response will not contain the user signing key, as this key is only used by + # its owner, thus it doesn't make sense to send it over federation. + master_key = result.get("master_key") + self_signing_key = result.get("self_signing_key") + # If the remote server has more than ~1000 devices for this user # we assume that something is going horribly wrong (e.g. a bot # that logs in and creates a new device every time it tries to @@ -677,7 +819,7 @@ class DeviceListUpdater(object): # up on storing the total list of devices and only handle the # delta instead. if len(devices) > 1000: - logger.warn( + logger.warning( "Ignoring device list snapshot for %s as it has >1K devs (%d)", user_id, len(devices), @@ -694,10 +836,54 @@ class DeviceListUpdater(object): yield self.store.update_remote_device_list_cache(user_id, devices, stream_id) device_ids = [device["device_id"] for device in devices] + + # Handle cross-signing keys. + cross_signing_device_ids = yield self.process_cross_signing_key_update( + user_id, master_key, self_signing_key, + ) + device_ids = device_ids + cross_signing_device_ids + yield self.device_handler.notify_device_update(user_id, device_ids) # We clobber the seen updates since we've re-synced from a given # point. - self._seen_updates[user_id] = set([stream_id]) + self._seen_updates[user_id] = {stream_id} defer.returnValue(result) + + @defer.inlineCallbacks + def process_cross_signing_key_update( + self, + user_id: str, + master_key: Optional[Dict[str, Any]], + self_signing_key: Optional[Dict[str, Any]], + ) -> list: + """Process the given new master and self-signing key for the given remote user. + + Args: + user_id: The ID of the user these keys are for. + master_key: The dict of the cross-signing master key as returned by the + remote server. + self_signing_key: The dict of the cross-signing self-signing key as returned + by the remote server. + + Return: + The device IDs for the given keys. + """ + device_ids = [] + + if master_key: + yield self.store.set_e2e_cross_signing_key(user_id, "master", master_key) + _, verify_key = get_verify_key_from_cross_signing_key(master_key) + # verify_key is a VerifyKey from signedjson, which uses + # .version to denote the portion of the key ID after the + # algorithm and colon, which is the device ID + device_ids.append(verify_key.version) + if self_signing_key: + yield self.store.set_e2e_cross_signing_key( + user_id, "self_signing", self_signing_key + ) + _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key) + device_ids.append(verify_key.version) + + return device_ids |