diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 28 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 85 |
2 files changed, 91 insertions, 22 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index a5b6a61195..e0e9f5d0be 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -55,6 +55,7 @@ from synapse.api.room_versions import ( ) from synapse.events import EventBase, builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json +from synapse.federation.transport.client import SendJoinResponse from synapse.logging.context import make_deferred_yieldable, preserve_fn from synapse.logging.utils import log_function from synapse.types import JsonDict, get_domain_from_id @@ -665,19 +666,10 @@ class FederationClient(FederationBase): """ async def send_request(destination) -> Dict[str, Any]: - content = await self._do_send_join(destination, pdu) + response = await self._do_send_join(room_version, destination, pdu) - logger.debug("Got content: %s", content) - - state = [ - event_from_pdu_json(p, room_version, outlier=True) - for p in content.get("state", []) - ] - - auth_chain = [ - event_from_pdu_json(p, room_version, outlier=True) - for p in content.get("auth_chain", []) - ] + state = response.state + auth_chain = response.auth_events pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)} @@ -752,11 +744,14 @@ class FederationClient(FederationBase): return await self._try_destination_list("send_join", destinations, send_request) - async def _do_send_join(self, destination: str, pdu: EventBase) -> JsonDict: + async def _do_send_join( + self, room_version: RoomVersion, destination: str, pdu: EventBase + ) -> SendJoinResponse: time_now = self._clock.time_msec() try: return await self.transport_layer.send_join_v2( + room_version=room_version, destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -771,17 +766,14 @@ class FederationClient(FederationBase): logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API") - resp = await self.transport_layer.send_join_v1( + return await self.transport_layer.send_join_v1( + room_version=room_version, destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content=pdu.get_pdu_json(time_now), ) - # We expect the v1 API to respond with [200, content], so we only return the - # content. - return resp[1] - async def send_invite( self, destination: str, diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 497848a2b7..e93ab83f7f 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -17,13 +17,19 @@ import logging import urllib from typing import Any, Dict, List, Optional +import attr +import ijson + from synapse.api.constants import Membership from synapse.api.errors import Codes, HttpResponseException, SynapseError +from synapse.api.room_versions import RoomVersion from synapse.api.urls import ( FEDERATION_UNSTABLE_PREFIX, FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX, ) +from synapse.events import EventBase, make_event_from_dict +from synapse.http.matrixfederationclient import ByteParser from synapse.logging.utils import log_function from synapse.types import JsonDict @@ -240,21 +246,36 @@ class TransportLayerClient: return content @log_function - async def send_join_v1(self, destination, room_id, event_id, content): + async def send_join_v1( + self, + room_version, + destination, + room_id, + event_id, + content, + ) -> "SendJoinResponse": path = _create_v1_path("/send_join/%s/%s", room_id, event_id) response = await self.client.put_json( - destination=destination, path=path, data=content + destination=destination, + path=path, + data=content, + parser=SendJoinParser(room_version, v1_api=True), ) return response @log_function - async def send_join_v2(self, destination, room_id, event_id, content): + async def send_join_v2( + self, room_version, destination, room_id, event_id, content + ) -> "SendJoinResponse": path = _create_v2_path("/send_join/%s/%s", room_id, event_id) response = await self.client.put_json( - destination=destination, path=path, data=content + destination=destination, + path=path, + data=content, + parser=SendJoinParser(room_version, v1_api=False), ) return response @@ -1053,3 +1074,59 @@ def _create_v2_path(path, *args): str """ return _create_path(FEDERATION_V2_PREFIX, path, *args) + + +@attr.s(slots=True, auto_attribs=True) +class SendJoinResponse: + """The parsed response of a `/send_join` request.""" + + auth_events: List[EventBase] + state: List[EventBase] + + +@ijson.coroutine +def _event_list_parser(room_version: RoomVersion, events: List[EventBase]): + """Helper function for use with `ijson.items_coro` to parse an array of + events and add them to the given list. + """ + + while True: + obj = yield + event = make_event_from_dict(obj, room_version) + events.append(event) + + +class SendJoinParser(ByteParser[SendJoinResponse]): + """A parser for the response to `/send_join` requests. + + Args: + room_version: The version of the room. + v1_api: Whether the response is in the v1 format. + """ + + CONTENT_TYPE = "application/json" + + def __init__(self, room_version: RoomVersion, v1_api: bool): + self._response = SendJoinResponse([], []) + + # The V1 API has the shape of `[200, {...}]`, which we handle by + # prefixing with `item.*`. + prefix = "item." if v1_api else "" + + self._coro_state = ijson.items_coro( + _event_list_parser(room_version, self._response.state), + prefix + "state.item", + ) + self._coro_auth = ijson.items_coro( + _event_list_parser(room_version, self._response.auth_events), + prefix + "auth_chain.item", + ) + + def write(self, data: bytes) -> int: + self._coro_state.send(data) + self._coro_auth.send(data) + + return len(data) + + def finish(self) -> SendJoinResponse: + return self._response |