diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index a5b6a61195..481f3f6438 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -33,6 +33,7 @@ from typing import (
)
import attr
+import ijson
from prometheus_client import Counter
from twisted.internet import defer
@@ -669,16 +670,22 @@ class FederationClient(FederationBase):
logger.debug("Got content: %s", content)
+ content.seek(0)
state = [
event_from_pdu_json(p, room_version, outlier=True)
- for p in content.get("state", [])
+ for p in ijson.items(content, "state.item")
]
+ logger.debug("state: %s", state)
+ content.seek(0)
+
auth_chain = [
event_from_pdu_json(p, room_version, outlier=True)
- for p in content.get("auth_chain", [])
+ for p in ijson.items(content, "auth_chain.item")
]
+ logger.debug("auth_chain: %s", auth_chain)
+
pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
create_event = None
@@ -769,6 +776,8 @@ class FederationClient(FederationBase):
if not self._is_unknown_endpoint(e):
raise
+ raise NotImplementedError()
+
logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
resp = await self.transport_layer.send_join_v1(
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index ada322a81e..9c0a105f36 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -244,7 +244,10 @@ class TransportLayerClient:
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
response = await self.client.put_json(
- destination=destination, path=path, data=content
+ destination=destination,
+ path=path,
+ data=content,
+ return_string_io=True,
)
return response
@@ -254,7 +257,10 @@ class TransportLayerClient:
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
response = await self.client.put_json(
- destination=destination, path=path, data=content
+ destination=destination,
+ path=path,
+ data=content,
+ return_string_io=True,
)
return response
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index bb837b7b19..d44fa0cb9b 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -154,6 +154,7 @@ async def _handle_json_response(
request: MatrixFederationRequest,
response: IResponse,
start_ms: int,
+ return_string_io=False,
) -> JsonDict:
"""
Reads the JSON body of a response, with a timeout
@@ -175,12 +176,12 @@ async def _handle_json_response(
d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
- def parse(_len: int):
- return json_decoder.decode(buf.getvalue())
+ await make_deferred_yieldable(d)
- d.addCallback(parse)
-
- body = await make_deferred_yieldable(d)
+ if return_string_io:
+ body = buf
+ else:
+ body = json_decoder.decode(buf.getvalue())
except BodyExceededMaxSize as e:
# The response was too big.
logger.warning(
@@ -683,6 +684,7 @@ class MatrixFederationHttpClient:
ignore_backoff: bool = False,
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
+ return_string_io=False,
) -> Union[JsonDict, list]:
"""Sends the specified json data using PUT
@@ -757,7 +759,12 @@ class MatrixFederationHttpClient:
_sec_timeout = self.default_timeout
body = await _handle_json_response(
- self.reactor, _sec_timeout, request, response, start_ms
+ self.reactor,
+ _sec_timeout,
+ request,
+ response,
+ start_ms,
+ return_string_io=return_string_io,
)
return body
|