diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 003f147ca6..21b009ed47 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -36,8 +36,8 @@ from synapse.types import (
get_domain_from_id,
get_verify_key_from_cross_signing_key,
)
-from synapse.util import json_decoder, unwrapFirstError
-from synapse.util.async_helpers import Linearizer, delay_cancellation
+from synapse.util import json_decoder
+from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.cancellation import cancellable
from synapse.util.retryutils import NotRetryingDestination
@@ -241,24 +241,25 @@ class E2eKeysHandler:
logger.debug(
"%d destinations to query devices for", len(remote_queries_not_in_cache)
)
- await make_deferred_yieldable(
- delay_cancellation(
- defer.gatherResults(
- [
- run_in_background(
- self._query_devices_for_destination,
- results,
- cross_signing_keys,
- failures,
- destination,
- queries,
- timeout,
- )
- for destination, queries in remote_queries_not_in_cache.items()
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
+
+ async def _query(
+ destination_queries: Tuple[str, Dict[str, Iterable[str]]]
+ ) -> None:
+ destination, queries = destination_queries
+ return await self._query_devices_for_destination(
+ results,
+ cross_signing_keys,
+ failures,
+ destination,
+ queries,
+ timeout,
)
+
+ await concurrently_execute(
+ _query,
+ remote_queries_not_in_cache.items(),
+ 10,
+ delay_cancellation=True,
)
ret = {"device_keys": results, "failures": failures}
|