diff options
author | Kegan Dougal <kegan@matrix.org> | 2015-02-05 14:28:03 +0000 |
---|---|---|
committer | Kegan Dougal <kegan@matrix.org> | 2015-02-05 14:28:03 +0000 |
commit | 951690e54d5673431abefafc30c94af6e11341e3 (patch) | |
tree | 779471113f16dfbf7e67504731dd412101eee180 /synapse/federation | |
parent | Fix user query checks. HS>AS pushing now works. (diff) | |
parent | Merge branch 'federation_client_retries' of github.com:matrix-org/synapse int... (diff) | |
download | synapse-951690e54d5673431abefafc30c94af6e11341e3.tar.xz |
Merge branch 'develop' into application-services
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_base.py | 118 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 181 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 57 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 23 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 42 |
5 files changed, 249 insertions, 172 deletions
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py new file mode 100644 index 0000000000..a990aec4fd --- /dev/null +++ b/synapse/federation/federation_base.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from synapse.events.utils import prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import SynapseError + +import logging + + +logger = logging.getLogger(__name__) + + +class FederationBase(object): + @defer.inlineCallbacks + def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): + """Takes a list of PDUs and checks the signatures and hashs of each + one. If a PDU fails its signature check then we check if we have it in + the database and if not then request if from the originating server of + that PDU. + + If a PDU fails its content hash check then it is redacted. + + The given list of PDUs are not modified, instead the function returns + a new list. + + Args: + pdu (list) + outlier (bool) + + Returns: + Deferred : A list of PDUs that have valid signatures and hashes. + """ + signed_pdus = [] + for pdu in pdus: + try: + new_pdu = yield self._check_sigs_and_hash(pdu) + signed_pdus.append(new_pdu) + except SynapseError: + # FIXME: We should handle signature failures more gracefully. + + # Check local db. + new_pdu = yield self.store.get_event( + pdu.event_id, + allow_rejected=True + ) + if new_pdu: + signed_pdus.append(new_pdu) + continue + + # Check pdu.origin + if pdu.origin != origin: + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue + + logger.warn("Failed to find copy of %s with valid signature") + + defer.returnValue(signed_pdus) + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + try: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index e1539bd0e0..70c9a6f46b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -16,17 +16,12 @@ from twisted.internet import defer +from .federation_base import FederationBase from .units import Edu +from synapse.api.errors import CodeMessageException from synapse.util.logutils import log_function from synapse.events import FrozenEvent -from synapse.events.utils import prune_event - -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import check_event_content_hash - -from synapse.api.errors import SynapseError import logging @@ -34,7 +29,7 @@ import logging logger = logging.getLogger(__name__) -class FederationClient(object): +class FederationClient(FederationBase): @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -186,7 +181,8 @@ class FederationClient(object): pdu = yield self._check_sigs_and_hash(pdu) break - + except CodeMessageException: + raise except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -224,17 +220,17 @@ class FederationClient(object): for p in result.get("auth_chain", []) ] - for i, pdu in enumerate(pdus): - pdus[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + signed_pdus = yield self._check_sigs_and_hash_and_fetch( + destination, pdus, outlier=True + ) - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + signed_auth.sort(key=lambda e: e.depth) - defer.returnValue((pdus, auth_chain)) + defer.returnValue((signed_pdus, signed_auth)) @defer.inlineCallbacks @log_function @@ -248,65 +244,88 @@ class FederationClient(object): for p in res["auth_chain"] ] - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - auth_chain.sort(key=lambda e: e.depth) + signed_auth.sort(key=lambda e: e.depth) - defer.returnValue(auth_chain) + defer.returnValue(signed_auth) @defer.inlineCallbacks - def make_join(self, destination, room_id, user_id): - ret = yield self.transport_layer.make_join( - destination, room_id, user_id - ) + def make_join(self, destinations, room_id, user_id): + for destination in destinations: + try: + ret = yield self.transport_layer.make_join( + destination, room_id, user_id + ) - pdu_dict = ret["event"] + pdu_dict = ret["event"] - logger.debug("Got response to make_join: %s", pdu_dict) + logger.debug("Got response to make_join: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + defer.returnValue( + (destination, self.event_from_pdu_json(pdu_dict)) + ) + break + except CodeMessageException: + raise + except Exception as e: + logger.warn( + "Failed to make_join via %s: %s", + destination, e.message + ) - @defer.inlineCallbacks - def send_join(self, destination, pdu): - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_join( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), - ) + raise RuntimeError("Failed to send to any server.") - logger.debug("Got content: %s", content) + @defer.inlineCallbacks + def send_join(self, destinations, pdu): + for destination in destinations: + try: + time_now = self._clock.time_msec() + _, content = yield self.transport_layer.send_join( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) - state = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("state", []) - ] + logger.debug("Got content: %s", content) - auth_chain = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("auth_chain", []) - ] + state = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("state", []) + ] - for i, pdu in enumerate(state): - state[i] = yield self._check_sigs_and_hash(pdu) + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("auth_chain", []) + ] - # FIXME: We should handle signature failures more gracefully. + signed_state = yield self._check_sigs_and_hash_and_fetch( + destination, state, outlier=True + ) - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + auth_chain.sort(key=lambda e: e.depth) - auth_chain.sort(key=lambda e: e.depth) + defer.returnValue({ + "state": signed_state, + "auth_chain": signed_auth, + "origin": destination, + }) + except CodeMessageException: + raise + except Exception as e: + logger.warn( + "Failed to send_join via %s: %s", + destination, e.message + ) - defer.returnValue({ - "state": state, - "auth_chain": auth_chain, - }) + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): @@ -353,12 +372,18 @@ class FederationClient(object): ) auth_chain = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + self.event_from_pdu_json(e) for e in content["auth_chain"] ] + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) + + signed_auth.sort(key=lambda e: e.depth) + ret = { - "auth_chain": auth_chain, + "auth_chain": signed_auth, "rejects": content.get("rejects", []), "missing": content.get("missing", []), } @@ -373,37 +398,3 @@ class FederationClient(object): event.internal_metadata.outlier = outlier return event - - @defer.inlineCallbacks - def _check_sigs_and_hash(self, pdu): - """Throws a SynapseError if the PDU does not have the correct - signatures. - - Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. - """ - # Check signatures are correct. - redacted_event = prune_event(pdu) - redacted_pdu_json = redacted_event.get_pdu_json() - - try: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) - except SynapseError: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise - - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) - ) - defer.returnValue(redacted_event) - - defer.returnValue(pdu) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5fbd8b19de..4742ca9390 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -16,16 +16,12 @@ from twisted.internet import defer +from .federation_base import FederationBase from .units import Transaction, Edu from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent -from synapse.events.utils import prune_event - -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import FederationError, SynapseError @@ -35,7 +31,7 @@ import logging logger = logging.getLogger(__name__) -class FederationServer(object): +class FederationServer(FederationBase): def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -251,17 +247,20 @@ class FederationServer(object): Deferred: Results in `dict` with the same format as `content` """ auth_chain = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + self.event_from_pdu_json(e) for e in content["auth_chain"] ] - missing = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) - for e in content.get("missing", []) - ] + signed_auth = yield self._check_sigs_and_hash_and_fetch( + origin, auth_chain, outlier=True + ) ret = yield self.handler.on_query_auth( - origin, event_id, auth_chain, content.get("rejects", []), missing + origin, + event_id, + signed_auth, + content.get("rejects", []), + content.get("missing", []), ) time_now = self._clock.time_msec() @@ -426,37 +425,3 @@ class FederationServer(object): event.internal_metadata.outlier = outlier return event - - @defer.inlineCallbacks - def _check_sigs_and_hash(self, pdu): - """Throws a SynapseError if the PDU does not have the correct - signatures. - - Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. - """ - # Check signatures are correct. - redacted_event = prune_event(pdu) - redacted_pdu_json = redacted_event.get_pdu_json() - - try: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) - except SynapseError: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise - - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) - ) - defer.returnValue(redacted_event) - - defer.returnValue(pdu) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9d4f2c09a2..f38aeba7cc 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction +from synapse.api.errors import HttpResponseException from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext @@ -238,9 +239,14 @@ class TransactionQueue(object): del p["age_ts"] return data - code, response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response logger.info("TX [%s] got %d response", destination, code) @@ -274,8 +280,7 @@ class TransactionQueue(object): pass logger.debug("TX [%s] Yielded to callbacks", destination) - - except Exception as e: + except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. logger.warn( @@ -283,6 +288,14 @@ class TransactionQueue(object): destination, e, ) + except Exception as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.exception( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) self.set_retrying(destination, retry_interval) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 4cb1dea2de..8b137e7128 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.util.logutils import log_function import logging -import json logger = logging.getLogger(__name__) @@ -129,7 +128,7 @@ class TransportLayerClient(object): # generated by the json_data_callback. json_data = transaction.get_dict() - code, response = yield self.client.put_json( + response = yield self.client.put_json( transaction.destination, path=PREFIX + "/send/%s/" % transaction.transaction_id, data=json_data, @@ -137,95 +136,86 @@ class TransportLayerClient(object): ) logger.debug( - "send_data dest=%s, txid=%s, got response: %d", - transaction.destination, transaction.transaction_id, code + "send_data dest=%s, txid=%s, got response: 200", + transaction.destination, transaction.transaction_id, ) - defer.returnValue((code, response)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail): path = PREFIX + "/query/%s" % query_type - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, args=args, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True): path = PREFIX + "/make_join/%s/%s" % (room_id, user_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_join(self, destination, room_id, event_id, content): path = PREFIX + "/send_join/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_join", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def send_invite(self, destination, room_id, event_id, content): path = PREFIX + "/invite/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_query_auth(self, destination, room_id, event_id, content): path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) - code, content = yield self.client.post_json( + content = yield self.client.post_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(content) |