diff options
author | Kegan Dougal <kegan@matrix.org> | 2015-02-05 14:28:03 +0000 |
---|---|---|
committer | Kegan Dougal <kegan@matrix.org> | 2015-02-05 14:28:03 +0000 |
commit | 951690e54d5673431abefafc30c94af6e11341e3 (patch) | |
tree | 779471113f16dfbf7e67504731dd412101eee180 /synapse/federation/federation_client.py | |
parent | Fix user query checks. HS>AS pushing now works. (diff) | |
parent | Merge branch 'federation_client_retries' of github.com:matrix-org/synapse int... (diff) | |
download | synapse-951690e54d5673431abefafc30c94af6e11341e3.tar.xz |
Merge branch 'develop' into application-services
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r-- | synapse/federation/federation_client.py | 181 |
1 files changed, 86 insertions, 95 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index e1539bd0e0..70c9a6f46b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -16,17 +16,12 @@ from twisted.internet import defer +from .federation_base import FederationBase from .units import Edu +from synapse.api.errors import CodeMessageException from synapse.util.logutils import log_function from synapse.events import FrozenEvent -from synapse.events.utils import prune_event - -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import check_event_content_hash - -from synapse.api.errors import SynapseError import logging @@ -34,7 +29,7 @@ import logging logger = logging.getLogger(__name__) -class FederationClient(object): +class FederationClient(FederationBase): @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -186,7 +181,8 @@ class FederationClient(object): pdu = yield self._check_sigs_and_hash(pdu) break - + except CodeMessageException: + raise except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -224,17 +220,17 @@ class FederationClient(object): for p in result.get("auth_chain", []) ] - for i, pdu in enumerate(pdus): - pdus[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + signed_pdus = yield self._check_sigs_and_hash_and_fetch( + destination, pdus, outlier=True + ) - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + signed_auth.sort(key=lambda e: e.depth) - defer.returnValue((pdus, auth_chain)) + defer.returnValue((signed_pdus, signed_auth)) @defer.inlineCallbacks @log_function @@ -248,65 +244,88 @@ class FederationClient(object): for p in res["auth_chain"] ] - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - auth_chain.sort(key=lambda e: e.depth) + signed_auth.sort(key=lambda e: e.depth) - defer.returnValue(auth_chain) + defer.returnValue(signed_auth) @defer.inlineCallbacks - def make_join(self, destination, room_id, user_id): - ret = yield self.transport_layer.make_join( - destination, room_id, user_id - ) + def make_join(self, destinations, room_id, user_id): + for destination in destinations: + try: + ret = yield self.transport_layer.make_join( + destination, room_id, user_id + ) - pdu_dict = ret["event"] + pdu_dict = ret["event"] - logger.debug("Got response to make_join: %s", pdu_dict) + logger.debug("Got response to make_join: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + defer.returnValue( + (destination, self.event_from_pdu_json(pdu_dict)) + ) + break + except CodeMessageException: + raise + except Exception as e: + logger.warn( + "Failed to make_join via %s: %s", + destination, e.message + ) - @defer.inlineCallbacks - def send_join(self, destination, pdu): - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_join( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), - ) + raise RuntimeError("Failed to send to any server.") - logger.debug("Got content: %s", content) + @defer.inlineCallbacks + def send_join(self, destinations, pdu): + for destination in destinations: + try: + time_now = self._clock.time_msec() + _, content = yield self.transport_layer.send_join( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) - state = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("state", []) - ] + logger.debug("Got content: %s", content) - auth_chain = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("auth_chain", []) - ] + state = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("state", []) + ] - for i, pdu in enumerate(state): - state[i] = yield self._check_sigs_and_hash(pdu) + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("auth_chain", []) + ] - # FIXME: We should handle signature failures more gracefully. + signed_state = yield self._check_sigs_and_hash_and_fetch( + destination, state, outlier=True + ) - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + auth_chain.sort(key=lambda e: e.depth) - auth_chain.sort(key=lambda e: e.depth) + defer.returnValue({ + "state": signed_state, + "auth_chain": signed_auth, + "origin": destination, + }) + except CodeMessageException: + raise + except Exception as e: + logger.warn( + "Failed to send_join via %s: %s", + destination, e.message + ) - defer.returnValue({ - "state": state, - "auth_chain": auth_chain, - }) + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): @@ -353,12 +372,18 @@ class FederationClient(object): ) auth_chain = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + self.event_from_pdu_json(e) for e in content["auth_chain"] ] + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) + + signed_auth.sort(key=lambda e: e.depth) + ret = { - "auth_chain": auth_chain, + "auth_chain": signed_auth, "rejects": content.get("rejects", []), "missing": content.get("missing", []), } @@ -373,37 +398,3 @@ class FederationClient(object): event.internal_metadata.outlier = outlier return event - - @defer.inlineCallbacks - def _check_sigs_and_hash(self, pdu): - """Throws a SynapseError if the PDU does not have the correct - signatures. - - Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. - """ - # Check signatures are correct. - redacted_event = prune_event(pdu) - redacted_pdu_json = redacted_event.get_pdu_json() - - try: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) - except SynapseError: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise - - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) - ) - defer.returnValue(redacted_event) - - defer.returnValue(pdu) |