diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 1300b540e3..366a0bc68b 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -65,6 +65,7 @@ class E2eKeysHandler(object):
}
}
"""
+
device_keys_query = query_body.get("device_keys", {})
# separate users by domain.
@@ -121,7 +122,58 @@ class E2eKeysHandler(object):
# Now fetch any devices that we don't have in our cache
@defer.inlineCallbacks
def do_remote_query(destination):
+ """This is called when we are querying the device list of a user on
+ a remote homeserver and their device list is not in the device list
+ cache. If we share a room with this user and we're not querying for
+ specific user we will update the cache
+ with their device list."""
+
destination_query = remote_queries_not_in_cache[destination]
+
+ # We first consider whether we wish to update the device list cache with
+ # the users device list. We want to track a user's devices when the
+ # authenticated user shares a room with the queried user and the query
+ # has not specified a particular device.
+ # If we update the cache for the queried user we remove them from further
+ # queries. We use the more efficient batched query_client_keys for all
+ # remaining users
+ user_ids_updated = []
+ for (user_id, device_list) in destination_query.items():
+ if user_id in user_ids_updated:
+ continue
+
+ if device_list:
+ continue
+
+ room_ids = yield self.store.get_rooms_for_user(user_id)
+ if not room_ids:
+ continue
+
+ # We've decided we're sharing a room with this user and should
+ # probably be tracking their device lists. However, we haven't
+ # done an initial sync on the device list so we do it now.
+ try:
+ user_devices = yield self.device_handler.device_list_updater.user_device_resync(
+ user_id
+ )
+ user_devices = user_devices["devices"]
+ for device in user_devices:
+ results[user_id] = {device["device_id"]: device["keys"]}
+ user_ids_updated.append(user_id)
+ except Exception as e:
+ failures[destination] = failures.get(destination, []).append(
+ _exception_to_failure(e)
+ )
+
+ if len(destination_query) == len(user_ids_updated):
+ # We've updated all the users in the query and we do not need to
+ # make any further remote calls.
+ return
+
+ # Remove all the users from the query which we have updated
+ for user_id in user_ids_updated:
+ destination_query.pop(user_id)
+
try:
remote_result = yield self.federation.query_client_keys(
destination, {"device_keys": destination_query}, timeout=timeout
@@ -132,7 +184,8 @@ class E2eKeysHandler(object):
results[user_id] = keys
except Exception as e:
- failures[destination] = _exception_to_failure(e)
+ failure = _exception_to_failure(e)
+ failures[destination] = failure
yield make_deferred_yieldable(
defer.gatherResults(
@@ -234,8 +287,10 @@ class E2eKeysHandler(object):
for user_id, keys in remote_result["one_time_keys"].items():
if user_id in device_keys:
json_result[user_id] = keys
+
except Exception as e:
- failures[destination] = _exception_to_failure(e)
+ failure = _exception_to_failure(e)
+ failures[destination] = failure
yield make_deferred_yieldable(
defer.gatherResults(
@@ -263,6 +318,7 @@ class E2eKeysHandler(object):
@defer.inlineCallbacks
def upload_keys_for_user(self, user_id, device_id, keys):
+
time_now = self.clock.time_msec()
# TODO: Validate the JSON to make sure it has the right keys.
|