diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index d6ab337783..5c1cf83c9d 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -209,12 +209,12 @@ class DeviceHandler(DeviceWorkerHandler):
self.federation_sender = hs.get_federation_sender()
- self._edu_updater = DeviceListEduUpdater(hs, self)
+ self.device_list_updater = DeviceListUpdater(hs, self)
federation_registry = hs.get_federation_registry()
federation_registry.register_edu_handler(
- "m.device_list_update", self._edu_updater.incoming_device_list_update
+ "m.device_list_update", self.device_list_updater.incoming_device_list_update
)
federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices
@@ -426,7 +426,7 @@ def _update_device_from_client_ips(device, client_ips):
device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
-class DeviceListEduUpdater(object):
+class DeviceListUpdater(object):
"Handles incoming device list updates from federation and updates the DB"
def __init__(self, hs, device_handler):
@@ -519,75 +519,7 @@ class DeviceListEduUpdater(object):
logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
if resync:
- # Fetch all devices for the user.
- origin = get_domain_from_id(user_id)
- try:
- result = yield self.federation.query_user_devices(origin, user_id)
- except (
- NotRetryingDestination,
- RequestSendFailed,
- HttpResponseException,
- ):
- # TODO: Remember that we are now out of sync and try again
- # later
- logger.warn("Failed to handle device list update for %s", user_id)
- # We abort on exceptions rather than accepting the update
- # as otherwise synapse will 'forget' that its device list
- # is out of date. If we bail then we will retry the resync
- # next time we get a device list update for this user_id.
- # This makes it more likely that the device lists will
- # eventually become consistent.
- return
- except FederationDeniedError as e:
- logger.info(e)
- return
- except Exception:
- # TODO: Remember that we are now out of sync and try again
- # later
- logger.exception(
- "Failed to handle device list update for %s", user_id
- )
- return
-
- stream_id = result["stream_id"]
- devices = result["devices"]
-
- # If the remote server has more than ~1000 devices for this user
- # we assume that something is going horribly wrong (e.g. a bot
- # that logs in and creates a new device every time it tries to
- # send a message). Maintaining lots of devices per user in the
- # cache can cause serious performance issues as if this request
- # takes more than 60s to complete, internal replication from the
- # inbound federation worker to the synapse master may time out
- # causing the inbound federation to fail and causing the remote
- # server to retry, causing a DoS. So in this scenario we give
- # up on storing the total list of devices and only handle the
- # delta instead.
- if len(devices) > 1000:
- logger.warn(
- "Ignoring device list snapshot for %s as it has >1K devs (%d)",
- user_id,
- len(devices),
- )
- devices = []
-
- for device in devices:
- logger.debug(
- "Handling resync update %r/%r, ID: %r",
- user_id,
- device["device_id"],
- stream_id,
- )
-
- yield self.store.update_remote_device_list_cache(
- user_id, devices, stream_id
- )
- device_ids = [device["device_id"] for device in devices]
- yield self.device_handler.notify_device_update(user_id, device_ids)
-
- # We clobber the seen updates since we've re-synced from a given
- # point.
- self._seen_updates[user_id] = set([stream_id])
+ yield self.user_device_resync(user_id)
else:
# Simply update the single device, since we know that is the only
# change (because of the single prev_id matching the current cache)
@@ -634,3 +566,77 @@ class DeviceListEduUpdater(object):
stream_id_in_updates.add(stream_id)
return False
+
+ @defer.inlineCallbacks
+ def user_device_resync(self, user_id):
+ """Fetches all devices for a user and updates the device cache with them.
+
+ Args:
+ user_id (str): The user's id whose device_list will be updated.
+        Returns:
+            Deferred[dict]: the full response from the remote server, including
+            the "stream_id" and "devices" keys, as returned by this request:
+            https://matrix.org/docs/spec/server_server/r0.1.2#get-matrix-federation-v1-user-devices-userid
+ """
+ # Fetch all devices for the user.
+ origin = get_domain_from_id(user_id)
+ try:
+ result = yield self.federation.query_user_devices(origin, user_id)
+ except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
+ # TODO: Remember that we are now out of sync and try again
+ # later
+ logger.warn("Failed to handle device list update for %s", user_id)
+ # We abort on exceptions rather than accepting the update
+ # as otherwise synapse will 'forget' that its device list
+ # is out of date. If we bail then we will retry the resync
+ # next time we get a device list update for this user_id.
+ # This makes it more likely that the device lists will
+ # eventually become consistent.
+ return
+ except FederationDeniedError as e:
+ logger.info(e)
+ return
+ except Exception:
+ # TODO: Remember that we are now out of sync and try again
+ # later
+ logger.exception("Failed to handle device list update for %s", user_id)
+ return
+ stream_id = result["stream_id"]
+ devices = result["devices"]
+
+ # If the remote server has more than ~1000 devices for this user
+ # we assume that something is going horribly wrong (e.g. a bot
+ # that logs in and creates a new device every time it tries to
+ # send a message). Maintaining lots of devices per user in the
+ # cache can cause serious performance issues as if this request
+ # takes more than 60s to complete, internal replication from the
+ # inbound federation worker to the synapse master may time out
+ # causing the inbound federation to fail and causing the remote
+ # server to retry, causing a DoS. So in this scenario we give
+ # up on storing the total list of devices and only handle the
+ # delta instead.
+ if len(devices) > 1000:
+ logger.warn(
+ "Ignoring device list snapshot for %s as it has >1K devs (%d)",
+ user_id,
+ len(devices),
+ )
+ devices = []
+
+ for device in devices:
+ logger.debug(
+ "Handling resync update %r/%r, ID: %r",
+ user_id,
+ device["device_id"],
+ stream_id,
+ )
+
+ yield self.store.update_remote_device_list_cache(user_id, devices, stream_id)
+ device_ids = [device["device_id"] for device in devices]
+ yield self.device_handler.notify_device_update(user_id, device_ids)
+
+ # We clobber the seen updates since we've re-synced from a given
+ # point.
+ self._seen_updates[user_id] = set([stream_id])
+
+ defer.returnValue(result)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1300b540e3..366a0bc68b 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -65,6 +65,7 @@ class E2eKeysHandler(object):
}
}
"""
+
device_keys_query = query_body.get("device_keys", {})
# separate users by domain.
@@ -121,7 +122,58 @@ class E2eKeysHandler(object):
# Now fetch any devices that we don't have in our cache
@defer.inlineCallbacks
def do_remote_query(destination):
+            """This is called when we are querying the device list of a user on
+            a remote homeserver and their device list is not in the device list
+            cache. If we share a room with this user and the query is not for
+            specific devices, we update the cache with their full device
+            list."""
+
destination_query = remote_queries_not_in_cache[destination]
+
+ # We first consider whether we wish to update the device list cache with
+ # the users device list. We want to track a user's devices when the
+ # authenticated user shares a room with the queried user and the query
+ # has not specified a particular device.
+ # If we update the cache for the queried user we remove them from further
+ # queries. We use the more efficient batched query_client_keys for all
+ # remaining users
+ user_ids_updated = []
+ for (user_id, device_list) in destination_query.items():
+ if user_id in user_ids_updated:
+ continue
+
+ if device_list:
+ continue
+
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+ if not room_ids:
+ continue
+
+ # We've decided we're sharing a room with this user and should
+ # probably be tracking their device lists. However, we haven't
+ # done an initial sync on the device list so we do it now.
+ try:
+ user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+ user_id
+ )
+ user_devices = user_devices["devices"]
+                device_keys_map = {d["device_id"]: d["keys"] for d in user_devices}
+                results[user_id] = device_keys_map
+ user_ids_updated.append(user_id)
+ except Exception as e:
+                failures.setdefault(destination, []).append(
+                    _exception_to_failure(e)
+                )
+
+ if len(destination_query) == len(user_ids_updated):
+ # We've updated all the users in the query and we do not need to
+ # make any further remote calls.
+ return
+
+ # Remove all the users from the query which we have updated
+ for user_id in user_ids_updated:
+ destination_query.pop(user_id)
+
try:
remote_result = yield self.federation.query_client_keys(
destination, {"device_keys": destination_query}, timeout=timeout
@@ -132,7 +184,8 @@ class E2eKeysHandler(object):
results[user_id] = keys
except Exception as e:
- failures[destination] = _exception_to_failure(e)
+ failure = _exception_to_failure(e)
+ failures[destination] = failure
yield make_deferred_yieldable(
defer.gatherResults(
@@ -234,8 +287,10 @@ class E2eKeysHandler(object):
for user_id, keys in remote_result["one_time_keys"].items():
if user_id in device_keys:
json_result[user_id] = keys
+
except Exception as e:
- failures[destination] = _exception_to_failure(e)
+ failure = _exception_to_failure(e)
+ failures[destination] = failure
yield make_deferred_yieldable(
defer.gatherResults(
@@ -263,6 +318,7 @@ class E2eKeysHandler(object):
@defer.inlineCallbacks
def upload_keys_for_user(self, user_id, device_id, keys):
+
time_now = self.clock.time_msec()
# TODO: Validate the JSON to make sure it has the right keys.
|