summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py28
-rw-r--r--synapse/federation/transport/client.py85
2 files changed, 91 insertions, 22 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index a5b6a61195..e0e9f5d0be 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -55,6 +55,7 @@ from synapse.api.room_versions import (
 )
 from synapse.events import EventBase, builder
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
+from synapse.federation.transport.client import SendJoinResponse
 from synapse.logging.context import make_deferred_yieldable, preserve_fn
 from synapse.logging.utils import log_function
 from synapse.types import JsonDict, get_domain_from_id
@@ -665,19 +666,10 @@ class FederationClient(FederationBase):
         """
 
         async def send_request(destination) -> Dict[str, Any]:
-            content = await self._do_send_join(destination, pdu)
+            response = await self._do_send_join(room_version, destination, pdu)
 
-            logger.debug("Got content: %s", content)
-
-            state = [
-                event_from_pdu_json(p, room_version, outlier=True)
-                for p in content.get("state", [])
-            ]
-
-            auth_chain = [
-                event_from_pdu_json(p, room_version, outlier=True)
-                for p in content.get("auth_chain", [])
-            ]
+            state = response.state
+            auth_chain = response.auth_events
 
             pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)}
 
@@ -752,11 +744,14 @@ class FederationClient(FederationBase):
 
         return await self._try_destination_list("send_join", destinations, send_request)
 
-    async def _do_send_join(self, destination: str, pdu: EventBase) -> JsonDict:
+    async def _do_send_join(
+        self, room_version: RoomVersion, destination: str, pdu: EventBase
+    ) -> SendJoinResponse:
         time_now = self._clock.time_msec()
 
         try:
             return await self.transport_layer.send_join_v2(
+                room_version=room_version,
                 destination=destination,
                 room_id=pdu.room_id,
                 event_id=pdu.event_id,
@@ -771,17 +766,14 @@ class FederationClient(FederationBase):
 
         logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
 
-        resp = await self.transport_layer.send_join_v1(
+        return await self.transport_layer.send_join_v1(
+            room_version=room_version,
             destination=destination,
             room_id=pdu.room_id,
             event_id=pdu.event_id,
             content=pdu.get_pdu_json(time_now),
         )
 
-        # We expect the v1 API to respond with [200, content], so we only return the
-        # content.
-        return resp[1]
-
     async def send_invite(
         self,
         destination: str,
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 497848a2b7..e93ab83f7f 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -17,13 +17,19 @@ import logging
 import urllib
 from typing import Any, Dict, List, Optional
 
+import attr
+import ijson
+
 from synapse.api.constants import Membership
 from synapse.api.errors import Codes, HttpResponseException, SynapseError
+from synapse.api.room_versions import RoomVersion
 from synapse.api.urls import (
     FEDERATION_UNSTABLE_PREFIX,
     FEDERATION_V1_PREFIX,
     FEDERATION_V2_PREFIX,
 )
+from synapse.events import EventBase, make_event_from_dict
+from synapse.http.matrixfederationclient import ByteParser
 from synapse.logging.utils import log_function
 from synapse.types import JsonDict
 
@@ -240,21 +246,36 @@ class TransportLayerClient:
         return content
 
     @log_function
-    async def send_join_v1(self, destination, room_id, event_id, content):
+    async def send_join_v1(
+        self,
+        room_version,
+        destination,
+        room_id,
+        event_id,
+        content,
+    ) -> "SendJoinResponse":
         path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
 
         response = await self.client.put_json(
-            destination=destination, path=path, data=content
+            destination=destination,
+            path=path,
+            data=content,
+            parser=SendJoinParser(room_version, v1_api=True),
         )
 
         return response
 
     @log_function
-    async def send_join_v2(self, destination, room_id, event_id, content):
+    async def send_join_v2(
+        self, room_version, destination, room_id, event_id, content
+    ) -> "SendJoinResponse":
         path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
 
         response = await self.client.put_json(
-            destination=destination, path=path, data=content
+            destination=destination,
+            path=path,
+            data=content,
+            parser=SendJoinParser(room_version, v1_api=False),
         )
 
         return response
@@ -1053,3 +1074,59 @@ def _create_v2_path(path, *args):
         str
     """
     return _create_path(FEDERATION_V2_PREFIX, path, *args)
+
+
+@attr.s(slots=True, auto_attribs=True)
+class SendJoinResponse:
+    """The parsed response of a `/send_join` request."""
+
+    auth_events: List[EventBase]
+    state: List[EventBase]
+
+
+@ijson.coroutine
+def _event_list_parser(room_version: RoomVersion, events: List[EventBase]):
+    """Helper function for use with `ijson.items_coro` to parse an array of
+    events and add them to the given list.
+    """
+
+    while True:
+        obj = yield
+        event = make_event_from_dict(obj, room_version)
+        events.append(event)
+
+
+class SendJoinParser(ByteParser[SendJoinResponse]):
+    """A parser for the response to `/send_join` requests.
+
+    Args:
+        room_version: The version of the room.
+        v1_api: Whether the response is in the v1 format.
+    """
+
+    CONTENT_TYPE = "application/json"
+
+    def __init__(self, room_version: RoomVersion, v1_api: bool):
+        self._response = SendJoinResponse([], [])
+
+        # The V1 API has the shape of `[200, {...}]`, which we handle by
+        # prefixing with `item.*`.
+        prefix = "item." if v1_api else ""
+
+        self._coro_state = ijson.items_coro(
+            _event_list_parser(room_version, self._response.state),
+            prefix + "state.item",
+        )
+        self._coro_auth = ijson.items_coro(
+            _event_list_parser(room_version, self._response.auth_events),
+            prefix + "auth_chain.item",
+        )
+
+    def write(self, data: bytes) -> int:
+        self._coro_state.send(data)
+        self._coro_auth.send(data)
+
+        return len(data)
+
+    def finish(self) -> SendJoinResponse:
+        return self._response