diff options
author | Erik Johnston <erik@matrix.org> | 2021-04-30 15:17:50 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2021-04-30 15:17:50 +0100 |
commit | 49da5e9ec4441db17fd9c3a29456afb04f82bb4d (patch) | |
tree | 4d3b360fd2c1ec2ab57f0a5ce0dda8ee93fb79f5 | |
parent | Log memory usage (diff) | |
download | synapse-49da5e9ec4441db17fd9c3a29456afb04f82bb4d.tar.xz |
Chunk _check_sigs_and_hash_and_fetch
-rw-r--r-- | synapse/federation/federation_client.py | 26 |
1 files changed, 16 insertions, 10 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 93f7b96e36..3153240f91 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -34,7 +34,6 @@ from typing import ( import attr import ijson -from synapse.logging.context import get_thread_resource_usage from prometheus_client import Counter from twisted.internet import defer @@ -57,11 +56,16 @@ from synapse.api.room_versions import ( ) from synapse.events import EventBase, builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json -from synapse.logging.context import make_deferred_yieldable, preserve_fn +from synapse.logging.context import ( + get_thread_resource_usage, + make_deferred_yieldable, + preserve_fn, +) from synapse.logging.utils import log_function from synapse.types import JsonDict, get_domain_from_id from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.iterutils import batch_iter from synapse.util.retryutils import NotRetryingDestination if TYPE_CHECKING: @@ -705,8 +709,6 @@ class FederationClient(FederationBase): logger.info("Parsed auth chain: %d", len(auth_chain)) - pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)} - create_event = None for e in state: if (e.type, e.state_key) == (EventTypes.Create, ""): @@ -730,12 +732,16 @@ class FederationClient(FederationBase): % (create_room_version,) ) - valid_pdus = await self._check_sigs_and_hash_and_fetch( - destination, - list(pdus.values()), - outlier=True, - room_version=room_version, - ) + valid_pdus = [] + + for chunk in batch_iter(itertools.chain(state, auth_chain), 1000): + new_valid_pdus = await self._check_sigs_and_hash_and_fetch( + destination, + chunk, + outlier=True, + room_version=room_version, + ) + valid_pdus.extend(new_valid_pdus) logger.info("_check_sigs_and_hash_and_fetch done") |