diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index a5b6a61195..90acc23886 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -33,6 +33,7 @@ from typing import (
)
import attr
+import ijson
from prometheus_client import Counter
from twisted.internet import defer
@@ -55,11 +56,16 @@ from synapse.api.room_versions import (
)
from synapse.events import EventBase, builder
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
-from synapse.logging.context import make_deferred_yieldable, preserve_fn
+from synapse.logging.context import (
+ get_thread_resource_usage,
+ make_deferred_yieldable,
+ preserve_fn,
+)
from synapse.logging.utils import log_function
from synapse.types import JsonDict, get_domain_from_id
from synapse.util import unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.iterutils import batch_iter
from synapse.util.retryutils import NotRetryingDestination
if TYPE_CHECKING:
@@ -385,7 +391,6 @@ class FederationClient(FederationBase):
Returns:
A list of PDUs that have valid signatures and hashes.
"""
- deferreds = self._check_sigs_and_hashes(room_version, pdus)
async def handle_check_result(pdu: EventBase, deferred: Deferred):
try:
@@ -420,6 +425,7 @@ class FederationClient(FederationBase):
return res
handle = preserve_fn(handle_check_result)
+ deferreds = self._check_sigs_and_hashes(room_version, pdus)
deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)]
valid_pdus = await make_deferred_yieldable(
@@ -667,19 +673,37 @@ class FederationClient(FederationBase):
async def send_request(destination) -> Dict[str, Any]:
content = await self._do_send_join(destination, pdu)
- logger.debug("Got content: %s", content)
+ # logger.debug("Got content: %s", content.getvalue())
- state = [
- event_from_pdu_json(p, room_version, outlier=True)
- for p in content.get("state", [])
- ]
+ # logger.info("send_join content: %d", len(content))
- auth_chain = [
- event_from_pdu_json(p, room_version, outlier=True)
- for p in content.get("auth_chain", [])
- ]
+ content.seek(0)
+
+ r = get_thread_resource_usage()
+ logger.info("Memory before state: %s", r.ru_maxrss)
+
+ state = []
+ for i, p in enumerate(ijson.items(content, "state.item")):
+ state.append(event_from_pdu_json(p, room_version, outlier=True))
+ if i % 1000 == 999:
+ await self._clock.sleep(0)
+
+ r = get_thread_resource_usage()
+ logger.info("Memory after state: %s", r.ru_maxrss)
+
+ logger.info("Parsed state: %d", len(state))
+ content.seek(0)
+
+ auth_chain = []
+ for i, p in enumerate(ijson.items(content, "auth_chain.item")):
+ auth_chain.append(event_from_pdu_json(p, room_version, outlier=True))
+ if i % 1000 == 999:
+ await self._clock.sleep(0)
- pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
+ r = get_thread_resource_usage()
+ logger.info("Memory after: %s", r.ru_maxrss)
+
+ logger.info("Parsed auth chain: %d", len(auth_chain))
create_event = None
for e in state:
@@ -704,12 +728,19 @@ class FederationClient(FederationBase):
% (create_room_version,)
)
- valid_pdus = await self._check_sigs_and_hash_and_fetch(
- destination,
- list(pdus.values()),
- outlier=True,
- room_version=room_version,
- )
+ valid_pdus = []
+
+ for chunk in batch_iter(itertools.chain(state, auth_chain), 1000):
+ logger.info("Handling next _check_sigs_and_hash_and_fetch chunk")
+ new_valid_pdus = await self._check_sigs_and_hash_and_fetch(
+ destination,
+ chunk,
+ outlier=True,
+ room_version=room_version,
+ )
+ valid_pdus.extend(new_valid_pdus)
+
+ logger.info("_check_sigs_and_hash_and_fetch done")
valid_pdus_map = {p.event_id: p for p in valid_pdus}
@@ -744,6 +775,8 @@ class FederationClient(FederationBase):
% (auth_chain_create_events,)
)
+ logger.info("Returning from send_join")
+
return {
"state": signed_state,
"auth_chain": signed_auth,
@@ -769,6 +802,8 @@ class FederationClient(FederationBase):
if not self._is_unknown_endpoint(e):
raise
+ raise NotImplementedError()
+
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
resp = await self.transport_layer.send_join_v1(
|