diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 71a8f33da3..230d170258 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -1,5 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,6 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from typing import Any, Dict, Optional
from six import iteritems, itervalues
@@ -24,9 +27,15 @@ from synapse.api.errors import (
FederationDeniedError,
HttpResponseException,
RequestSendFailed,
+ SynapseError,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
-from synapse.types import RoomStreamToken, get_domain_from_id
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import (
+ RoomStreamToken,
+ get_domain_from_id,
+ get_verify_key_from_cross_signing_key,
+)
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
@@ -37,6 +46,8 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)
+MAX_DEVICE_DISPLAY_NAME_LEN = 100
+
class DeviceWorkerHandler(BaseHandler):
def __init__(self, hs):
@@ -44,6 +55,7 @@ class DeviceWorkerHandler(BaseHandler):
self.hs = hs
self.state = hs.get_state_handler()
+ self.state_store = hs.get_storage().state
self._auth_handler = hs.get_auth_handler()
@trace
@@ -119,8 +131,14 @@ class DeviceWorkerHandler(BaseHandler):
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
user_id
)
+
+ tracked_users = set(users_who_share_room)
+
+ # Always tell the user about their own devices
+ tracked_users.add(user_id)
+
changed = yield self.store.get_users_whose_devices_changed(
- from_token.device_list_key, users_who_share_room
+ from_token.device_list_key, tracked_users
)
# Then work out if any users have since joined
@@ -176,7 +194,7 @@ class DeviceWorkerHandler(BaseHandler):
continue
# mapping from event_id -> state_dict
- prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+ prev_state_ids = yield self.state_store.get_state_ids_for_events(event_ids)
# Check if we've joined the room? If so we just blindly add all the users to
# the "possibly changed" users.
@@ -222,6 +240,22 @@ class DeviceWorkerHandler(BaseHandler):
return result
+ @defer.inlineCallbacks
+ def on_federation_query_user_devices(self, user_id):
+ stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
+ master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
+ self_signing_key = yield self.store.get_e2e_cross_signing_key(
+ user_id, "self_signing"
+ )
+
+ return {
+ "user_id": user_id,
+ "stream_id": stream_id,
+ "devices": devices,
+ "master_key": master_key,
+ "self_signing_key": self_signing_key,
+ }
+
class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs):
@@ -236,9 +270,6 @@ class DeviceHandler(DeviceWorkerHandler):
federation_registry.register_edu_handler(
"m.device_list_update", self.device_list_updater.incoming_device_list_update
)
- federation_registry.register_query_handler(
- "user_devices", self.on_federation_query_user_devices
- )
hs.get_distributor().observe("user_left_room", self.user_left_room)
@@ -313,8 +344,10 @@ class DeviceHandler(DeviceWorkerHandler):
else:
raise
- yield self._auth_handler.delete_access_tokens_for_user(
- user_id, device_id=device_id
+ yield defer.ensureDeferred(
+ self._auth_handler.delete_access_tokens_for_user(
+ user_id, device_id=device_id
+ )
)
yield self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id)
@@ -366,8 +399,10 @@ class DeviceHandler(DeviceWorkerHandler):
# Delete access tokens and e2e keys for each device. Not optimised as it is not
# considered as part of a critical path.
for device_id in device_ids:
- yield self._auth_handler.delete_access_tokens_for_user(
- user_id, device_id=device_id
+ yield defer.ensureDeferred(
+ self._auth_handler.delete_access_tokens_for_user(
+ user_id, device_id=device_id
+ )
)
yield self.store.delete_e2e_keys_by_device(
user_id=user_id, device_id=device_id
@@ -388,9 +423,18 @@ class DeviceHandler(DeviceWorkerHandler):
defer.Deferred:
"""
+ # Reject a new displayname which is too long.
+ new_display_name = content.get("display_name")
+ if new_display_name and len(new_display_name) > MAX_DEVICE_DISPLAY_NAME_LEN:
+ raise SynapseError(
+ 400,
+ "Device display name is too long (max %i)"
+ % (MAX_DEVICE_DISPLAY_NAME_LEN,),
+ )
+
try:
yield self.store.update_device(
- user_id, device_id, new_display_name=content.get("display_name")
+ user_id, device_id, new_display_name=new_display_name
)
yield self.notify_device_update(user_id, [device_id])
except errors.StoreError as e:
@@ -428,7 +472,11 @@ class DeviceHandler(DeviceWorkerHandler):
room_ids = yield self.store.get_rooms_for_user(user_id)
- yield self.notifier.on_new_event("device_list_key", position, rooms=room_ids)
+ # specify the user ID too since the user should always get their own device list
+ # updates, even if they aren't in any rooms.
+ yield self.notifier.on_new_event(
+ "device_list_key", position, users=[user_id], rooms=room_ids
+ )
if hosts:
logger.info(
@@ -439,9 +487,19 @@ class DeviceHandler(DeviceWorkerHandler):
log_kv({"message": "sent device update to host", "host": host})
@defer.inlineCallbacks
- def on_federation_query_user_devices(self, user_id):
- stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
- return {"user_id": user_id, "stream_id": stream_id, "devices": devices}
+ def notify_user_signature_update(self, from_user_id, user_ids):
+ """Notify a user that they have made new signatures of other users.
+
+ Args:
+ from_user_id (str): the user who made the signature
+ user_ids (list[str]): the users IDs that have new signatures
+ """
+
+ position = yield self.store.add_user_signature_change_to_streams(
+ from_user_id, user_ids
+ )
+
+ self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
@defer.inlineCallbacks
def user_left_room(self, user, room_id):
@@ -483,6 +541,15 @@ class DeviceListUpdater(object):
iterable=True,
)
+ # Attempt to resync out of sync device lists every 30s.
+ self._resync_retry_in_progress = False
+ self.clock.looping_call(
+ run_as_background_process,
+ 30 * 1000,
+ func=self._maybe_retry_device_resync,
+ desc="_maybe_retry_device_resync",
+ )
+
@trace
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
@@ -569,7 +636,13 @@ class DeviceListUpdater(object):
# happens if we've missed updates.
resync = yield self._need_to_do_resync(user_id, pending_updates)
- logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
+ if logger.isEnabledFor(logging.INFO):
+ logger.info(
+ "Received device list update for %s, requiring resync: %s. Devices: %s",
+ user_id,
+ resync,
+ ", ".join(u[0] for u in pending_updates),
+ )
if resync:
yield self.user_device_resync(user_id)
@@ -621,25 +694,83 @@ class DeviceListUpdater(object):
return False
@defer.inlineCallbacks
- def user_device_resync(self, user_id):
+ def _maybe_retry_device_resync(self):
+ """Retry to resync device lists that are out of sync, except if another retry is
+ in progress.
+ """
+ if self._resync_retry_in_progress:
+ return
+
+ try:
+ # Prevent another call of this function to retry resyncing device lists so
+ # we don't send too many requests.
+ self._resync_retry_in_progress = True
+ # Get all of the users that need resyncing.
+ need_resync = yield self.store.get_user_ids_requiring_device_list_resync()
+ # Iterate over the set of user IDs.
+ for user_id in need_resync:
+ try:
+ # Try to resync the current user's devices list.
+ result = yield self.user_device_resync(
+ user_id=user_id, mark_failed_as_stale=False,
+ )
+
+ # user_device_resync only returns a result if it managed to
+ # successfully resync and update the database. Updating the table
+ # of users requiring resync isn't necessary here as
+ # user_device_resync already does it (through
+ # self.store.update_remote_device_list_cache).
+ if result:
+ logger.debug(
+ "Successfully resynced the device list for %s", user_id,
+ )
+ except Exception as e:
+ # If there was an issue resyncing this user, e.g. if the remote
+ # server sent a malformed result, just log the error instead of
+ # aborting all the subsequent resyncs.
+ logger.debug(
+ "Could not resync the device list for %s: %s", user_id, e,
+ )
+ finally:
+ # Allow future calls to retry resyncinc out of sync device lists.
+ self._resync_retry_in_progress = False
+
+ @defer.inlineCallbacks
+ def user_device_resync(self, user_id, mark_failed_as_stale=True):
"""Fetches all devices for a user and updates the device cache with them.
Args:
user_id (str): The user's id whose device_list will be updated.
+ mark_failed_as_stale (bool): Whether to mark the user's device list as stale
+ if the attempt to resync failed.
Returns:
Deferred[dict]: a dict with device info as under the "devices" in the result of this
request:
https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
"""
+ logger.debug("Attempting to resync the device list for %s", user_id)
log_kv({"message": "Doing resync to update device list."})
# Fetch all devices for the user.
origin = get_domain_from_id(user_id)
try:
result = yield self.federation.query_user_devices(origin, user_id)
- except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
- # TODO: Remember that we are now out of sync and try again
- # later
- logger.warn("Failed to handle device list update for %s", user_id)
+ except NotRetryingDestination:
+ if mark_failed_as_stale:
+ # Mark the remote user's device list as stale so we know we need to retry
+ # it later.
+ yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
+ return
+ except (RequestSendFailed, HttpResponseException) as e:
+ logger.warning(
+ "Failed to handle device list update for %s: %s", user_id, e,
+ )
+
+ if mark_failed_as_stale:
+ # Mark the remote user's device list as stale so we know we need to retry
+ # it later.
+ yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync
@@ -653,18 +784,29 @@ class DeviceListUpdater(object):
logger.info(e)
return
except Exception as e:
- # TODO: Remember that we are now out of sync and try again
- # later
set_tag("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id)
+
+ if mark_failed_as_stale:
+ # Mark the remote user's device list as stale so we know we need to retry
+ # it later.
+ yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
return
log_kv({"result": result})
stream_id = result["stream_id"]
devices = result["devices"]
+ # Get the master key and the self-signing key for this user if provided in the
+ # response (None if not in the response).
+ # The response will not contain the user signing key, as this key is only used by
+ # its owner, thus it doesn't make sense to send it over federation.
+ master_key = result.get("master_key")
+ self_signing_key = result.get("self_signing_key")
+
# If the remote server has more than ~1000 devices for this user
# we assume that something is going horribly wrong (e.g. a bot
# that logs in and creates a new device every time it tries to
@@ -677,7 +819,7 @@ class DeviceListUpdater(object):
# up on storing the total list of devices and only handle the
# delta instead.
if len(devices) > 1000:
- logger.warn(
+ logger.warning(
"Ignoring device list snapshot for %s as it has >1K devs (%d)",
user_id,
len(devices),
@@ -694,10 +836,54 @@ class DeviceListUpdater(object):
yield self.store.update_remote_device_list_cache(user_id, devices, stream_id)
device_ids = [device["device_id"] for device in devices]
+
+ # Handle cross-signing keys.
+ cross_signing_device_ids = yield self.process_cross_signing_key_update(
+ user_id, master_key, self_signing_key,
+ )
+ device_ids = device_ids + cross_signing_device_ids
+
yield self.device_handler.notify_device_update(user_id, device_ids)
# We clobber the seen updates since we've re-synced from a given
# point.
- self._seen_updates[user_id] = set([stream_id])
+ self._seen_updates[user_id] = {stream_id}
defer.returnValue(result)
+
+ @defer.inlineCallbacks
+ def process_cross_signing_key_update(
+ self,
+ user_id: str,
+ master_key: Optional[Dict[str, Any]],
+ self_signing_key: Optional[Dict[str, Any]],
+ ) -> list:
+ """Process the given new master and self-signing key for the given remote user.
+
+ Args:
+ user_id: The ID of the user these keys are for.
+ master_key: The dict of the cross-signing master key as returned by the
+ remote server.
+ self_signing_key: The dict of the cross-signing self-signing key as returned
+ by the remote server.
+
+ Return:
+ The device IDs for the given keys.
+ """
+ device_ids = []
+
+ if master_key:
+ yield self.store.set_e2e_cross_signing_key(user_id, "master", master_key)
+ _, verify_key = get_verify_key_from_cross_signing_key(master_key)
+ # verify_key is a VerifyKey from signedjson, which uses
+ # .version to denote the portion of the key ID after the
+ # algorithm and colon, which is the device ID
+ device_ids.append(verify_key.version)
+ if self_signing_key:
+ yield self.store.set_e2e_cross_signing_key(
+ user_id, "self_signing", self_signing_key
+ )
+ _, verify_key = get_verify_key_from_cross_signing_key(self_signing_key)
+ device_ids.append(verify_key.version)
+
+ return device_ids
|