summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <patrickc@matrix.org>2023-07-11 15:34:31 -0400
committerPatrick Cloke <patrickc@matrix.org>2023-07-17 11:05:44 -0400
commita2d697b74561196d73088623d2e6c1ab97324ac3 (patch)
treee350836f90a4b002f1cd1d8a388c4e14de3e01b9
parentImplement new event and backfill endpoints. (diff)
downloadsynapse-a2d697b74561196d73088623d2e6c1ab97324ac3.tar.xz
Implement new send_{join,leave,knock} endpoints.
-rw-r--r--synapse/federation/federation_client.py37
-rw-r--r--synapse/federation/federation_server.py36
-rw-r--r--synapse/federation/transport/client.py41
-rw-r--r--synapse/federation/transport/server/federation.py59
-rw-r--r--synapse/http/matrixfederationclient.py37
5 files changed, 195 insertions, 15 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f60ef8c16c..35faf48431 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -1248,6 +1248,19 @@ class FederationClient(FederationBase):
         time_now = self._clock.time_msec()
 
         try:
+            return await self.transport_layer.send_join_unstable(
+                room_version=room_version,
+                destination=destination,
+                txn_id=pdu.event_id,
+                content=pdu.get_pdu_json(time_now),
+            )
+        except HttpResponseException as e:
+            # If an error is received that is due to an unrecognised endpoint,
+            # fallback to the v2 endpoint.
+            if not is_unknown_endpoint(e):
+                raise
+
+        try:
             return await self.transport_layer.send_join_v2(
                 room_version=room_version,
                 destination=destination,
@@ -1385,6 +1398,18 @@ class FederationClient(FederationBase):
         time_now = self._clock.time_msec()
 
         try:
+            return await self.transport_layer.send_leave_unstable(
+                destination=destination,
+                txn_id=pdu.event_id,
+                content=pdu.get_pdu_json(time_now),
+            )
+        except HttpResponseException as e:
+            # If an error is received that is due to an unrecognised endpoint,
+            # fallback to the v2.
+            if not is_unknown_endpoint(e):
+                raise
+
+        try:
             return await self.transport_layer.send_leave_v2(
                 destination=destination,
                 room_id=pdu.room_id,
@@ -1460,6 +1485,18 @@ class FederationClient(FederationBase):
         """
         time_now = self._clock.time_msec()
 
+        try:
+            return await self.transport_layer.send_knock_unstable(
+                destination=destination,
+                txn_id=pdu.event_id,
+                content=pdu.get_pdu_json(time_now),
+            )
+        except HttpResponseException as e:
+            # If an error is received that is due to an unrecognised endpoint,
+            # fallback to the v2.
+            if not is_unknown_endpoint(e):
+                raise
+
         return await self.transport_layer.send_knock_v1(
             destination=destination,
             room_id=pdu.room_id,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index aff750d0f5..9585e297b5 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -679,13 +679,14 @@ class FederationServer(FederationBase):
         self,
         origin: str,
         content: JsonDict,
-        room_id: str,
+        expected_room_id: Optional[str] = None,
         caller_supports_partial_state: bool = False,
     ) -> Dict[str, Any]:
         set_tag(
             SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE,
             caller_supports_partial_state,
         )
+        room_id = content["room_id"]
         await self._room_member_handler._join_rate_per_room_limiter.ratelimit(  # type: ignore[has-type]
             requester=None,
             key=room_id,
@@ -693,7 +694,7 @@ class FederationServer(FederationBase):
         )
 
         event, context = await self._on_send_membership_event(
-            origin, content, Membership.JOIN, room_id
+            origin, content, Membership.JOIN, expected_room_id
         )
 
         prev_state_ids = await context.get_prev_state_ids()
@@ -752,10 +753,12 @@ class FederationServer(FederationBase):
         return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
 
     async def on_send_leave_request(
-        self, origin: str, content: JsonDict, room_id: str
+        self, origin: str, content: JsonDict, expected_room_id: Optional[str] = None
     ) -> dict:
         logger.debug("on_send_leave_request: content: %s", content)
-        await self._on_send_membership_event(origin, content, Membership.LEAVE, room_id)
+        await self._on_send_membership_event(
+            origin, content, Membership.LEAVE, expected_room_id
+        )
         return {}
 
     async def on_make_knock_request(
@@ -813,10 +816,7 @@ class FederationServer(FederationBase):
         }
 
     async def on_send_knock_request(
-        self,
-        origin: str,
-        content: JsonDict,
-        room_id: str,
+        self, origin: str, content: JsonDict, expected_room_id: Optional[str] = None
     ) -> Dict[str, List[JsonDict]]:
         """
         We have received a knock event for a room. Verify and send the event into the room
@@ -826,13 +826,13 @@ class FederationServer(FederationBase):
         Args:
             origin: The remote homeserver of the knocking user.
             content: The content of the request.
-            room_id: The ID of the room to knock on.
+            expected_room_id: The room ID included in the request.
 
         Returns:
             The stripped room state.
         """
         _, context = await self._on_send_membership_event(
-            origin, content, Membership.KNOCK, room_id
+            origin, content, Membership.KNOCK, expected_room_id
         )
 
         # Retrieve stripped state events from the room and send them back to the remote
@@ -853,7 +853,11 @@ class FederationServer(FederationBase):
         }
 
     async def _on_send_membership_event(
-        self, origin: str, content: JsonDict, membership_type: str, room_id: str
+        self,
+        origin: str,
+        content: JsonDict,
+        membership_type: str,
+        expected_room_id: Optional[str],
     ) -> Tuple[EventBase, EventContext]:
         """Handle an on_send_{join,leave,knock} request
 
@@ -865,8 +869,8 @@ class FederationServer(FederationBase):
             content: The body of the send_* request - a complete membership event
             membership_type: The expected membership type (join or leave, depending
                 on the endpoint)
-            room_id: The room_id from the request, to be validated against the room_id
-                in the event
+            expected_room_id: The room_id from the request, to be validated against
+                the room_id in the event. None if the request did not include a room ID.
 
         Returns:
             The event and context of the event after inserting it into the room graph.
@@ -876,12 +880,16 @@ class FederationServer(FederationBase):
                the room_id not matching or the event not being authorized.
         """
         assert_params_in_dict(content, ["room_id"])
-        if content["room_id"] != room_id:
+        if expected_room_id is None:
+            room_id = content["room_id"]
+        elif content["room_id"] != expected_room_id:
             raise SynapseError(
                 400,
                 "Room ID in body does not match that in request path",
                 Codes.BAD_JSON,
             )
+        else:
+            room_id = expected_room_id
 
         # Note that get_room_version throws if the room does not exist here.
         room_version = await self.store.get_room_version(room_id)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 012ce4710b..527d89d996 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -435,6 +435,22 @@ class TransportLayerClient:
             parser=SendJoinParser(room_version, v1_api=False),
         )
 
+    async def send_join_unstable(
+        self,
+        room_version: RoomVersion,
+        destination: str,
+        txn_id: str,
+        content: JsonDict,
+    ) -> "SendJoinResponse":
+        path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_join/{txn_id}"
+
+        return await self.client.post_json(
+            destination=destination,
+            path=path,
+            data=content,
+            parser=SendJoinParser(room_version, v1_api=False),
+        )
+
     async def send_leave_v1(
         self, destination: str, room_id: str, event_id: str, content: JsonDict
     ) -> Tuple[int, JsonDict]:
@@ -468,6 +484,22 @@ class TransportLayerClient:
             ignore_backoff=True,
         )
 
+    async def send_leave_unstable(
+        self, destination: str, txn_id: str, content: JsonDict
+    ) -> JsonDict:
+        path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_leave/{txn_id}"
+
+        return await self.client.post_json(
+            destination=destination,
+            path=path,
+            data=content,
+            # we want to do our best to send this through. The problem is
+            # that if it fails, we won't retry it later, so if the remote
+            # server was just having a momentary blip, the room will be out of
+            # sync.
+            ignore_backoff=True,
+        )
+
     async def send_knock_v1(
         self,
         destination: str,
@@ -501,6 +533,15 @@ class TransportLayerClient:
             destination=destination, path=path, data=content
         )
 
+    async def send_knock_unstable(
+        self, destination: str, txn_id: str, content: JsonDict
+    ) -> JsonDict:
+        path = f"/_matrix/federation/unstable/org.matrix.i-d.ralston-mimi-linearized-matrix.02/send_knock/{txn_id}"
+
+        return await self.client.post_json(
+            destination=destination, path=path, data=content
+        )
+
     async def send_invite_v1(
         self, destination: str, room_id: str, event_id: str, content: JsonDict
     ) -> Tuple[int, JsonDict]:
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index d16375aecb..ee201c5c30 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -436,6 +436,23 @@ class FederationV2SendLeaveServlet(BaseFederationServerServlet):
         return 200, result
 
 
+class FederationUnstableSendLeaveServlet(BaseFederationServerServlet):
+    PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
+    PATH = "/send_leave/(?P<txn_id>[^/]*)"
+    CATEGORY = "Federation requests"
+
+    async def on_POST(
+        self,
+        origin: str,
+        content: JsonDict,
+        query: Dict[bytes, List[bytes]],
+        txn_id: str,
+    ) -> Tuple[int, JsonDict]:
+        # TODO Use the txn_id for idempotency.
+        result = await self.handler.on_send_leave_request(origin, content)
+        return 200, result
+
+
 class FederationMakeKnockServlet(BaseFederationServerServlet):
     PATH = "/make_knock/(?P<room_id>[^/]*)/(?P<user_id>[^/]*)"
     CATEGORY = "Federation requests"
@@ -475,6 +492,23 @@ class FederationV1SendKnockServlet(BaseFederationServerServlet):
         return 200, result
 
 
+class FederationUnstableSendKnockServlet(BaseFederationServerServlet):
+    PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
+    PATH = "/send_knock/(?P<txn_id>[^/]*)"
+    CATEGORY = "Federation requests"
+
+    async def on_POST(
+        self,
+        origin: str,
+        content: JsonDict,
+        query: Dict[bytes, List[bytes]],
+        txn_id: str,
+    ) -> Tuple[int, JsonDict]:
+        # TODO Use the txn_id for idempotency.
+        result = await self.handler.on_send_knock_request(origin, content)
+        return 200, {"stripped_state": result["knock_room_state"]}
+
+
 class FederationEventAuthServlet(BaseFederationServerServlet):
     PATH = "/event_auth/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
     CATEGORY = "Federation requests"
@@ -533,6 +567,28 @@ class FederationV2SendJoinServlet(BaseFederationServerServlet):
         return 200, result
 
 
+class FederationUnstableSendJoinServlet(BaseFederationServerServlet):
+    PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.i-d.ralston-mimi-linearized-matrix.02"
+    PATH = "/send_join/(?P<txn_id>[^/]*)"
+    CATEGORY = "Federation requests"
+
+    async def on_POST(
+        self,
+        origin: str,
+        content: JsonDict,
+        query: Dict[bytes, List[bytes]],
+        txn_id: str,
+    ) -> Tuple[int, JsonDict]:
+        # TODO Use the txn_id for idempotency.
+
+        result = await self.handler.on_send_join_request(origin, content)
+        return 200, {
+            "event": result["event"],
+            "state": result["state"],
+            "auth_chain": result["auth_chain}"],
+        }
+
+
 class FederationV1InviteServlet(BaseFederationServerServlet):
     PATH = "/invite/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
     CATEGORY = "Federation requests"
@@ -896,4 +952,7 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     # TODO(LM) Linearized Matrix additions.
     FederationUnstableEventServlet,
     FederationUnstableBackfillServlet,
+    FederationUnstableSendJoinServlet,
+    FederationUnstableSendLeaveServlet,
+    FederationUnstableSendKnockServlet,
 )
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index cc4e258b0f..b973246adc 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -986,6 +986,7 @@ class MatrixFederationHttpClient:
 
         return body
 
+    @overload
     async def post_json(
         self,
         destination: str,
@@ -995,7 +996,35 @@ class MatrixFederationHttpClient:
         timeout: Optional[int] = None,
         ignore_backoff: bool = False,
         args: Optional[QueryParams] = None,
+        parser: Literal[None] = None,
     ) -> JsonDict:
+        ...
+
+    @overload
+    async def post_json(
+        self,
+        destination: str,
+        path: str,
+        data: Optional[JsonDict] = None,
+        long_retries: bool = False,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        args: Optional[QueryParams] = None,
+        parser: Optional[ByteParser[T]] = None,
+    ) -> T:
+        ...
+
+    async def post_json(
+        self,
+        destination: str,
+        path: str,
+        data: Optional[JsonDict] = None,
+        long_retries: bool = False,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        args: Optional[QueryParams] = None,
+        parser: Optional[ByteParser[T]] = None,
+    ) -> Union[JsonDict, T]:
         """Sends the specified json data using POST
 
         Args:
@@ -1021,6 +1050,9 @@ class MatrixFederationHttpClient:
                 try the request anyway.
 
             args: query params
+
+            parser: The parser to use to decode the response. Defaults to
+                parsing as JSON.
         Returns:
             Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body.
 
@@ -1053,8 +1085,11 @@ class MatrixFederationHttpClient:
         else:
             _sec_timeout = self.default_timeout_seconds
 
+        if parser is None:
+            parser = cast(ByteParser[T], JsonParser())
+
         body = await _handle_response(
-            self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
+            self.reactor, _sec_timeout, request, response, start_ms, parser=parser
         )
         return body