From 7d2261f9228be85ae55c874e49ed98b58e812336 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Dec 2022 11:49:56 +0000 Subject: Limit query_devices_for_destination to 10 concurrent invocations --- synapse/handlers/e2e_keys.py | 39 ++++++++++++++++++++------------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 003f147ca6..21b009ed47 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -36,8 +36,8 @@ from synapse.types import ( get_domain_from_id, get_verify_key_from_cross_signing_key, ) -from synapse.util import json_decoder, unwrapFirstError -from synapse.util.async_helpers import Linearizer, delay_cancellation +from synapse.util import json_decoder +from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.cancellation import cancellable from synapse.util.retryutils import NotRetryingDestination @@ -241,24 +241,25 @@ class E2eKeysHandler: logger.debug( "%d destinations to query devices for", len(remote_queries_not_in_cache) ) - await make_deferred_yieldable( - delay_cancellation( - defer.gatherResults( - [ - run_in_background( - self._query_devices_for_destination, - results, - cross_signing_keys, - failures, - destination, - queries, - timeout, - ) - for destination, queries in remote_queries_not_in_cache.items() - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) + + async def _query( + destination_queries: Tuple[str, Dict[str, Iterable[str]]] + ) -> None: + destination, queries = destination_queries + return await self._query_devices_for_destination( + results, + cross_signing_keys, + failures, + destination, + queries, + timeout, ) + + await concurrently_execute( + _query, + remote_queries_not_in_cache.items(), + 10, + delay_cancellation=True, ) ret = {"device_keys": results, "failures": failures} -- cgit 1.5.1