diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 497848a2b7..e93ab83f7f 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -17,13 +17,19 @@ import logging
import urllib
from typing import Any, Dict, List, Optional
+import attr
+import ijson
+
from synapse.api.constants import Membership
from synapse.api.errors import Codes, HttpResponseException, SynapseError
+from synapse.api.room_versions import RoomVersion
from synapse.api.urls import (
FEDERATION_UNSTABLE_PREFIX,
FEDERATION_V1_PREFIX,
FEDERATION_V2_PREFIX,
)
+from synapse.events import EventBase, make_event_from_dict
+from synapse.http.matrixfederationclient import ByteParser
from synapse.logging.utils import log_function
from synapse.types import JsonDict
@@ -240,21 +246,36 @@ class TransportLayerClient:
return content
@log_function
- async def send_join_v1(self, destination, room_id, event_id, content):
+ async def send_join_v1(
+ self,
+ room_version,
+ destination,
+ room_id,
+ event_id,
+ content,
+ ) -> "SendJoinResponse":
path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
response = await self.client.put_json(
- destination=destination, path=path, data=content
+ destination=destination,
+ path=path,
+ data=content,
+ parser=SendJoinParser(room_version, v1_api=True),
)
return response
@log_function
- async def send_join_v2(self, destination, room_id, event_id, content):
+ async def send_join_v2(
+ self, room_version, destination, room_id, event_id, content
+ ) -> "SendJoinResponse":
path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
response = await self.client.put_json(
- destination=destination, path=path, data=content
+ destination=destination,
+ path=path,
+ data=content,
+ parser=SendJoinParser(room_version, v1_api=False),
)
return response
@@ -1053,3 +1074,59 @@ def _create_v2_path(path, *args):
str
"""
return _create_path(FEDERATION_V2_PREFIX, path, *args)
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SendJoinResponse:
+ """The parsed response of a `/send_join` request."""
+
+ auth_events: List[EventBase]
+ state: List[EventBase]
+
+
+@ijson.coroutine
+def _event_list_parser(room_version: RoomVersion, events: List[EventBase]):
+ """Helper function for use with `ijson.items_coro` to parse an array of
+ events and add them to the given list.
+ """
+
+ while True:
+ obj = yield
+ event = make_event_from_dict(obj, room_version)
+ events.append(event)
+
+
+class SendJoinParser(ByteParser[SendJoinResponse]):
+ """A parser for the response to `/send_join` requests.
+
+ Args:
+ room_version: The version of the room.
+ v1_api: Whether the response is in the v1 format.
+ """
+
+ CONTENT_TYPE = "application/json"
+
+ def __init__(self, room_version: RoomVersion, v1_api: bool):
+ self._response = SendJoinResponse([], [])
+
+ # The V1 API has the shape of `[200, {...}]`, which we handle by
+ # prefixing with `item.*`.
+ prefix = "item." if v1_api else ""
+
+ self._coro_state = ijson.items_coro(
+ _event_list_parser(room_version, self._response.state),
+ prefix + "state.item",
+ )
+ self._coro_auth = ijson.items_coro(
+ _event_list_parser(room_version, self._response.auth_events),
+ prefix + "auth_chain.item",
+ )
+
+ def write(self, data: bytes) -> int:
+ self._coro_state.send(data)
+ self._coro_auth.send(data)
+
+ return len(data)
+
+ def finish(self) -> SendJoinResponse:
+ return self._response
|