diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 5fe102e2f2..d2188ca08f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -36,8 +36,8 @@ from synapse.types import (
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
 )
-from synapse.util import json_decoder, unwrapFirstError
-from synapse.util.async_helpers import Linearizer, delay_cancellation
+from synapse.util import json_decoder
+from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.cancellation import cancellable
 from synapse.util.retryutils import NotRetryingDestination
@@ -238,24 +238,28 @@ class E2eKeysHandler:
         # Now fetch any devices that we don't have in our cache
         # TODO It might make sense to propagate cancellations into the
         # deferreds which are querying remote homeservers.
-        await make_deferred_yieldable(
-            delay_cancellation(
-                defer.gatherResults(
-                    [
-                        run_in_background(
-                            self._query_devices_for_destination,
-                            results,
-                            cross_signing_keys,
-                            failures,
-                            destination,
-                            queries,
-                            timeout,
-                        )
-                        for destination, queries in remote_queries_not_in_cache.items()
-                    ],
-                    consumeErrors=True,
-                ).addErrback(unwrapFirstError)
+        logger.debug(
+            "%d destinations to query devices for", len(remote_queries_not_in_cache)
+        )
+
+        async def _query(
+            destination_queries: Tuple[str, Dict[str, Iterable[str]]]
+        ) -> None:
+            destination, queries = destination_queries
+            return await self._query_devices_for_destination(
+                results,
+                cross_signing_keys,
+                failures,
+                destination,
+                queries,
+                timeout,
             )
+
+        await concurrently_execute(
+            _query,
+            remote_queries_not_in_cache.items(),
+            10,
+            delay_cancellation=True,
         )
         ret = {"device_keys": results, "failures": failures}
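The hunk above replaces the hand-rolled `defer.gatherResults` / `run_in_background` fan-out with `concurrently_execute` from `synapse.util.async_helpers`, querying at most 10 destinations at a time while `delay_cancellation=True` takes over the job of the removed `delay_cancellation` wrapper. As a rough illustration of the bounded fan-out semantics only, here is a minimal asyncio sketch; `bounded_gather`, `query` and the example hostnames are hypothetical stand-ins, not Synapse's implementation, which also manages log contexts and cancellation.

```python
# Minimal sketch (not Synapse's concurrently_execute): run one call per work
# item, but never more than `limit` calls at a time.
import asyncio
from typing import Awaitable, Callable, Iterable, TypeVar

T = TypeVar("T")


async def bounded_gather(
    func: Callable[[T], Awaitable[None]], args: Iterable[T], limit: int
) -> None:
    it = iter(args)

    async def worker() -> None:
        # Each worker pulls the next item from the shared iterator, so at
        # most `limit` calls to `func` are in flight at any moment.
        for item in it:
            await func(item)

    await asyncio.gather(*(worker() for _ in range(limit)))


async def main() -> None:
    async def query(destination: str) -> None:
        print("querying", destination)
        await asyncio.sleep(0.1)

    await bounded_gather(query, [f"hs{i}.example.org" for i in range(25)], 10)


asyncio.run(main())
```

Capping the concurrency matters here because a single `/keys/query` request can touch many remote homeservers; without a limit, one request could open an unbounded number of outbound federation requests at once.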
@@ -300,28 +304,41 @@ class E2eKeysHandler:
         # queries. We use the more efficient batched query_client_keys for all
         # remaining users
         user_ids_updated = []
-        for (user_id, device_list) in destination_query.items():
-            if user_id in user_ids_updated:
-                continue
-            if device_list:
-                continue
+        # Perform a user device resync for each user only once, and only if:
+        # - they have an empty device_list
+        # - they are in some rooms that this server can see
+        users_to_resync_devices = {
+            user_id
+            for (user_id, device_list) in destination_query.items()
+            if (not device_list) and (await self.store.get_rooms_for_user(user_id))
+        }
-            room_ids = await self.store.get_rooms_for_user(user_id)
-            if not room_ids:
-                continue
+        logger.debug(
+            "%d users to resync devices for from destination %s",
+            len(users_to_resync_devices),
+            destination,
+        )
-            # We've decided we're sharing a room with this user and should
-            # probably be tracking their device lists. However, we haven't
-            # done an initial sync on the device list so we do it now.
-            try:
-                resync_results = (
-                    await self.device_handler.device_list_updater.user_device_resync(
-                        user_id
-                    )
+        try:
+            user_resync_results = (
+                await self.device_handler.device_list_updater.multi_user_device_resync(
+                    list(users_to_resync_devices)
                 )
+            )
+            for user_id in users_to_resync_devices:
+                resync_results = user_resync_results[user_id]
+
                 if resync_results is None:
-                    raise ValueError("Device resync failed")
+                    # TODO: It's weird that we'll store a failure against a
+                    # destination, yet continue processing users from that
+                    # destination.
+                    # We might want to consider changing this, but for now
+                    # I'm leaving it as I found it.
+                    failures[destination] = _exception_to_failure(
+                        ValueError(f"Device resync failed for {user_id!r}")
+                    )
+                    continue
                 # Add the device keys to the results.
                 user_devices = resync_results["devices"]
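In this hunk the per-user `user_device_resync` calls in the old loop are folded into a single `multi_user_device_resync` call for the whole batch; the returned mapping is then walked per user, and a `None` entry is recorded as a failure for the destination (via `_exception_to_failure`) instead of aborting the batch. A toy sketch of that consumption pattern follows; `merge_resync_results` and the `{"status": 503, ...}` failure shape are illustrative assumptions, not the handler's actual `_exception_to_failure` output.

```python
from typing import Dict, Optional

JsonDict = Dict[str, object]


def merge_resync_results(
    destination: str,
    user_resync_results: Dict[str, Optional[JsonDict]],
    failures: Dict[str, JsonDict],
) -> Dict[str, JsonDict]:
    """Split a batched resync response into per-user results and recorded failures."""
    usable: Dict[str, JsonDict] = {}
    for user_id, resync_result in user_resync_results.items():
        if resync_result is None:
            # As in the diff: note a failure against the destination, but keep
            # processing the remaining users in the batch. (Failure shape is
            # illustrative only.)
            failures[destination] = {
                "status": 503,
                "message": f"Device resync failed for {user_id!r}",
            }
            continue
        usable[user_id] = resync_result
    return usable


failures: Dict[str, JsonDict] = {}
ok = merge_resync_results(
    "remote.example.org",
    {"@a:remote.example.org": {"devices": []}, "@b:remote.example.org": None},
    failures,
)
print(ok, failures)
```

The one round-trip per destination (rather than one per user) is the main point of the change; the per-user handling that follows it is unchanged apart from where the failure is recorded.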
@@ -339,8 +356,8 @@ class E2eKeysHandler:
                 if self_signing_key:
                     cross_signing_keys["self_signing_keys"][user_id] = self_signing_key
-            except Exception as e:
-                failures[destination] = _exception_to_failure(e)
+        except Exception as e:
+            failures[destination] = _exception_to_failure(e)
         if len(destination_query) == len(user_ids_updated):
             # We've updated all the users in the query and we do not need to
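For context, the `results` and `failures` maps built throughout these hunks are what the handler ultimately returns as `ret = {"device_keys": results, "failures": failures}` (see the second hunk). A purely illustrative example of that shape, with made-up server names and trimmed-down key objects:

```python
# Illustrative only: real device_keys entries carry algorithms, keys and
# signatures; real failure values come from _exception_to_failure.
example_response = {
    "device_keys": {
        "@alice:remote.example.org": {
            "ALICEDEVICE1": {
                "user_id": "@alice:remote.example.org",
                "device_id": "ALICEDEVICE1",
            },
        },
    },
    "failures": {
        "unreachable.example.org": {"status": 503, "message": "Not ready for retry"},
    },
}
```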