From 64887f06fcac63e069364d625d984b4951bf1ffc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 20 May 2021 16:11:48 +0100 Subject: Use ijson to parse the response to `/send_join`, reducing memory usage. (#9958) Instead of parsing the full response to `/send_join` into Python objects (which can be huge for large rooms) and *then* parsing that into events, we use ijson to stream-parse the response directly into `EventBase` objects. --- synapse/federation/federation_client.py | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) (limited to 'synapse/federation/federation_client.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index a5b6a61195..e0e9f5d0be 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -55,6 +55,7 @@ from synapse.api.room_versions import ( ) from synapse.events import EventBase, builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json +from synapse.federation.transport.client import SendJoinResponse from synapse.logging.context import make_deferred_yieldable, preserve_fn from synapse.logging.utils import log_function from synapse.types import JsonDict, get_domain_from_id @@ -665,19 +666,10 @@ class FederationClient(FederationBase): """ async def send_request(destination) -> Dict[str, Any]: - content = await self._do_send_join(destination, pdu) + response = await self._do_send_join(room_version, destination, pdu) - logger.debug("Got content: %s", content) - - state = [ - event_from_pdu_json(p, room_version, outlier=True) - for p in content.get("state", []) - ] - - auth_chain = [ - event_from_pdu_json(p, room_version, outlier=True) - for p in content.get("auth_chain", []) - ] + state = response.state + auth_chain = response.auth_events pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)} @@ -752,11 +744,14 @@ class FederationClient(FederationBase): return 
await self._try_destination_list("send_join", destinations, send_request) - async def _do_send_join(self, destination: str, pdu: EventBase) -> JsonDict: + async def _do_send_join( + self, room_version: RoomVersion, destination: str, pdu: EventBase + ) -> SendJoinResponse: time_now = self._clock.time_msec() try: return await self.transport_layer.send_join_v2( + room_version=room_version, destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -771,17 +766,14 @@ class FederationClient(FederationBase): logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API") - resp = await self.transport_layer.send_join_v1( + return await self.transport_layer.send_join_v1( + room_version=room_version, destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content=pdu.get_pdu_json(time_now), ) - # We expect the v1 API to respond with [200, content], so we only return the - # content. - return resp[1] - async def send_invite( self, destination: str, -- cgit 1.4.1