summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-12-20 11:49:56 +0000
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-12-20 18:21:31 +0000
commit7d2261f9228be85ae55c874e49ed98b58e812336 (patch)
tree41fcf709b3f96df94f7646255fd2470890d6a381
parentAdd async helpers (diff)
downloadsynapse-7d2261f9228be85ae55c874e49ed98b58e812336.tar.xz
Limit query_devices_for_destination to 10 concurrent invocations
-rw-r--r--synapse/handlers/e2e_keys.py39
1 files changed, 20 insertions, 19 deletions
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py

index 003f147ca6..21b009ed47 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py
@@ -36,8 +36,8 @@ from synapse.types import ( get_domain_from_id, get_verify_key_from_cross_signing_key, ) -from synapse.util import json_decoder, unwrapFirstError -from synapse.util.async_helpers import Linearizer, delay_cancellation +from synapse.util import json_decoder +from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.cancellation import cancellable from synapse.util.retryutils import NotRetryingDestination @@ -241,24 +241,25 @@ class E2eKeysHandler: logger.debug( "%d destinations to query devices for", len(remote_queries_not_in_cache) ) - await make_deferred_yieldable( - delay_cancellation( - defer.gatherResults( - [ - run_in_background( - self._query_devices_for_destination, - results, - cross_signing_keys, - failures, - destination, - queries, - timeout, - ) - for destination, queries in remote_queries_not_in_cache.items() - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) + + async def _query( + destination_queries: Tuple[str, Dict[str, Iterable[str]]] + ) -> None: + destination, queries = destination_queries + return await self._query_devices_for_destination( + results, + cross_signing_keys, + failures, + destination, + queries, + timeout, ) + + await concurrently_execute( + _query, + remote_queries_not_in_cache.items(), + 10, + delay_cancellation=True, ) ret = {"device_keys": results, "failures": failures}