summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <patrickc@matrix.org>2023-07-13 08:07:24 -0400
committerPatrick Cloke <patrickc@matrix.org>2023-07-17 11:05:44 -0400
commit547a7076b471b6adad1fbb6a2a5c1936c6439a3f (patch)
tree8814bdef9bae6e2837fdb442c602b569f2c561a4
parentConvert new-style EDUs to old-style EDUs. (diff)
downloadsynapse-547a7076b471b6adad1fbb6a2a5c1936c6439a3f.tar.xz
Accept LPDUs in transactions and fan them back out.
-rw-r--r--synapse/crypto/keyring.py4
-rw-r--r--synapse/federation/federation_server.py135
-rw-r--r--synapse/federation/sender/__init__.py22
3 files changed, 151 insertions, 10 deletions
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 260aab3241..1caef70538 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -189,6 +189,10 @@ class Keyring:
             valid_until_ts=2**63,  # fake future timestamp
         )
 
+    async def is_server_linearized(self, server_name: str) -> bool:
+        # TODO(LM) Fetch whether the key response of the origin contains m.linearized.
+        return not self._is_mine_server_name(server_name)
+
     async def verify_json_for_server(
         self,
         server_name: str,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index c12d58af27..61ed41c903 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -13,6 +13,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import collections.abc
 import logging
 import random
 from typing import (
@@ -55,6 +56,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.events.utils import validate_canonicaljson
 from synapse.federation.federation_base import (
     FederationBase,
     InvalidEventSignatureError,
@@ -285,11 +287,6 @@ class FederationServer(FederationBase):
         # accurate as possible.
         request_time = self._clock.time_msec()
 
-        # TODO(LM): If we are the hub server and the destination is a participant
-        #           server then pdus are Linear PDUs.
-        #
-        # TODO(LM): If we are the hub's DAG server we need to linearize and give
-        #           it the results.
         transaction = Transaction(
             transaction_id=transaction_id,
             destination=destination,
@@ -463,7 +460,21 @@ class FederationServer(FederationBase):
                 logger.info("Ignoring PDU: %s", e)
                 continue
 
-            event = event_from_pdu_json(p, room_version)
+            # If the transaction is from a linearized matrix server, create PDUs
+            # from the LPDUs.
+            if (
+                room_version.linearized_matrix
+                and await self.keyring.is_server_linearized(origin)
+            ):
+                event = await self._on_lpdu_event(p, room_version)
+
+                # Send this event on behalf of the other server.
+                #
+                # We are acting as the hub for this transaction and need to send
+                # this event out to other participants.
+                event.internal_metadata.send_on_behalf_of = origin
+            else:
+                event = event_from_pdu_json(p, room_version)
             pdus_by_room.setdefault(room_id, []).append(event)
 
             if event.origin_server_ts > newest_pdu_ts:
@@ -531,7 +542,6 @@ class FederationServer(FederationBase):
     async def _handle_edus_in_txn(self, origin: str, transaction: Transaction) -> None:
         """Process the EDUs in a received transaction."""
 
-
         async def _process_edu(edu_dict: JsonDict) -> None:
             received_edus_counter.inc()
 
@@ -1012,6 +1022,117 @@ class FederationServer(FederationBase):
                 origin, event
             )
 
+    async def _on_lpdu_event(
+        self, lpdu_json: JsonDict, room_version: RoomVersion
+    ) -> EventBase:
+        """Construct an EventBase from a linearized event json received over federation
+
+        See event_from_pdu_json.
+
+        Args:
+            lpdu_json: lpdu as received over federation
+            room_version: The version of the room this event belongs to
+
+        Raises:
+            SynapseError: if the lpdu is missing required fields or is otherwise
+                not a valid matrix event
+        """
+        if not room_version.linearized_matrix:
+            raise ValueError("Cannot be used on non-linearized matrix")
+
+        # The minimum fields to create an LPDU.
+        assert_params_in_dict(
+            lpdu_json,
+            (
+                "type",
+                "room_Id",
+                "sender",
+                "origin",
+                "hub_server",
+                "origin_server_ts",
+                "content",
+                "hashes",
+            ),
+        )
+
+        # The participant server should *not* provide auth/prev events.
+        for field in ("auth_events", "prev_events"):
+            if field in lpdu_json:
+                raise SynapseError(400, f"LPDU contained {field}", Codes.BAD_JSON)
+
+        # Hashes must contain (only) "lpdu".
+        if not isinstance(lpdu_json["hashes"], collections.abc.Mapping):
+            raise SynapseError(400, "Invalid hashes", Codes.BAD_JSON)
+        if lpdu_json["hashes"].keys() != {"lpdu"}:
+            raise SynapseError(
+                400, "hashes must contain exactly one key: 'lpdu'", Codes.BAD_JSON
+            )
+
+        # Validate that the JSON conforms to the specification.
+        if room_version.strict_canonicaljson:
+            validate_canonicaljson(lpdu_json)
+
+        # An LPDU doesn't have enough information to just create an event, it needs
+        # to be built and signed, etc.
+        builder = self.hs.get_event_builder_factory().for_room_version(
+            room_version, lpdu_json
+        )
+
+        try:
+            (
+                event,
+                unpersisted_context,
+            ) = await self.hs.get_event_creation_handler().create_new_client_event(
+                builder=builder
+            )
+        except SynapseError as e:
+            logger.warning(
+                "Failed to create PDU from template for room %s because %s",
+                lpdu_json["room_id"],
+                e,
+            )
+            raise
+
+        if not event.hub_server:
+            raise SynapseError(400, "Cannot send PDU via hub server", Codes.BAD_JSON)
+
+        # If an LPDU is trying to be sent through us, but we're not the hub
+        # then deny.
+        if not self._is_mine_server_name(event.hub_server):
+            raise SynapseError(
+                400,
+                f"Cannot authorise event for hub server: {event.hub_server}",
+                Codes.FORBIDDEN,
+            )
+
+        # Double check that we *are* the hub.
+        state_ids = await self._state_storage_controller.get_current_state_ids(
+            event.room_id
+        )
+        # Get the current hub server from the sender of the create event.
+        create_event = await self.store.get_event(state_ids[(EventTypes.Create, "")])
+        hub_server = get_domain_from_id(create_event.sender)
+
+        if not self._is_mine_server_name(hub_server):
+            raise SynapseError(400, "Not the hub server", Codes.FORBIDDEN)
+
+        # Sign the event as the hub.
+        event.signatures.update(
+            compute_event_signature(
+                room_version,
+                event.get_pdu_json(),
+                self.hs.hostname,
+                self.hs.signing_key,
+            )
+        )
+
+        # Note that the signatures, etc. get checked later on in _handle_received_pdu.
+        # Server ACLs are checked in the caller: _handle_pdus_in_txn.
+
+        # TODO(LM) Do we need to check that the event will be accepted here?
+
+        return event
+
     async def on_event_auth(
         self, origin: str, room_id: str, event_id: str
     ) -> Tuple[int, Dict[str, Any]]:
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 97abbdee18..d965f8e52c 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -489,10 +489,19 @@ class FederationSender(AbstractFederationSender):
                     break
 
                 async def handle_event(event: EventBase) -> None:
-                    # Only send events for this server.
+                    # Send events which this server is sending on behalf of another,
+                    # e.g. an event due to send_{join,leave,knock}.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
+                    # Send events which originate from this server.
                     is_mine = self.is_mine_id(event.sender)
-                    if not is_mine and send_on_behalf_of is None:
+                    # Finally, if the server is acting as a linearized matrix hub,
+                    # then send to any participating servers off the hub.
+                    is_hub_server = (
+                        event.room_version.linearized_matrix
+                        and event.hub_server
+                        and self.is_mine_server_name(event.hub_server)
+                    )
+                    if not is_mine and send_on_behalf_of is None and not is_hub_server:
                         logger.debug("Not sending remote-origin event %s", event)
                         return
 
@@ -542,6 +551,7 @@ class FederationSender(AbstractFederationSender):
                         )
                         return
 
+                    # TODO(LM): Is the calculation of all destinations correct?
                     destinations: Optional[Collection[str]] = None
                     if not event.prev_event_ids():
                         # If there are no prev event IDs then the state is empty
@@ -614,10 +624,16 @@ class FederationSender(AbstractFederationSender):
                         )
                     }
 
-                    if send_on_behalf_of is not None:
+                    if (
+                        send_on_behalf_of is not None
+                        and not event.room_version.linearized_matrix
+                    ):
                         # If we are sending the event on behalf of another server
                         # then it already has the event and there is no reason to
                         # send the event to it.
+                        #
+                        # For linearized matrix, send it back to the origin.
+                        # TODO(LM) Do not send back to DAG servers?
                         sharded_destinations.discard(send_on_behalf_of)
 
                     logger.debug("Sending %s to %r", event, sharded_destinations)