diff options
author | Erik Johnston <erik@matrix.org> | 2021-04-30 14:21:58 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2021-04-30 14:21:58 +0100 |
commit | aec80899ab1fb44ba183027fa2202e0dcff203f7 (patch) | |
tree | 5124774992a423b8f0896ddaefae33681d1d59e3 | |
parent | Merge branch 'erikj/efficient_presence_join' into erikj/test_send (diff) | |
parent | Use ijson (diff) | |
download | synapse-aec80899ab1fb44ba183027fa2202e0dcff203f7.tar.xz |
Merge branch 'erikj/stream_deserealize' into erikj/test_send
-rw-r--r-- | synapse/federation/federation_client.py | 11 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 10 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 19 |
3 files changed, 29 insertions, 11 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5fb342c097..40225abf81 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -33,6 +33,7 @@ from typing import ( ) import attr +import ijson from prometheus_client import Counter from twisted.internet import defer @@ -669,18 +670,20 @@ class FederationClient(FederationBase): logger.debug("Got content: %s", content) - logger.info("send_join content: %d", len(content)) + # logger.info("send_join content: %d", len(content)) + content.seek(0) state = [ event_from_pdu_json(p, room_version, outlier=True) - for p in content.get("state", []) + for p in ijson.items(content, "state.item") ] logger.info("Parsed auth chain: %d", len(state)) + content.seek(0) auth_chain = [ event_from_pdu_json(p, room_version, outlier=True) - for p in content.get("auth_chain", []) + for p in ijson.items(content, "auth_chain.item") ] logger.info("Parsed auth chain: %d", len(auth_chain)) @@ -779,6 +782,8 @@ class FederationClient(FederationBase): if not self._is_unknown_endpoint(e): raise + raise NotImplementedError() + logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API") resp = await self.transport_layer.send_join_v1( diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index ada322a81e..9c0a105f36 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -244,7 +244,10 @@ class TransportLayerClient: path = _create_v1_path("/send_join/%s/%s", room_id, event_id) response = await self.client.put_json( - destination=destination, path=path, data=content + destination=destination, + path=path, + data=content, + return_string_io=True, ) return response @@ -254,7 +257,10 @@ class TransportLayerClient: path = _create_v2_path("/send_join/%s/%s", room_id, event_id) response = await self.client.put_json( - destination=destination, path=path, data=content + destination=destination, + path=path, + data=content, + return_string_io=True, ) return response diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 27662a6dc6..6db1aece35 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -154,6 +154,7 @@ async def _handle_json_response( request: MatrixFederationRequest, response: IResponse, start_ms: int, + return_string_io=False, ) -> JsonDict: """ Reads the JSON body of a response, with a timeout @@ -175,12 +176,12 @@ async def _handle_json_response( d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE) d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) - def parse(_len: int): - return json_decoder.decode(buf.getvalue()) + await make_deferred_yieldable(d) - d.addCallback(parse) - - body = await make_deferred_yieldable(d) + if return_string_io: + body = buf + else: + body = json_decoder.decode(buf.getvalue()) except BodyExceededMaxSize as e: # The response was too big. logger.warning( @@ -684,6 +685,7 @@ class MatrixFederationHttpClient: ignore_backoff: bool = False, backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, + return_string_io=False, ) -> Union[JsonDict, list]: """Sends the specified json data using PUT @@ -758,7 +760,12 @@ class MatrixFederationHttpClient: _sec_timeout = self.default_timeout body = await _handle_json_response( - self.reactor, _sec_timeout, request, response, start_ms + self.reactor, + _sec_timeout, + request, + response, + start_ms, + return_string_io=return_string_io, ) return body |