From 941f59101b51e9225dbdc38b22110a01de194242 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Feb 2015 16:56:01 +0000 Subject: Don't fail an entire request if one of the returned events fails a signature check. If an event does fail a signature check, look in the local database and request it from the originator. --- synapse/federation/federation_client.py | 107 ++++++++++++++++++++++++-------- 1 file changed, 81 insertions(+), 26 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index e1539bd0e0..b809e935a0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -224,17 +224,17 @@ class FederationClient(object): for p in result.get("auth_chain", []) ] - for i, pdu in enumerate(pdus): - pdus[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + signed_pdus = yield self._check_sigs_and_hash_and_fetch( + pdus, outlier=True + ) - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + signed_auth.sort(key=lambda e: e.depth) - defer.returnValue((pdus, auth_chain)) + defer.returnValue((signed_pdus, signed_auth)) @defer.inlineCallbacks @log_function @@ -248,14 +248,13 @@ class FederationClient(object): for p in res["auth_chain"] ] - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) - auth_chain.sort(key=lambda e: e.depth) + signed_auth.sort(key=lambda e: e.depth) - defer.returnValue(auth_chain) + defer.returnValue(signed_auth) @defer.inlineCallbacks def make_join(self, destination, room_id, user_id): @@ -291,21 +290,19 @@ class FederationClient(object): for p in content.get("auth_chain", []) ] - for i, pdu in enumerate(state): - state[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. - - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_state = yield self._check_sigs_and_hash_and_fetch( + state, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) auth_chain.sort(key=lambda e: e.depth) defer.returnValue({ - "state": state, - "auth_chain": auth_chain, + "state": signed_state, + "auth_chain": signed_auth, }) @defer.inlineCallbacks @@ -353,12 +350,18 @@ class FederationClient(object): ) auth_chain = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + self.event_from_pdu_json(e) for e in content["auth_chain"] ] + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) + + signed_auth.sort(key=lambda e: e.depth) + ret = { - "auth_chain": auth_chain, + "auth_chain": signed_auth, "rejects": content.get("rejects", []), "missing": content.get("missing", []), } @@ -374,6 +377,58 @@ class FederationClient(object): return event + @defer.inlineCallbacks + def _check_sigs_and_hash_and_fetch(self, pdus, outlier=False): + """Takes a list of PDUs and checks the signatures and hashs of each + one. If a PDU fails its signature check then we check if we have it in + the database and if not then request if from the originating server of + that PDU. + + If a PDU fails its content hash check then it is redacted. + + The given list of PDUs are not modified, instead the function returns + a new list. + + Args: + pdu (list) + outlier (bool) + + Returns: + Deferred : A list of PDUs that have valid signatures and hashes. + """ + signed_pdus = [] + for pdu in pdus: + try: + new_pdu = yield self._check_sigs_and_hash(pdu) + signed_pdus.append(new_pdu) + except SynapseError: + # FIXME: We should handle signature failures more gracefully. + + # Check local db. + new_pdu = yield self.store.get_event( + pdu.event_id, + allow_rejected=True + ) + if new_pdu: + signed_pdus.append(new_pdu) + continue + + # Check pdu.origin + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue + + logger.warn("Failed to find copy of %s with valid signature") + + defer.returnValue(signed_pdus) + + @defer.inlineCallbacks def _check_sigs_and_hash(self, pdu): """Throws a SynapseError if the PDU does not have the correct -- cgit 1.4.1 From 40c6fe1b81e4d92cba797b0c966fd774e2a60a28 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Feb 2015 17:06:37 +0000 Subject: Don't bother requesting PDUs with bad signatures from the same server --- synapse/federation/federation_client.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b809e935a0..f87e84db73 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -225,11 +225,11 @@ class FederationClient(object): ] signed_pdus = yield self._check_sigs_and_hash_and_fetch( - pdus, outlier=True + destination, pdus, outlier=True ) signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) signed_auth.sort(key=lambda e: e.depth) @@ -249,7 +249,7 @@ class FederationClient(object): ] signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) signed_auth.sort(key=lambda e: e.depth) @@ -291,11 +291,11 @@ class FederationClient(object): ] signed_state = yield self._check_sigs_and_hash_and_fetch( - state, outlier=True + destination, state, outlier=True ) signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) auth_chain.sort(key=lambda e: e.depth) @@ -355,7 +355,7 @@ class FederationClient(object): ] signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) signed_auth.sort(key=lambda e: e.depth) @@ -378,7 +378,7 @@ class FederationClient(object): return event @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, pdus, outlier=False): + def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): """Takes a list of PDUs and checks the signatures and hashs of each one. If a PDU fails its signature check then we check if we have it in the database and if not then request if from the originating server of @@ -414,15 +414,16 @@ class FederationClient(object): continue # Check pdu.origin - new_pdu = yield self.get_pdu( - destinations=[pdu.origin], - event_id=pdu.event_id, - outlier=outlier, - ) - - if new_pdu: - signed_pdus.append(new_pdu) - continue + if pdu.origin != origin: + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue logger.warn("Failed to find copy of %s with valid signature") -- cgit 1.4.1 From 0f48e22ef66ff8a34d4af13c25e20a461c8a8390 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 10:43:29 +0000 Subject: PEP8 --- synapse/federation/federation_client.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f87e84db73..9ceb66e6f4 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -429,7 +429,6 @@ class FederationClient(object): defer.returnValue(signed_pdus) - @defer.inlineCallbacks def _check_sigs_and_hash(self, pdu): """Throws a SynapseError if the PDU does not have the correct -- cgit 1.4.1 From 0dd3aea319c13e66eb1d75b5b8a196032ee332b7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 14:58:30 +0000 Subject: Keep around the old (buggy) version of the prune_event function so that we can use it to check signatures for events on old servers --- synapse/api/auth.py | 2 - synapse/events/utils.py | 79 +++++++++++++++++++++++++++ synapse/federation/federation_client.py | 96 +-------------------------------- synapse/federation/federation_server.py | 52 ++++-------------- 4 files changed, 92 insertions(+), 137 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 37e31d2b6f..3471afd7e7 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -102,8 +102,6 @@ class Auth(object): def check_host_in_room(self, room_id, host): curr_state = yield self.state.get_current_state(room_id) - logger.debug("Got curr_state %s", curr_state) - for event in curr_state: if event.type == EventTypes.Member: try: diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 1aa952150e..65a9f70982 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -94,6 +94,85 @@ def prune_event(event): ) +def old_prune_event(event): + """This is an old and buggy version of the prune event function. The + difference between this and the new version is that when including dicts + in the content they were included as frozen_dicts rather than dicts. This + caused the JSON encoder to encode as a list of the keys rather than the + dict. + """ + event_type = event.type + + allowed_keys = [ + "event_id", + "sender", + "room_id", + "hashes", + "signatures", + "content", + "type", + "state_key", + "depth", + "prev_events", + "prev_state", + "auth_events", + "origin", + "origin_server_ts", + "membership", + ] + + event_dict = event.get_dict() + + new_content = {} + + def add_fields(*fields): + for field in fields: + if field in event.content: + # This is the line that is buggy: event.content may return + # a frozen_dict which the json encoders encode as lists rather + # than dicts. + new_content[field] = event.content[field] + + if event_type == EventTypes.Member: + add_fields("membership") + elif event_type == EventTypes.Create: + add_fields("creator") + elif event_type == EventTypes.JoinRules: + add_fields("join_rule") + elif event_type == EventTypes.PowerLevels: + add_fields( + "users", + "users_default", + "events", + "events_default", + "events_default", + "state_default", + "ban", + "kick", + "redact", + ) + elif event_type == EventTypes.Aliases: + add_fields("aliases") + + allowed_fields = { + k: v + for k, v in event_dict.items() + if k in allowed_keys + } + + allowed_fields["content"] = new_content + + allowed_fields["unsigned"] = {} + + if "age_ts" in event.unsigned: + allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"] + + return type(event)( + allowed_fields, + internal_metadata_dict=event.internal_metadata.get_dict() + ) + + def format_event_raw(d): return d diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9ceb66e6f4..5fac629709 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -16,17 +16,11 @@ from twisted.internet import defer +from .federation_base import FederationBase from .units import Edu from synapse.util.logutils import log_function from synapse.events import FrozenEvent -from synapse.events.utils import prune_event - -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import check_event_content_hash - -from synapse.api.errors import SynapseError import logging @@ -34,7 +28,7 @@ import logging logger = logging.getLogger(__name__) -class FederationClient(object): +class FederationClient(FederationBase): @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -376,89 +370,3 @@ class FederationClient(object): event.internal_metadata.outlier = outlier return event - - @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): - """Takes a list of PDUs and checks the signatures and hashs of each - one. If a PDU fails its signature check then we check if we have it in - the database and if not then request if from the originating server of - that PDU. - - If a PDU fails its content hash check then it is redacted. - - The given list of PDUs are not modified, instead the function returns - a new list. - - Args: - pdu (list) - outlier (bool) - - Returns: - Deferred : A list of PDUs that have valid signatures and hashes. - """ - signed_pdus = [] - for pdu in pdus: - try: - new_pdu = yield self._check_sigs_and_hash(pdu) - signed_pdus.append(new_pdu) - except SynapseError: - # FIXME: We should handle signature failures more gracefully. - - # Check local db. - new_pdu = yield self.store.get_event( - pdu.event_id, - allow_rejected=True - ) - if new_pdu: - signed_pdus.append(new_pdu) - continue - - # Check pdu.origin - if pdu.origin != origin: - new_pdu = yield self.get_pdu( - destinations=[pdu.origin], - event_id=pdu.event_id, - outlier=outlier, - ) - - if new_pdu: - signed_pdus.append(new_pdu) - continue - - logger.warn("Failed to find copy of %s with valid signature") - - defer.returnValue(signed_pdus) - - @defer.inlineCallbacks - def _check_sigs_and_hash(self, pdu): - """Throws a SynapseError if the PDU does not have the correct - signatures. - - Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. - """ - # Check signatures are correct. - redacted_event = prune_event(pdu) - redacted_pdu_json = redacted_event.get_pdu_json() - - try: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) - except SynapseError: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise - - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) - ) - defer.returnValue(redacted_event) - - defer.returnValue(pdu) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5fbd8b19de..97dca3f84c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -16,6 +16,7 @@ from twisted.internet import defer +from .federation_base import FederationBase from .units import Transaction, Edu from synapse.util.logutils import log_function @@ -35,7 +36,7 @@ import logging logger = logging.getLogger(__name__) -class FederationServer(object): +class FederationServer(FederationBase): def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -251,17 +252,20 @@ class FederationServer(object): Deferred: Results in `dict` with the same format as `content` """ auth_chain = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + self.event_from_pdu_json(e) for e in content["auth_chain"] ] - missing = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) - for e in content.get("missing", []) - ] + signed_auth = yield self._check_sigs_and_hash_and_fetch( + origin, auth_chain, outlier=True + ) ret = yield self.handler.on_query_auth( - origin, event_id, auth_chain, content.get("rejects", []), missing + origin, + event_id, + signed_auth, + content.get("rejects", []), + content.get("missing", []), ) time_now = self._clock.time_msec() @@ -426,37 +430,3 @@ class FederationServer(object): event.internal_metadata.outlier = outlier return event - - @defer.inlineCallbacks - def _check_sigs_and_hash(self, pdu): - """Throws a SynapseError if the PDU does not have the correct - signatures. - - Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. - """ - # Check signatures are correct. - redacted_event = prune_event(pdu) - redacted_pdu_json = redacted_event.get_pdu_json() - - try: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) - except SynapseError: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise - - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) - ) - defer.returnValue(redacted_event) - - defer.returnValue(pdu) -- cgit 1.4.1 From 7b810e136ef2040fe55a778d4a4a790cdbd0f84c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 15:00:42 +0000 Subject: Add new FederationBase --- synapse/federation/federation_base.py | 126 ++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 synapse/federation/federation_base.py (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py new file mode 100644 index 0000000000..d26a2396ac --- /dev/null +++ b/synapse/federation/federation_base.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from synapse.events.utils import prune_event, old_prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import SynapseError + +import logging + + +logger = logging.getLogger(__name__) + + +class FederationBase(object): + @defer.inlineCallbacks + def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): + """Takes a list of PDUs and checks the signatures and hashs of each + one. If a PDU fails its signature check then we check if we have it in + the database and if not then request if from the originating server of + that PDU. + + If a PDU fails its content hash check then it is redacted. + + The given list of PDUs are not modified, instead the function returns + a new list. + + Args: + pdu (list) + outlier (bool) + + Returns: + Deferred : A list of PDUs that have valid signatures and hashes. + """ + signed_pdus = [] + for pdu in pdus: + try: + new_pdu = yield self._check_sigs_and_hash(pdu) + signed_pdus.append(new_pdu) + except SynapseError: + # FIXME: We should handle signature failures more gracefully. + + # Check local db. + new_pdu = yield self.store.get_event( + pdu.event_id, + allow_rejected=True + ) + if new_pdu: + signed_pdus.append(new_pdu) + continue + + # Check pdu.origin + if pdu.origin != origin: + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue + + logger.warn("Failed to find copy of %s with valid signature") + + defer.returnValue(signed_pdus) + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + old_redacted = old_prune_event(pdu) + old_redacted_pdu_json = old_redacted.get_pdu_json() + + try: + try: + yield self.keyring.verify_json_for_server( + pdu.origin, old_redacted_pdu_json + ) + except SynapseError: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) \ No newline at end of file -- cgit 1.4.1 From 8dae5c81085458a3a0b3dc92278e8c317d5de204 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 15:01:12 +0000 Subject: Remove unused imports --- synapse/federation/federation_server.py | 5 ----- 1 file changed, 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 97dca3f84c..4742ca9390 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -22,11 +22,6 @@ from .units import Transaction, Edu from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent -from synapse.events.utils import prune_event - -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import FederationError, SynapseError -- cgit 1.4.1 From 9bace3a36751deed141225ccabd5bebecebc25f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 15:32:17 +0000 Subject: Actually, the old prune_event function was non-deterministic, so no point keeping it around :( --- synapse/events/utils.py | 79 ----------------------------------- synapse/federation/federation_base.py | 16 ++----- 2 files changed, 4 insertions(+), 91 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 65a9f70982..1aa952150e 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -94,85 +94,6 @@ def prune_event(event): ) -def old_prune_event(event): - """This is an old and buggy version of the prune event function. The - difference between this and the new version is that when including dicts - in the content they were included as frozen_dicts rather than dicts. This - caused the JSON encoder to encode as a list of the keys rather than the - dict. - """ - event_type = event.type - - allowed_keys = [ - "event_id", - "sender", - "room_id", - "hashes", - "signatures", - "content", - "type", - "state_key", - "depth", - "prev_events", - "prev_state", - "auth_events", - "origin", - "origin_server_ts", - "membership", - ] - - event_dict = event.get_dict() - - new_content = {} - - def add_fields(*fields): - for field in fields: - if field in event.content: - # This is the line that is buggy: event.content may return - # a frozen_dict which the json encoders encode as lists rather - # than dicts. - new_content[field] = event.content[field] - - if event_type == EventTypes.Member: - add_fields("membership") - elif event_type == EventTypes.Create: - add_fields("creator") - elif event_type == EventTypes.JoinRules: - add_fields("join_rule") - elif event_type == EventTypes.PowerLevels: - add_fields( - "users", - "users_default", - "events", - "events_default", - "events_default", - "state_default", - "ban", - "kick", - "redact", - ) - elif event_type == EventTypes.Aliases: - add_fields("aliases") - - allowed_fields = { - k: v - for k, v in event_dict.items() - if k in allowed_keys - } - - allowed_fields["content"] = new_content - - allowed_fields["unsigned"] = {} - - if "age_ts" in event.unsigned: - allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"] - - return type(event)( - allowed_fields, - internal_metadata_dict=event.internal_metadata.get_dict() - ) - - def format_event_raw(d): return d diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index d26a2396ac..27c5918e0f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -16,7 +16,7 @@ from twisted.internet import defer -from synapse.events.utils import prune_event, old_prune_event +from synapse.events.utils import prune_event from syutil.jsonutil import encode_canonical_json @@ -96,18 +96,10 @@ class FederationBase(object): redacted_event = prune_event(pdu) redacted_pdu_json = redacted_event.get_pdu_json() - old_redacted = old_prune_event(pdu) - old_redacted_pdu_json = old_redacted.get_pdu_json() - try: - try: - yield self.keyring.verify_json_for_server( - pdu.origin, old_redacted_pdu_json - ) - except SynapseError: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) except SynapseError: logger.warn( "Signature check failed for %s redacted to %s", -- cgit 1.4.1 From 3c39f42a0526186f85e69988504fff6adbf41d91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 16:14:19 +0000 Subject: New line --- synapse/federation/federation_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 27c5918e0f..a990aec4fd 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -115,4 +115,4 @@ class FederationBase(object): ) defer.returnValue(redacted_event) - defer.returnValue(pdu) \ No newline at end of file + defer.returnValue(pdu) -- cgit 1.4.1 From ff78eded015b7596e883623bf826aa579662e766 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 13:55:10 +0000 Subject: Retry make_join --- synapse/federation/federation_client.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5fac629709..d6b8c43916 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -251,16 +251,21 @@ class FederationClient(FederationBase): defer.returnValue(signed_auth) @defer.inlineCallbacks - def make_join(self, destination, room_id, user_id): - ret = yield self.transport_layer.make_join( - destination, room_id, user_id - ) + def make_join(self, destinations, room_id, user_id): + for destination in destinations: + try: + ret = yield self.transport_layer.make_join( + destination, room_id, user_id + ) - pdu_dict = ret["event"] + pdu_dict = ret["event"] - logger.debug("Got response to make_join: %s", pdu_dict) + logger.debug("Got response to make_join: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + defer.returnValue(self.event_from_pdu_json(pdu_dict)) + break + except Exception as e: + logger.warn("Failed to make_join via %s", destination) @defer.inlineCallbacks def send_join(self, destination, pdu): -- cgit 1.4.1 From ae46f10fc5dba0f81518e3144ab8d9ed7a7d03bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 16:28:12 +0000 Subject: Apply sanity to the transport client interface. Convert 'make_join' and 'send_join' to accept iterables of destinations --- synapse/api/errors.py | 8 +++- synapse/federation/federation_client.py | 82 ++++++++++++++++++++------------- synapse/federation/transaction_queue.py | 23 +++++++-- synapse/federation/transport/client.py | 42 +++++++---------- synapse/handlers/federation.py | 4 +- synapse/http/matrixfederationclient.py | 42 ++++++++++++++--- 6 files changed, 130 insertions(+), 71 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index ad478aa6b7..5041828f18 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -39,7 +39,7 @@ class Codes(object): TOO_LARGE = "M_TOO_LARGE" -class CodeMessageException(Exception): +class CodeMessageException(RuntimeError): """An exception with integer code and message string attributes.""" def __init__(self, code, msg): @@ -227,3 +227,9 @@ class FederationError(RuntimeError): "affected": self.affected, "source": self.source if self.source else self.affected, } + + +class HttpResponseException(CodeMessageException): + def __init__(self, code, msg, response): + self.response = response + super(HttpResponseException, self).__init__(code, msg) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d6b8c43916..eb36ec040b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu +from synapse.api.errors import CodeMessageException from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -180,7 +181,8 @@ class FederationClient(FederationBase): pdu = yield self._check_sigs_and_hash(pdu) break - + except CodeMessageException: + raise except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -264,45 +266,63 @@ class FederationClient(FederationBase): defer.returnValue(self.event_from_pdu_json(pdu_dict)) break - except Exception as e: - logger.warn("Failed to make_join via %s", destination) + except CodeMessageException: + raise + except RuntimeError as e: + logger.warn( + "Failed to make_join via %s: %s", + destination, e.message + ) + + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks - def send_join(self, destination, pdu): - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_join( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), - ) + def send_join(self, destinations, pdu): + for destination in destinations: + try: + time_now = self._clock.time_msec() + _, content = yield self.transport_layer.send_join( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) - logger.debug("Got content: %s", content) + logger.debug("Got content: %s", content) - state = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("state", []) - ] + state = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("state", []) + ] - auth_chain = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("auth_chain", []) - ] + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("auth_chain", []) + ] - signed_state = yield self._check_sigs_and_hash_and_fetch( - destination, state, outlier=True - ) + signed_state = yield self._check_sigs_and_hash_and_fetch( + destination, state, outlier=True + ) - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True - ) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - auth_chain.sort(key=lambda e: e.depth) + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue({ + "state": signed_state, + "auth_chain": signed_auth, + }) + except CodeMessageException: + raise + except RuntimeError as e: + logger.warn( + "Failed to send_join via %s: %s", + destination, e.message + ) - defer.returnValue({ - "state": signed_state, - "auth_chain": signed_auth, - }) + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9d4f2c09a2..f38aeba7cc 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction +from synapse.api.errors import HttpResponseException from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext @@ -238,9 +239,14 @@ class TransactionQueue(object): del p["age_ts"] return data - code, response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response logger.info("TX [%s] got %d response", destination, code) @@ -274,8 +280,7 @@ class TransactionQueue(object): pass logger.debug("TX [%s] Yielded to callbacks", destination) - - except Exception as e: + except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. logger.warn( @@ -283,6 +288,14 @@ class TransactionQueue(object): destination, e, ) + except Exception as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.exception( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) self.set_retrying(destination, retry_interval) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 4cb1dea2de..8b137e7128 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.util.logutils import log_function import logging -import json logger = logging.getLogger(__name__) @@ -129,7 +128,7 @@ class TransportLayerClient(object): # generated by the json_data_callback. json_data = transaction.get_dict() - code, response = yield self.client.put_json( + response = yield self.client.put_json( transaction.destination, path=PREFIX + "/send/%s/" % transaction.transaction_id, data=json_data, @@ -137,95 +136,86 @@ class TransportLayerClient(object): ) logger.debug( - "send_data dest=%s, txid=%s, got response: %d", - transaction.destination, transaction.transaction_id, code + "send_data dest=%s, txid=%s, got response: 200", + transaction.destination, transaction.transaction_id, ) - defer.returnValue((code, response)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail): path = PREFIX + "/query/%s" % query_type - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, args=args, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True): path = PREFIX + "/make_join/%s/%s" % (room_id, user_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_join(self, destination, room_id, event_id, content): path = PREFIX + "/send_join/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_join", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def send_invite(self, destination, room_id, event_id, content): path = PREFIX + "/invite/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_query_auth(self, destination, room_id, event_id, content): path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) - code, content = yield self.client.post_json( + content = yield self.client.post_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(content) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0876589e31..a968a87360 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -288,7 +288,7 @@ class FederationHandler(BaseHandler): logger.debug("Joining %s to %s", joinee, room_id) pdu = yield self.replication_layer.make_join( - target_host, + [target_host], room_id, joinee ) @@ -331,7 +331,7 @@ class FederationHandler(BaseHandler): new_event = builder.build() ret = yield self.replication_layer.send_join( - target_host, + [target_host], new_event ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c7bf1b47b8..8559d06b7f 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -27,7 +27,9 @@ from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json -from synapse.api.errors import CodeMessageException, SynapseError, Codes +from synapse.api.errors import ( + SynapseError, Codes, HttpResponseException, +) from syutil.crypto.jsonsign import sign_json @@ -163,13 +165,12 @@ class MatrixFederationHttpClient(object): ) if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? pass else: # :'( # Update transactions table? - raise CodeMessageException( - response.code, response.phrase + raise HttpResponseException( + response.code, response.phrase, response ) defer.returnValue(response) @@ -238,11 +239,20 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + logger.debug("Getting resp body") body = yield readBody(response) logger.debug("Got resp body") - defer.returnValue((response.code, body)) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def post_json(self, destination, path, data={}): @@ -275,11 +285,20 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + logger.debug("Getting resp body") body = yield readBody(response) logger.debug("Got resp body") - defer.returnValue((response.code, body)) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def get_json(self, destination, path, args={}, retry_on_dns_fail=True): @@ -321,7 +340,18 @@ class MatrixFederationHttpClient(object): retry_on_dns_fail=retry_on_dns_fail ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + + logger.debug("Getting resp body") body = yield readBody(response) + logger.debug("Got resp body") defer.returnValue(json.loads(body)) -- cgit 1.4.1 From e1515c3e91f2117adc3976b5e606728560ce9e96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Feb 2015 13:43:28 +0000 Subject: Pass through list of room hosts from room alias query to federation so that it can retry against different room hosts --- synapse/federation/federation_client.py | 5 ++++- synapse/handlers/federation.py | 20 +++++++++++++------- synapse/handlers/room.py | 12 +++++------- 3 files changed, 22 insertions(+), 15 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index eb36ec040b..9923b3fc0d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -264,7 +264,9 @@ class FederationClient(FederationBase): logger.debug("Got response to make_join: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + defer.returnValue( + (destination, self.event_from_pdu_json(pdu_dict)) + ) break except CodeMessageException: raise @@ -313,6 +315,7 @@ class FederationClient(FederationBase): defer.returnValue({ "state": signed_state, "auth_chain": signed_auth, + "origin": destination, }) except CodeMessageException: raise diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 04a4689483..aba266c2bc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -273,7 +273,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def do_invite_join(self, target_host, room_id, joinee, content, snapshot): + def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot): """ Attempts to join the `joinee` to the room `room_id` via the server `target_host`. @@ -287,8 +287,8 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - pdu = yield self.replication_layer.make_join( - [target_host], + origin, pdu = yield self.replication_layer.make_join( + target_hosts, room_id, joinee ) @@ -330,11 +330,17 @@ class FederationHandler(BaseHandler): new_event = builder.build() + # Try the host we successfully got a response to /make_join/ + # request first. + target_hosts.remove(origin) + target_hosts.insert(0, origin) + ret = yield self.replication_layer.send_join( - [target_host], + target_hosts, new_event ) + origin = ret["origin"] state = ret["state"] auth_chain = ret["auth_chain"] auth_chain.sort(key=lambda e: e.depth) @@ -371,7 +377,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -391,7 +397,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -406,7 +412,7 @@ class FederationHandler(BaseHandler): } yield self._handle_new_event( - target_host, + origin, new_event, state=state, current_state=state, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 23821d321f..0369b907a5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -389,8 +389,6 @@ class RoomMemberHandler(BaseHandler): if not hosts: raise SynapseError(404, "No known servers") - host = hosts[0] - # If event doesn't include a display name, add one. yield self.distributor.fire( "collect_presencelike_data", joinee, content @@ -407,12 +405,12 @@ class RoomMemberHandler(BaseHandler): }) event, context = yield self._create_new_client_event(builder) - yield self._do_join(event, context, room_host=host, do_auth=True) + yield self._do_join(event, context, room_hosts=hosts, do_auth=True) defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, context, room_host=None, do_auth=True): + def _do_join(self, event, context, room_hosts=None, do_auth=True): joinee = UserID.from_string(event.state_key) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -441,7 +439,7 @@ class RoomMemberHandler(BaseHandler): if is_host_in_room: should_do_dance = False - elif room_host: # TODO: Shouldn't this be remote_room_host? + elif room_hosts: # TODO: Shouldn't this be remote_room_host? should_do_dance = True else: # TODO(markjh): get prev_state from snapshot @@ -453,7 +451,7 @@ class RoomMemberHandler(BaseHandler): inviter = UserID.from_string(prev_state.user_id) should_do_dance = not self.hs.is_mine(inviter) - room_host = inviter.domain + room_hosts = [inviter.domain] else: # return the same error as join_room_alias does raise SynapseError(404, "No known servers") @@ -461,7 +459,7 @@ class RoomMemberHandler(BaseHandler): if should_do_dance: handler = self.hs.get_handlers().federation_handler yield handler.do_invite_join( - room_host, + room_hosts, room_id, event.user_id, event.get_dict()["content"], # FIXME To get a non-frozen dict -- cgit 1.4.1 From e9c85a4d5ab332021f93634182ad8ed93bd0091c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Feb 2015 13:50:15 +0000 Subject: Connection errors in twisted aren't RuntimeErrors --- synapse/federation/federation_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9923b3fc0d..70c9a6f46b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -270,7 +270,7 @@ class FederationClient(FederationBase): break except CodeMessageException: raise - except RuntimeError as e: + except Exception as e: logger.warn( "Failed to make_join via %s: %s", destination, e.message @@ -319,7 +319,7 @@ class FederationClient(FederationBase): }) except CodeMessageException: raise - except RuntimeError as e: + except Exception as e: logger.warn( "Failed to send_join via %s: %s", destination, e.message -- cgit 1.4.1