summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-04-30 14:21:58 +0100
committerErik Johnston <erik@matrix.org>2021-04-30 14:21:58 +0100
commitaec80899ab1fb44ba183027fa2202e0dcff203f7 (patch)
tree5124774992a423b8f0896ddaefae33681d1d59e3
parentMerge branch 'erikj/efficient_presence_join' into erikj/test_send (diff)
parentUse ijson (diff)
downloadsynapse-aec80899ab1fb44ba183027fa2202e0dcff203f7.tar.xz
Merge branch 'erikj/stream_deserealize' into erikj/test_send
-rw-r--r--synapse/federation/federation_client.py11
-rw-r--r--synapse/federation/transport/client.py10
-rw-r--r--synapse/http/matrixfederationclient.py19
3 files changed, 29 insertions, 11 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5fb342c097..40225abf81 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -33,6 +33,7 @@ from typing import (
 )
 
 import attr
+import ijson
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -669,18 +670,20 @@ class FederationClient(FederationBase):
 
             logger.debug("Got content: %s", content)
 
-            logger.info("send_join content: %d", len(content))
+            # logger.info("send_join content: %d", len(content))
 
+            content.seek(0)
             state = [
                 event_from_pdu_json(p, room_version, outlier=True)
-                for p in content.get("state", [])
+                for p in ijson.items(content, "state.item")
             ]
 
             logger.info("Parsed auth chain: %d", len(state))
+            content.seek(0)
 
             auth_chain = [
                 event_from_pdu_json(p, room_version, outlier=True)
-                for p in content.get("auth_chain", [])
+                for p in ijson.items(content, "auth_chain.item")
             ]
 
             logger.info("Parsed auth chain: %d", len(auth_chain))
@@ -779,6 +782,8 @@ class FederationClient(FederationBase):
             if not self._is_unknown_endpoint(e):
                 raise
 
+        raise NotImplementedError()
+
         logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
 
         resp = await self.transport_layer.send_join_v1(
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index ada322a81e..9c0a105f36 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -244,7 +244,10 @@ class TransportLayerClient:
         path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
 
         response = await self.client.put_json(
-            destination=destination, path=path, data=content
+            destination=destination,
+            path=path,
+            data=content,
+            return_string_io=True,
         )
 
         return response
@@ -254,7 +257,10 @@ class TransportLayerClient:
         path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
 
         response = await self.client.put_json(
-            destination=destination, path=path, data=content
+            destination=destination,
+            path=path,
+            data=content,
+            return_string_io=True,
         )
 
         return response
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 27662a6dc6..6db1aece35 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -154,6 +154,7 @@ async def _handle_json_response(
     request: MatrixFederationRequest,
     response: IResponse,
     start_ms: int,
+    return_string_io=False,
 ) -> JsonDict:
     """
     Reads the JSON body of a response, with a timeout
@@ -175,12 +176,12 @@ async def _handle_json_response(
         d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
         d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
 
-        def parse(_len: int):
-            return json_decoder.decode(buf.getvalue())
+        await make_deferred_yieldable(d)
 
-        d.addCallback(parse)
-
-        body = await make_deferred_yieldable(d)
+        if return_string_io:
+            body = buf
+        else:
+            body = json_decoder.decode(buf.getvalue())
     except BodyExceededMaxSize as e:
         # The response was too big.
         logger.warning(
@@ -684,6 +685,7 @@ class MatrixFederationHttpClient:
         ignore_backoff: bool = False,
         backoff_on_404: bool = False,
         try_trailing_slash_on_400: bool = False,
+        return_string_io=False,
     ) -> Union[JsonDict, list]:
         """Sends the specified json data using PUT
 
@@ -758,7 +760,12 @@ class MatrixFederationHttpClient:
             _sec_timeout = self.default_timeout
 
         body = await _handle_json_response(
-            self.reactor, _sec_timeout, request, response, start_ms
+            self.reactor,
+            _sec_timeout,
+            request,
+            response,
+            start_ms,
+            return_string_io=return_string_io,
         )
 
         return body