From 547a7076b471b6adad1fbb6a2a5c1936c6439a3f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 13 Jul 2023 08:07:24 -0400 Subject: Accept LPDUs in transactions and fan them back out. --- synapse/crypto/keyring.py | 4 + synapse/federation/federation_server.py | 135 ++++++++++++++++++++++++++++++-- synapse/federation/sender/__init__.py | 22 +++++- 3 files changed, 151 insertions(+), 10 deletions(-) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 260aab3241..1caef70538 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -189,6 +189,10 @@ class Keyring: valid_until_ts=2**63, # fake future timestamp ) + async def is_server_linearized(self, server_name: str) -> bool: + # TODO(LM) Fetch whether the key response of the origin contains m.linearized. + return not self._is_mine_server_name(server_name) + async def verify_json_for_server( self, server_name: str, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index c12d58af27..61ed41c903 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import collections.abc import logging import random from typing import ( @@ -55,6 +56,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion from synapse.crypto.event_signing import compute_event_signature from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.events.utils import validate_canonicaljson from synapse.federation.federation_base import ( FederationBase, InvalidEventSignatureError, @@ -285,11 +287,6 @@ class FederationServer(FederationBase): # accurate as possible. request_time = self._clock.time_msec() - # TODO(LM): If we are the hub server and the destination is a participant - # server then pdus are Linear PDUs. - # - # TODO(LM): If we are the hub's DAG server we need to linearize and give - # it the results. transaction = Transaction( transaction_id=transaction_id, destination=destination, @@ -463,7 +460,21 @@ class FederationServer(FederationBase): logger.info("Ignoring PDU: %s", e) continue - event = event_from_pdu_json(p, room_version) + # If the transaction is from a linearized matrix server, create PDUs + # from the LPDUs. + if ( + room_version.linearized_matrix + and await self.keyring.is_server_linearized(origin) + ): + event = await self._on_lpdu_event(p, room_version) + + # Send this event on behalf of the other server. + # + # We are acting as the hub for this transaction and need to send + # this event out to other participants. + event.internal_metadata.send_on_behalf_of = origin + else: + event = event_from_pdu_json(p, room_version) pdus_by_room.setdefault(room_id, []).append(event) if event.origin_server_ts > newest_pdu_ts: @@ -531,7 +542,6 @@ class FederationServer(FederationBase): async def _handle_edus_in_txn(self, origin: str, transaction: Transaction) -> None: """Process the EDUs in a received transaction.""" - async def _process_edu(edu_dict: JsonDict) -> None: received_edus_counter.inc() @@ -1012,6 +1022,117 @@ class FederationServer(FederationBase): origin, event ) + async def _on_lpdu_event( + self, lpdu_json: JsonDict, room_version: RoomVersion + ) -> EventBase: + """Construct an EventBase from a linearized event json received over federation + + See event_from_pdu_json. + + Args: + lpdu_json: lpdu as received over federation + room_version: The version of the room this event belongs to + + Raises: + SynapseError: if the lpdu is missing required fields or is otherwise + not a valid matrix event + """ + if not room_version.linearized_matrix: + raise ValueError("Cannot be used on non-linearized matrix") + + # The minimum fields to create an LPDU. + assert_params_in_dict( + lpdu_json, + ( + "type", + "room_Id", + "sender", + "origin", + "hub_server", + "origin_server_ts", + "content", + "hashes", + ), + ) + + # The participant server should *not* provide auth/prev events. + for field in ("auth_events", "prev_events"): + if field in lpdu_json: + raise SynapseError(400, f"LPDU contained {field}", Codes.BAD_JSON) + + # Hashes must contain (only) "lpdu". + if not isinstance(lpdu_json["hashes"], collections.abc.Mapping): + raise SynapseError(400, "Invalid hashes", Codes.BAD_JSON) + if lpdu_json["hashes"].keys() != {"lpdu"}: + raise SynapseError( + 400, "hashes must contain exactly one key: 'lpdu'", Codes.BAD_JSON + ) + + # Validate that the JSON conforms to the specification. + if room_version.strict_canonicaljson: + validate_canonicaljson(lpdu_json) + + # An LPDU doesn't have enough information to just create an event, it needs + # to be built and signed, etc. + builder = self.hs.get_event_builder_factory().for_room_version( + room_version, lpdu_json + ) + + try: + ( + event, + unpersisted_context, + ) = await self.hs.get_event_creation_handler().create_new_client_event( + builder=builder + ) + except SynapseError as e: + logger.warning( + "Failed to create PDU from template for room %s because %s", + lpdu_json["room_id"], + e, + ) + raise + + if not event.hub_server: + raise SynapseError(400, "Cannot send PDU via hub server", Codes.BAD_JSON) + + # If an LPDU is trying to be sent through us, but we're not the hub + # then deny. + if not self._is_mine_server_name(event.hub_server): + raise SynapseError( + 400, + f"Cannot authorise event for hub server: {event.hub_server}", + Codes.FORBIDDEN, + ) + + # Double check that we *are* the hub. + state_ids = await self._state_storage_controller.get_current_state_ids( + event.room_id + ) + # Get the current hub server from the sender of the create event. + create_event = await self.store.get_event(state_ids[(EventTypes.Create, "")]) + hub_server = get_domain_from_id(create_event.sender) + + if not self._is_mine_server_name(hub_server): + raise SynapseError(400, "Not the hub server", Codes.FORBIDDEN) + + # Sign the event as the hub. + event.signatures.update( + compute_event_signature( + room_version, + event.get_pdu_json(), + self.hs.hostname, + self.hs.signing_key, + ) + ) + + # Note that the signatures, etc. get checked later on in _handle_received_pdu. + # Server ACLs are checked in the caller: _handle_pdus_in_txn. + + # TODO(LM) Do we need to check that the event will be accepted here? + + return event + async def on_event_auth( self, origin: str, room_id: str, event_id: str ) -> Tuple[int, Dict[str, Any]]: diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 97abbdee18..d965f8e52c 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -489,10 +489,19 @@ class FederationSender(AbstractFederationSender): break async def handle_event(event: EventBase) -> None: - # Only send events for this server. + # Send events which this server is sending on behalf of another, + # e.g. an event due to send_{join,leave,knock}. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() + # Send events which originate from this server. is_mine = self.is_mine_id(event.sender) - if not is_mine and send_on_behalf_of is None: + # Finally, if the server is acting as a linearized matrix hub, + # then send to any participating servers off the hub. + is_hub_server = ( + event.room_version.linearized_matrix + and event.hub_server + and self.is_mine_server_name(event.hub_server) + ) + if not is_mine and send_on_behalf_of is None and not is_hub_server: logger.debug("Not sending remote-origin event %s", event) return @@ -542,6 +551,7 @@ class FederationSender(AbstractFederationSender): ) return + # TODO(LM): Is the calculation of all destinations correct? destinations: Optional[Collection[str]] = None if not event.prev_event_ids(): # If there are no prev event IDs then the state is empty @@ -614,10 +624,16 @@ class FederationSender(AbstractFederationSender): ) } - if send_on_behalf_of is not None: + if ( + send_on_behalf_of is not None + and not event.room_version.linearized_matrix + ): # If we are sending the event on behalf of another server # then it already has the event and there is no reason to # send the event to it. + # + # For linearized matrix, send it back to the origin. + # TODO(LM) Do not send back to DAG servers? sharded_destinations.discard(send_on_behalf_of) logger.debug("Sending %s to %r", event, sharded_destinations) -- cgit 1.4.1