summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r--synapse/federation/federation_client.py71
1 files changed, 53 insertions, 18 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py

index a5b6a61195..90acc23886 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -33,6 +33,7 @@ from typing import ( ) import attr +import ijson from prometheus_client import Counter from twisted.internet import defer @@ -55,11 +56,16 @@ from synapse.api.room_versions import ( ) from synapse.events import EventBase, builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json -from synapse.logging.context import make_deferred_yieldable, preserve_fn +from synapse.logging.context import ( + get_thread_resource_usage, + make_deferred_yieldable, + preserve_fn, +) from synapse.logging.utils import log_function from synapse.types import JsonDict, get_domain_from_id from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache +from synapse.util.iterutils import batch_iter from synapse.util.retryutils import NotRetryingDestination if TYPE_CHECKING: @@ -385,7 +391,6 @@ class FederationClient(FederationBase): Returns: A list of PDUs that have valid signatures and hashes. """ - deferreds = self._check_sigs_and_hashes(room_version, pdus) async def handle_check_result(pdu: EventBase, deferred: Deferred): try: @@ -420,6 +425,7 @@ class FederationClient(FederationBase): return res handle = preserve_fn(handle_check_result) + deferreds = self._check_sigs_and_hashes(room_version, pdus) deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)] valid_pdus = await make_deferred_yieldable( @@ -667,19 +673,37 @@ class FederationClient(FederationBase): async def send_request(destination) -> Dict[str, Any]: content = await self._do_send_join(destination, pdu) - logger.debug("Got content: %s", content) + # logger.debug("Got content: %s", content.getvalue()) - state = [ - event_from_pdu_json(p, room_version, outlier=True) - for p in content.get("state", []) - ] + # logger.info("send_join content: %d", len(content)) - auth_chain = [ - event_from_pdu_json(p, room_version, outlier=True) - for p in content.get("auth_chain", []) - ] + content.seek(0) + + r = get_thread_resource_usage() + logger.info("Memory before state: %s", r.ru_maxrss) + + state = [] + for i, p in enumerate(ijson.items(content, "state.item")): + state.append(event_from_pdu_json(p, room_version, outlier=True)) + if i % 1000 == 999: + await self._clock.sleep(0) + + r = get_thread_resource_usage() + logger.info("Memory after state: %s", r.ru_maxrss) + + logger.info("Parsed state: %d", len(state)) + content.seek(0) + + auth_chain = [] + for i, p in enumerate(ijson.items(content, "auth_chain.item")): + auth_chain.append(event_from_pdu_json(p, room_version, outlier=True)) + if i % 1000 == 999: + await self._clock.sleep(0) - pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)} + r = get_thread_resource_usage() + logger.info("Memory after: %s", r.ru_maxrss) + + logger.info("Parsed auth chain: %d", len(auth_chain)) create_event = None for e in state: @@ -704,12 +728,19 @@ class FederationClient(FederationBase): % (create_room_version,) ) - valid_pdus = await self._check_sigs_and_hash_and_fetch( - destination, - list(pdus.values()), - outlier=True, - room_version=room_version, - ) + valid_pdus = [] + + for chunk in batch_iter(itertools.chain(state, auth_chain), 1000): + logger.info("Handling next _check_sigs_and_hash_and_fetch chunk") + new_valid_pdus = await self._check_sigs_and_hash_and_fetch( + destination, + chunk, + outlier=True, + room_version=room_version, + ) + valid_pdus.extend(new_valid_pdus) + + logger.info("_check_sigs_and_hash_and_fetch done") valid_pdus_map = {p.event_id: p for p in valid_pdus} @@ -744,6 +775,8 @@ class FederationClient(FederationBase): % (auth_chain_create_events,) ) + logger.info("Returning from send_join") + return { "state": signed_state, "auth_chain": signed_auth, @@ -769,6 +802,8 @@ class FederationClient(FederationBase): if not self._is_unknown_endpoint(e): raise + raise NotImplementedError() + logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API") resp = await self.transport_layer.send_join_v1(