diff options
author | Erik Johnston <erik@matrix.org> | 2021-06-08 11:07:46 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-06-08 11:07:46 +0100 |
commit | c842c581ed3d33cf0ca1972507508758f7aad1c8 (patch) | |
tree | 5f58e9a4bd974b5ff2ced5c3c09b1bb82683dee4 /synapse/util | |
parent | Handle /backfill returning no events (#10133) (diff) | |
download | synapse-c842c581ed3d33cf0ca1972507508758f7aad1c8.tar.xz |
When joining a remote room limit the number of events we concurrently check signatures/hashes for (#10117)
If we do hundreds of thousands at once the memory overhead can easily reach 500+ MB.
Diffstat (limited to '')
-rw-r--r-- | synapse/util/async_helpers.py | 21 |
1 files changed, 16 insertions, 5 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 5c55bb0125..061102c3c8 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -15,6 +15,7 @@ import collections import inspect +import itertools import logging from contextlib import contextmanager from typing import ( @@ -160,8 +161,11 @@ class ObservableDeferred: ) +T = TypeVar("T") + + def concurrently_execute( - func: Callable, args: Iterable[Any], limit: int + func: Callable[[T], Any], args: Iterable[T], limit: int ) -> defer.Deferred: """Executes the function with each argument concurrently while limiting the number of concurrent executions. @@ -173,20 +177,27 @@ def concurrently_execute( limit: Maximum number of conccurent executions. Returns: - Deferred[list]: Resolved when all function invocations have finished. + Deferred: Resolved when all function invocations have finished. """ it = iter(args) - async def _concurrently_execute_inner(): + async def _concurrently_execute_inner(value: T) -> None: try: while True: - await maybe_awaitable(func(next(it))) + await maybe_awaitable(func(value)) + value = next(it) except StopIteration: pass + # We use `itertools.islice` to handle the case where the number of args is + # less than the limit, avoiding needlessly spawning unnecessary background + # tasks. return make_deferred_yieldable( defer.gatherResults( - [run_in_background(_concurrently_execute_inner) for _ in range(limit)], + [ + run_in_background(_concurrently_execute_inner, value) + for value in itertools.islice(it, limit) + ], consumeErrors=True, ) ).addErrback(unwrapFirstError) |