diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 142 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 123 | ||||
-rw-r--r-- | synapse/federation/replication.py | 2 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 16 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 21 |
5 files changed, 278 insertions, 26 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c80f4c61bc..e1539bd0e0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -20,6 +20,13 @@ from .units import Edu from synapse.util.logutils import log_function from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import SynapseError import logging @@ -126,6 +133,11 @@ class FederationClient(object): for p in transaction_data["pdus"] ] + for i, pdu in enumerate(pdus): + pdus[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + defer.returnValue(pdus) @defer.inlineCallbacks @@ -159,6 +171,22 @@ class FederationClient(object): transaction_data = yield self.transport_layer.get_event( destination, event_id ) + + logger.debug("transaction_data %r", transaction_data) + + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] + + if pdu_list: + pdu = pdu_list[0] + + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + break + except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -166,18 +194,6 @@ class FederationClient(object): ) continue - logger.debug("transaction_data %r", transaction_data) - - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] - - if pdu_list: - pdu = pdu_list[0] - # TODO: We need to check signatures here - break - defer.returnValue(pdu) @defer.inlineCallbacks @@ -208,6 +224,16 @@ class FederationClient(object): for p in result.get("auth_chain", []) ] + for i, pdu in enumerate(pdus): + pdus[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + + for i, pdu in enumerate(auth_chain): + auth_chain[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + defer.returnValue((pdus, auth_chain)) @defer.inlineCallbacks @@ -222,6 +248,11 @@ class FederationClient(object): for p in res["auth_chain"] ] + for i, pdu in enumerate(auth_chain): + auth_chain[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + auth_chain.sort(key=lambda e: e.depth) defer.returnValue(auth_chain) @@ -260,6 +291,16 @@ class FederationClient(object): for p in content.get("auth_chain", []) ] + for i, pdu in enumerate(state): + state[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + + for i, pdu in enumerate(auth_chain): + auth_chain[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + auth_chain.sort(key=lambda e: e.depth) defer.returnValue({ @@ -281,7 +322,48 @@ class FederationClient(object): logger.debug("Got response to send_invite: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + pdu = self.event_from_pdu_json(pdu_dict) + + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + + defer.returnValue(pdu) + + @defer.inlineCallbacks + def query_auth(self, destination, room_id, event_id, local_auth): + """ + Params: + destination (str) + event_it (str) + local_auth (list) + """ + time_now = self._clock.time_msec() + + send_content = { + "auth_chain": [e.get_pdu_json(time_now) for e in local_auth], + } + + code, content = yield self.transport_layer.send_query_auth( + destination=destination, + room_id=room_id, + event_id=event_id, + content=send_content, + ) + + auth_chain = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content["auth_chain"] + ] + + ret = { + "auth_chain": auth_chain, + "rejects": content.get("rejects", []), + "missing": content.get("missing", []), + } + + defer.returnValue(ret) def event_from_pdu_json(self, pdu_json, outlier=False): event = FrozenEvent( @@ -291,3 +373,37 @@ class FederationClient(object): event.internal_metadata.outlier = outlier return event + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + try: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 0597725ce7..84ed0a0ba0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -21,6 +21,13 @@ from .units import Transaction, Edu from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import FederationError, SynapseError import logging @@ -97,8 +104,10 @@ class FederationServer(object): response = yield self.transaction_actions.have_responded(transaction) if response: - logger.debug("[%s] We've already responed to this request", - transaction.transaction_id) + logger.debug( + "[%s] We've already responed to this request", + transaction.transaction_id + ) defer.returnValue(response) return @@ -221,6 +230,36 @@ class FederationServer(object): "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], })) + @defer.inlineCallbacks + def on_query_auth_request(self, origin, content, event_id): + auth_chain = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content["auth_chain"] + ] + + missing = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content.get("missing", []) + ] + + ret = yield self.handler.on_query_auth( + origin, event_id, auth_chain, content.get("rejects", []), missing + ) + + time_now = self._clock.time_msec() + send_content = { + "auth_chain": [ + e.get_pdu_json(time_now) + for e in ret["auth_chain"] + ], + "rejects": ret.get("rejects", []), + "missing": ret.get("missing", []), + } + + defer.returnValue( + (200, send_content) + ) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. @@ -253,6 +292,9 @@ class FederationServer(object): origin, pdu.event_id, do_auth=False ) + # FIXME: Currently we fetch an event again when we already have it + # if it has been marked as an outlier. + already_seen = ( existing and ( not existing.internal_metadata.is_outlier() @@ -264,14 +306,27 @@ class FederationServer(object): defer.returnValue({}) return + # Check signature. + try: + pdu = yield self._check_sigs_and_hash(pdu) + except SynapseError as e: + raise FederationError( + "ERROR", + e.code, + e.msg, + affected=pdu.event_id, + ) + state = None auth_chain = [] have_seen = yield self.store.have_events( - [e for e, _ in pdu.prev_events] + [ev for ev, _ in pdu.prev_events] ) + fetch_state = False + # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. @@ -308,19 +363,29 @@ class FederationServer(object): logger.debug("Processed pdu %s", event_id) else: logger.warn("Failed to get PDU %s", event_id) + fetch_state = True except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") + fetch_state = True else: - # We need to get the state at this event, since we have reached - # a backward extremity edge. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - state, auth_chain = yield self.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + fetch_state = True + else: + fetch_state = True + + if fetch_state: + # We need to get the state at this event, since we haven't + # processed all the prev events. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) + state, auth_chain = yield self.get_state_for_room( + origin, pdu.room_id, pdu.event_id, + ) ret = yield self.handler.on_receive_pdu( origin, @@ -343,3 +408,37 @@ class FederationServer(object): event.internal_metadata.outlier = outlier return event + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + try: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 9ef4834927..e442c6c5d5 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -51,6 +51,8 @@ class ReplicationLayer(FederationClient, FederationServer): def __init__(self, hs, transport_layer): self.server_name = hs.hostname + self.keyring = hs.get_keyring() + self.transport_layer = transport_layer self.transport_layer.register_received_handler(self) self.transport_layer.register_request_handler(self) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index e634a3a213..4cb1dea2de 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -213,3 +213,19 @@ class TransportLayerClient(object): ) defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def send_query_auth(self, destination, room_id, event_id, content): + path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) + + code, content = yield self.client.post_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a380a6910b..9c9f8d525b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -42,7 +42,7 @@ class TransportLayerServer(object): content = None origin = None - if request.method == "PUT": + if request.method in ["PUT", "POST"]: # TODO: Handle other method types? other content types? try: content_bytes = request.content.read() @@ -234,6 +234,16 @@ class TransportLayerServer(object): ) ) ) + self.server.register_path( + "POST", + re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + self._on_query_auth_request( + origin, content, event_id, + ) + ) + ) @defer.inlineCallbacks @log_function @@ -325,3 +335,12 @@ class TransportLayerServer(object): ) defer.returnValue((200, content)) + + @defer.inlineCallbacks + @log_function + def _on_query_auth_request(self, origin, content, event_id): + new_content = yield self.request_handler.on_query_auth_request( + origin, content, event_id + ) + + defer.returnValue((200, new_content)) |