diff options
author | Patrick Cloke <patrickc@matrix.org> | 2023-07-11 15:34:31 -0400 |
---|---|---|
committer | Patrick Cloke <patrickc@matrix.org> | 2023-07-17 11:05:44 -0400 |
commit | a2d697b74561196d73088623d2e6c1ab97324ac3 (patch) | |
tree | e350836f90a4b002f1cd1d8a388c4e14de3e01b9 | |
parent | Implement new event and backfill endpoints. (diff) | |
download | synapse-a2d697b74561196d73088623d2e6c1ab97324ac3.tar.xz |
Implement new send_{join,leave,knock} endpoints.
-rw-r--r-- | synapse/federation/federation_client.py | 37 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 36 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 41 | ||||
-rw-r--r-- | synapse/federation/transport/server/federation.py | 59 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 37 |
5 files changed, 195 insertions, 15 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f60ef8c16c..35faf48431 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -1248,6 +1248,19 @@ class FederationClient(FederationBase): time_now = self._clock.time_msec() try: + return await self.transport_layer.send_join_unstable( + room_version=room_version, + destination=destination, + txn_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) + except HttpResponseException as e: + # If an error is received that is due to an unrecognised endpoint, + # fallback to the v2 endpoint. + if not is_unknown_endpoint(e): + raise + + try: return await self.transport_layer.send_join_v2( room_version=room_version, destination=destination, @@ -1385,6 +1398,18 @@ class FederationClient(FederationBase): time_now = self._clock.time_msec() try: + return await self.transport_layer.send_leave_unstable( + destination=destination, + txn_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) + except HttpResponseException as e: + # If an error is received that is due to an unrecognised endpoint, + # fallback to the v2. + if not is_unknown_endpoint(e): + raise + + try: return await self.transport_layer.send_leave_v2( destination=destination, room_id=pdu.room_id, @@ -1460,6 +1485,18 @@ class FederationClient(FederationBase): """ time_now = self._clock.time_msec() + try: + return await self.transport_layer.send_knock_unstable( + destination=destination, + txn_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) + except HttpResponseException as e: + # If an error is received that is due to an unrecognised endpoint, + # fallback to the v2. + if not is_unknown_endpoint(e): + raise + return await self.transport_layer.send_knock_v1( destination=destination, room_id=pdu.room_id, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index aff750d0f5..9585e297b5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -679,13 +679,14 @@ class FederationServer(FederationBase): self, origin: str, content: JsonDict, - room_id: str, + expected_room_id: Optional[str] = None, caller_supports_partial_state: bool = False, ) -> Dict[str, Any]: set_tag( SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE, caller_supports_partial_state, ) + room_id = content["room_id"] await self._room_member_handler._join_rate_per_room_limiter.ratelimit( # type: ignore[has-type] requester=None, key=room_id, @@ -693,7 +694,7 @@ class FederationServer(FederationBase): ) event, context = await self._on_send_membership_event( - origin, content, Membership.JOIN, room_id + origin, content, Membership.JOIN, expected_room_id ) prev_state_ids = await context.get_prev_state_ids() @@ -752,10 +753,12 @@ class FederationServer(FederationBase): return {"event": pdu.get_templated_pdu_json(), "room_version": room_version} async def on_send_leave_request( - self, origin: str, content: JsonDict, room_id: str + self, origin: str, content: JsonDict, expected_room_id: Optional[str] = None ) -> dict: logger.debug("on_send_leave_request: content: %s", content) - await self._on_send_membership_event(origin, content, Membership.LEAVE, room_id) + await self._on_send_membership_event( + origin, content, Membership.LEAVE, expected_room_id + ) return {} async def on_make_knock_request( @@ -813,10 +816,7 @@ class FederationServer(FederationBase): } async def on_send_knock_request( - self, - origin: str, - content: JsonDict, - room_id: str, + self, origin: str, content: JsonDict, expected_room_id: Optional[str] = None ) -> Dict[str, List[JsonDict]]: """ We have received a knock event for a room. Verify and send the event into the room @@ -826,13 +826,13 @@ class FederationServer(FederationBase): Args: origin: The remote homeserver of the knocking user. content: The content of the request. - room_id: The ID of the room to knock on. + expected_room_id: The room ID included in the request. Returns: The stripped room state. """ _, context = await self._on_send_membership_event( - origin, content, Membership.KNOCK, room_id + origin, content, Membership.KNOCK, expected_room_id ) # Retrieve stripped state events from the room and send them back to the remote @@ -853,7 +853,11 @@ class FederationServer(FederationBase): } async def _on_send_membership_event( - self, origin: str, content: JsonDict, membership_type: str, room_id: str + self, + origin: str, + content: JsonDict, + membership_type: str, + expected_room_id: Optional[str], ) -> Tuple[EventBase, EventContext]: """Handle an on_send_{join,leave,knock} request @@ -865,8 +869,8 @@ class FederationServer(FederationBase): content: The body of the send_* request - a complete membership event membership_type: The expected membership type (join or leave, depending on the endpoint) - room_id: The room_id from the request, to be validated against the room_id - in the event + expected_room_id: The room_id from the request, to be validated against + the room_id in the event. None if the request did not include a room ID. Returns: The event and context of the event after inserting it into the room graph. @@ -876,12 +880,16 @@ class FederationServer(FederationBase): the room_id not matching or the event not being authorized. """ assert_params_in_dict(content, ["room_id"]) - if content["room_id"] != room_id: + if expected_room_id is None: + room_id = content["room_id"] + elif content["room_id"] != expected_room_id: raise SynapseError( 400, "Room ID in body does not match that in request path", Codes.BAD_JSON, ) + else: + room_id = expected_room_id # Note that get_room_version throws if the room does not exist here. room_version = await self.store.get_room_version(room_id) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 012ce4710b..527d89d996 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -435,6 +435,22 @@ class TransportLayerClient: parser=SendJoinParser(room_version, v1_api=False), ) + async def send_join_unstable( + self, + room_version: RoomVersion, + destination: str, + txn_id: str, + content: JsonDict, + ) -> "SendJoinResponse": + path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_join/{txn_id}" + + return await self.client.post_json( + destination=destination, + path=path, + data=content, + parser=SendJoinParser(room_version, v1_api=False), + ) + async def send_leave_v1( self, destination: str, room_id: str, event_id: str, content: JsonDict ) -> Tuple[int, JsonDict]: @@ -468,6 +484,22 @@ class TransportLayerClient: ignore_backoff=True, ) + async def send_leave_unstable( + self, destination: str, txn_id: str, content: JsonDict + ) -> JsonDict: + path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_leave/{txn_id}" + + return await self.client.post_json( + destination=destination, + path=path, + data=content, + # we want to do our best to send this through. The problem is + # that if it fails, we won't retry it later, so if the remote + # server was just having a momentary blip, the room will be out of + # sync. + ignore_backoff=True, + ) + async def send_knock_v1( self, destination: str, @@ -501,6 +533,15 @@ class TransportLayerClient: destination=destination, path=path, data=content ) + async def send_knock_unstable( + self, destination: str, txn_id: str, content: JsonDict + ) -> JsonDict: + path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_knock/{txn_id}" + + return await self.client.post_json( + destination=destination, path=path, data=content + ) + async def send_invite_v1( self, destination: str, room_id: str, event_id: str, content: JsonDict ) -> Tuple[int, JsonDict]: diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py index d16375aecb..ee201c5c30 100644 --- a/synapse/federation/transport/server/federation.py +++ b/synapse/federation/transport/server/federation.py @@ -436,6 +436,23 @@ class FederationV2SendLeaveServlet(BaseFederationServerServlet): return 200, result +class FederationUnstableSendLeaveServlet(BaseFederationServerServlet): + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02" + PATH = "/send_leave/(?P<txn_id>[^/]*)" + CATEGORY = "Federation requests" + + async def on_POST( + self, + origin: str, + content: JsonDict, + query: Dict[bytes, List[bytes]], + txn_id: str, + ) -> Tuple[int, JsonDict]: + # TODO Use the txn_id for idempotency. + result = await self.handler.on_send_leave_request(origin, content) + return 200, result + + class FederationMakeKnockServlet(BaseFederationServerServlet): PATH = "/make_knock/(?P<room_id>[^/]*)/(?P<user_id>[^/]*)" CATEGORY = "Federation requests" @@ -475,6 +492,23 @@ class FederationV1SendKnockServlet(BaseFederationServerServlet): return 200, result +class FederationUnstableSendKnockServlet(BaseFederationServerServlet): + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02" + PATH = "/send_knock/(?P<txn_id>[^/]*)" + CATEGORY = "Federation requests" + + async def on_POST( + self, + origin: str, + content: JsonDict, + query: Dict[bytes, List[bytes]], + txn_id: str, + ) -> Tuple[int, JsonDict]: + # TODO Use the txn_id for idempotency. + result = await self.handler.on_send_knock_request(origin, content) + return 200, {"stripped_state": result["knock_room_state"]} + + class FederationEventAuthServlet(BaseFederationServerServlet): PATH = "/event_auth/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)" CATEGORY = "Federation requests" @@ -533,6 +567,28 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet): return 200, result +class FederationUnstableSendJoinServlet(BaseFederationServerServlet): + PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02" + PATH = "/send_join/(?P<txn_id>[^/]*)" + CATEGORY = "Federation requests" + + async def on_POST( + self, + origin: str, + content: JsonDict, + query: Dict[bytes, List[bytes]], + txn_id: str, + ) -> Tuple[int, JsonDict]: + # TODO Use the txn_id for idempotency. + + result = await self.handler.on_send_join_request(origin, content) + return 200, { + "event": result["event"], + "state": result["state"], + "auth_chain": result["auth_chain}"], + } + + class FederationV1InviteServlet(BaseFederationServerServlet): PATH = "/invite/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)" CATEGORY = "Federation requests" @@ -896,4 +952,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( # TODO(LM) Linearized Matrix additions. FederationUnstableEventServlet, FederationUnstableBackfillServlet, + FederationUnstableSendJoinServlet, + FederationUnstableSendLeaveServlet, + FederationUnstableSendKnockServlet, ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cc4e258b0f..b973246adc 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -986,6 +986,7 @@ class MatrixFederationHttpClient: return body + @overload async def post_json( self, destination: str, @@ -995,7 +996,35 @@ class MatrixFederationHttpClient: timeout: Optional[int] = None, ignore_backoff: bool = False, args: Optional[QueryParams] = None, + parser: Literal[None] = None, ) -> JsonDict: + ... + + @overload + async def post_json( + self, + destination: str, + path: str, + data: Optional[JsonDict] = None, + long_retries: bool = False, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + args: Optional[QueryParams] = None, + parser: Optional[ByteParser[T]] = None, + ) -> T: + ... + + async def post_json( + self, + destination: str, + path: str, + data: Optional[JsonDict] = None, + long_retries: bool = False, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + args: Optional[QueryParams] = None, + parser: Optional[ByteParser[T]] = None, + ) -> Union[JsonDict, T]: """Sends the specified json data using POST Args: @@ -1021,6 +1050,9 @@ class MatrixFederationHttpClient: try the request anyway. args: query params + + parser: The parser to use to decode the response. Defaults to + parsing as JSON. Returns: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. @@ -1053,8 +1085,11 @@ class MatrixFederationHttpClient: else: _sec_timeout = self.default_timeout_seconds + if parser is None: + parser = cast(ByteParser[T], JsonParser()) + body = await _handle_response( - self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser() + self.reactor, _sec_timeout, request, response, start_ms, parser=parser ) return body |