From 7b8861924130821c1bbd05ce65260209a993f759 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Jan 2015 10:45:24 +0000 Subject: Split up replication_layer module into client, server and transaction queue --- synapse/federation/federation_server.py | 345 ++++++++++++++++++++++++++++++++ 1 file changed, 345 insertions(+) create mode 100644 synapse/federation/federation_server.py (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py new file mode 100644 index 0000000000..0597725ce7 --- /dev/null +++ b/synapse/federation/federation_server.py @@ -0,0 +1,345 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from .units import Transaction, Edu + +from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext +from synapse.events import FrozenEvent + +import logging + + +logger = logging.getLogger(__name__) + + +class FederationServer(object): + def set_handler(self, handler): + """Sets the handler that the replication layer will use to communicate + receipt of new PDUs from other home servers. The required methods are + documented on :py:class:`.ReplicationHandler`. + """ + self.handler = handler + + def register_edu_handler(self, edu_type, handler): + if edu_type in self.edu_handlers: + raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + + self.edu_handlers[edu_type] = handler + + def register_query_handler(self, query_type, handler): + """Sets the handler callable that will be used to handle an incoming + federation Query of the given type. + + Args: + query_type (str): Category name of the query, which should match + the string used by make_query. + handler (callable): Invoked to handle incoming queries of this type + + handler is invoked as: + result = handler(args) + + where 'args' is a dict mapping strings to strings of the query + arguments. It should return a Deferred that will eventually yield an + object to encode as JSON. + """ + if query_type in self.query_handlers: + raise KeyError( + "Already have a Query handler for %s" % (query_type,) + ) + + self.query_handlers[query_type] = handler + + @defer.inlineCallbacks + @log_function + def on_backfill_request(self, origin, room_id, versions, limit): + pdus = yield self.handler.on_backfill_request( + origin, room_id, versions, limit + ) + + defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) + + @defer.inlineCallbacks + @log_function + def on_incoming_transaction(self, transaction_data): + transaction = Transaction(**transaction_data) + + for p in transaction.pdus: + if "unsigned" in p: + unsigned = p["unsigned"] + if "age" in unsigned: + p["age"] = unsigned["age"] + if "age" in p: + p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) + del p["age"] + + pdu_list = [ + self.event_from_pdu_json(p) for p in transaction.pdus + ] + + logger.debug("[%s] Got transaction", transaction.transaction_id) + + response = yield self.transaction_actions.have_responded(transaction) + + if response: + logger.debug("[%s] We've already responed to this request", + transaction.transaction_id) + defer.returnValue(response) + return + + logger.debug("[%s] Transaction is new", transaction.transaction_id) + + with PreserveLoggingContext(): + dl = [] + for pdu in pdu_list: + dl.append(self._handle_new_pdu(transaction.origin, pdu)) + + if hasattr(transaction, "edus"): + for edu in [Edu(**x) for x in transaction.edus]: + self.received_edu( + transaction.origin, + edu.edu_type, + edu.content + ) + + results = yield defer.DeferredList(dl) + + ret = [] + for r in results: + if r[0]: + ret.append({}) + else: + logger.exception(r[1]) + ret.append({"error": str(r[1])}) + + logger.debug("Returning: %s", str(ret)) + + yield self.transaction_actions.set_response( + transaction, + 200, response + ) + defer.returnValue((200, response)) + + def received_edu(self, origin, edu_type, content): + if edu_type in self.edu_handlers: + self.edu_handlers[edu_type](origin, content) + else: + logger.warn("Received EDU of type %s with no handler", edu_type) + + @defer.inlineCallbacks + @log_function + def on_context_state_request(self, origin, room_id, event_id): + if event_id: + pdus = yield self.handler.get_state_for_pdu( + origin, room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + else: + raise NotImplementedError("Specify an event") + + defer.returnValue((200, { + "pdus": [pdu.get_pdu_json() for pdu in pdus], + "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], + })) + + @defer.inlineCallbacks + @log_function + def on_pdu_request(self, origin, event_id): + pdu = yield self._get_persisted_pdu(origin, event_id) + + if pdu: + defer.returnValue( + (200, self._transaction_from_pdus([pdu]).get_dict()) + ) + else: + defer.returnValue((404, "")) + + @defer.inlineCallbacks + @log_function + def on_pull_request(self, origin, versions): + raise NotImplementedError("Pull transactions not implemented") + + @defer.inlineCallbacks + def on_query_request(self, query_type, args): + if query_type in self.query_handlers: + response = yield self.query_handlers[query_type](args) + defer.returnValue((200, response)) + else: + defer.returnValue( + (404, "No handler for Query type '%s'" % (query_type,)) + ) + + @defer.inlineCallbacks + def on_make_join_request(self, room_id, user_id): + pdu = yield self.handler.on_make_join_request(room_id, user_id) + time_now = self._clock.time_msec() + defer.returnValue({"event": pdu.get_pdu_json(time_now)}) + + @defer.inlineCallbacks + def on_invite_request(self, origin, content): + pdu = self.event_from_pdu_json(content) + ret_pdu = yield self.handler.on_invite_request(origin, pdu) + time_now = self._clock.time_msec() + defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) + + @defer.inlineCallbacks + def on_send_join_request(self, origin, content): + logger.debug("on_send_join_request: content: %s", content) + pdu = self.event_from_pdu_json(content) + logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) + res_pdus = yield self.handler.on_send_join_request(origin, pdu) + time_now = self._clock.time_msec() + defer.returnValue((200, { + "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]], + "auth_chain": [ + p.get_pdu_json(time_now) for p in res_pdus["auth_chain"] + ], + })) + + @defer.inlineCallbacks + def on_event_auth(self, origin, room_id, event_id): + time_now = self._clock.time_msec() + auth_pdus = yield self.handler.on_event_auth(event_id) + defer.returnValue((200, { + "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + })) + + @log_function + def _get_persisted_pdu(self, origin, event_id, do_auth=True): + """ Get a PDU from the database with given origin and id. + + Returns: + Deferred: Results in a `Pdu`. + """ + return self.handler.get_persisted_pdu( + origin, event_id, do_auth=do_auth + ) + + def _transaction_from_pdus(self, pdu_list): + """Returns a new Transaction containing the given PDUs suitable for + transmission. + """ + time_now = self._clock.time_msec() + pdus = [p.get_pdu_json(time_now) for p in pdu_list] + return Transaction( + origin=self.server_name, + pdus=pdus, + origin_server_ts=int(time_now), + destination=None, + ) + + @defer.inlineCallbacks + @log_function + def _handle_new_pdu(self, origin, pdu, max_recursion=10): + # We reprocess pdus when we have seen them only as outliers + existing = yield self._get_persisted_pdu( + origin, pdu.event_id, do_auth=False + ) + + already_seen = ( + existing and ( + not existing.internal_metadata.is_outlier() + or pdu.internal_metadata.is_outlier() + ) + ) + if already_seen: + logger.debug("Already seen pdu %s", pdu.event_id) + defer.returnValue({}) + return + + state = None + + auth_chain = [] + + have_seen = yield self.store.have_events( + [e for e, _ in pdu.prev_events] + ) + + # Get missing pdus if necessary. + if not pdu.internal_metadata.is_outlier(): + # We only backfill backwards to the min depth. + min_depth = yield self.handler.get_min_depth_for_context( + pdu.room_id + ) + + logger.debug( + "_handle_new_pdu min_depth for %s: %d", + pdu.room_id, min_depth + ) + + if min_depth and pdu.depth > min_depth and max_recursion > 0: + for event_id, hashes in pdu.prev_events: + if event_id not in have_seen: + logger.debug( + "_handle_new_pdu requesting pdu %s", + event_id + ) + + try: + new_pdu = yield self.federation_client.get_pdu( + [origin, pdu.origin], + event_id=event_id, + ) + + if new_pdu: + yield self._handle_new_pdu( + origin, + new_pdu, + max_recursion=max_recursion-1 + ) + + logger.debug("Processed pdu %s", event_id) + else: + logger.warn("Failed to get PDU %s", event_id) + except: + # TODO(erikj): Do some more intelligent retries. + logger.exception("Failed to get PDU") + else: + # We need to get the state at this event, since we have reached + # a backward extremity edge. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) + state, auth_chain = yield self.get_state_for_room( + origin, pdu.room_id, pdu.event_id, + ) + + ret = yield self.handler.on_receive_pdu( + origin, + pdu, + backfilled=False, + state=state, + auth_chain=auth_chain, + ) + + defer.returnValue(ret) + + def __str__(self): + return "" % self.server_name + + def event_from_pdu_json(self, pdu_json, outlier=False): + event = FrozenEvent( + pdu_json + ) + + event.internal_metadata.outlier = outlier + + return event -- cgit 1.4.1 From c92d64a6c35713aabaed11e8ef1e62d2fb84a875 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Jan 2015 14:33:11 +0000 Subject: Make it the responsibility of the replication layer to check signature and hashes. --- synapse/federation/federation_client.py | 108 ++++++++++++++++++++++++++++---- synapse/federation/federation_server.py | 89 ++++++++++++++++++++++---- synapse/federation/replication.py | 2 + 3 files changed, 173 insertions(+), 26 deletions(-) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c80f4c61bc..91b44cd8b3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -20,6 +20,13 @@ from .units import Edu from synapse.util.logutils import log_function from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import SynapseError import logging @@ -126,6 +133,11 @@ class FederationClient(object): for p in transaction_data["pdus"] ] + for i, pdu in enumerate(pdus): + pdus[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + defer.returnValue(pdus) @defer.inlineCallbacks @@ -159,6 +171,22 @@ class FederationClient(object): transaction_data = yield self.transport_layer.get_event( destination, event_id ) + + logger.debug("transaction_data %r", transaction_data) + + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] + + if pdu_list: + pdu = pdu_list[0] + + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + break + except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -166,18 +194,6 @@ class FederationClient(object): ) continue - logger.debug("transaction_data %r", transaction_data) - - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] - - if pdu_list: - pdu = pdu_list[0] - # TODO: We need to check signatures here - break - defer.returnValue(pdu) @defer.inlineCallbacks @@ -208,6 +224,16 @@ class FederationClient(object): for p in result.get("auth_chain", []) ] + for i, pdu in enumerate(pdus): + pdus[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + + for i, pdu in enumerate(auth_chain): + auth_chain[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + defer.returnValue((pdus, auth_chain)) @defer.inlineCallbacks @@ -222,6 +248,11 @@ class FederationClient(object): for p in res["auth_chain"] ] + for i, pdu in enumerate(auth_chain): + auth_chain[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + auth_chain.sort(key=lambda e: e.depth) defer.returnValue(auth_chain) @@ -260,6 +291,16 @@ class FederationClient(object): for p in content.get("auth_chain", []) ] + for i, pdu in enumerate(state): + state[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + + for i, pdu in enumerate(auth_chain): + auth_chain[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + auth_chain.sort(key=lambda e: e.depth) defer.returnValue({ @@ -281,7 +322,14 @@ class FederationClient(object): logger.debug("Got response to send_invite: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + pdu = self.event_from_pdu_json(pdu_dict) + + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + + defer.returnValue(pdu) def event_from_pdu_json(self, pdu_json, outlier=False): event = FrozenEvent( @@ -291,3 +339,37 @@ class FederationClient(object): event.internal_metadata.outlier = outlier return event + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + try: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 0597725ce7..fc5342afaa 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -21,6 +21,13 @@ from .units import Transaction, Edu from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import FederationError, SynapseError import logging @@ -97,8 +104,10 @@ class FederationServer(object): response = yield self.transaction_actions.have_responded(transaction) if response: - logger.debug("[%s] We've already responed to this request", - transaction.transaction_id) + logger.debug( + "[%s] We've already responed to this request", + transaction.transaction_id + ) defer.returnValue(response) return @@ -253,6 +262,9 @@ class FederationServer(object): origin, pdu.event_id, do_auth=False ) + # FIXME: Currently we fetch an event again when we already have it + # if it has been marked as an outlier. + already_seen = ( existing and ( not existing.internal_metadata.is_outlier() @@ -264,14 +276,27 @@ class FederationServer(object): defer.returnValue({}) return + # Check signature. + try: + pdu = yield self._check_sigs_and_hash(pdu) + except SynapseError as e: + raise FederationError( + "ERROR", + e.code, + e.msg, + affected=pdu.event_id, + ) + state = None auth_chain = [] have_seen = yield self.store.have_events( - [e for e, _ in pdu.prev_events] + [ev for ev, _ in pdu.prev_events] ) + fetch_state = False + # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. @@ -311,16 +336,20 @@ class FederationServer(object): except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") - else: - # We need to get the state at this event, since we have reached - # a backward extremity edge. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - state, auth_chain = yield self.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) + fetch_state = True + else: + fetch_state = True + + if fetch_state: + # We need to get the state at this event, since we haven't + # processed all the prev events. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) + state, auth_chain = yield self.get_state_for_room( + origin, pdu.room_id, pdu.event_id, + ) ret = yield self.handler.on_receive_pdu( origin, @@ -343,3 +372,37 @@ class FederationServer(object): event.internal_metadata.outlier = outlier return event + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + try: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 9ef4834927..e442c6c5d5 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -51,6 +51,8 @@ class ReplicationLayer(FederationClient, FederationServer): def __init__(self, hs, transport_layer): self.server_name = hs.hostname + self.keyring = hs.get_keyring() + self.transport_layer = transport_layer self.transport_layer.register_received_handler(self) self.transport_layer.register_request_handler(self) -- cgit 1.4.1 From 78015948a7febb18e000651f72f8f58830a55b93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Jan 2015 16:50:23 +0000 Subject: Initial implementation of auth conflict resolution --- synapse/events/utils.py | 6 +- synapse/federation/federation_client.py | 2 +- synapse/federation/federation_server.py | 33 +++++ synapse/federation/transport/client.py | 16 +++ synapse/federation/transport/server.py | 21 +++- synapse/handlers/federation.py | 207 ++++++++++++++++++++------------ synapse/storage/rejections.py | 4 +- tests/handlers/test_federation.py | 2 + 8 files changed, 210 insertions(+), 81 deletions(-) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index bcb5457278..10a6b9f264 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -45,12 +45,14 @@ def prune_event(event): "membership", ] + event_dict = event.get_dict() + new_content = {} def add_fields(*fields): for field in fields: if field in event.content: - new_content[field] = event.content[field] + new_content[field] = event_dict["content"][field] if event_type == EventTypes.Member: add_fields("membership") @@ -75,7 +77,7 @@ def prune_event(event): allowed_fields = { k: v - for k, v in event.get_dict().items() + for k, v in event_dict.items() if k in allowed_keys } diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ebcd593506..1173ca817b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -345,7 +345,7 @@ class FederationClient(object): "auth_chain": [e.get_pdu_json(time_now) for e in local_auth], } - code, content = yield self.transport_layer.send_invite( + code, content = yield self.transport_layer.send_query_auth( destination=destination, room_id=room_id, event_id=event_id, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fc5342afaa..8cff4e6472 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -230,6 +230,39 @@ class FederationServer(object): "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], })) + @defer.inlineCallbacks + def on_query_auth_request(self, origin, content, event_id): + auth_chain = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content["auth_chain"] + ] + + missing = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content.get("missing", []) + ] + + ret = yield self.handler.on_query_auth( + origin, event_id, auth_chain, content.get("rejects", []), missing + ) + + time_now = self._clock.time_msec() + send_content = { + "auth_chain": [ + e.get_pdu_json(time_now) + for e in ret["auth_chain"] + ], + "rejects": content.get("rejects", []), + "missing": [ + e.get_pdu_json(time_now) + for e in ret.get("missing", []) + ], + } + + defer.returnValue( + (200, send_content) + ) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index e634a3a213..4cb1dea2de 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -213,3 +213,19 @@ class TransportLayerClient(object): ) defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def send_query_auth(self, destination, room_id, event_id, content): + path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) + + code, content = yield self.client.post_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a380a6910b..9c9f8d525b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -42,7 +42,7 @@ class TransportLayerServer(object): content = None origin = None - if request.method == "PUT": + if request.method in ["PUT", "POST"]: # TODO: Handle other method types? other content types? try: content_bytes = request.content.read() @@ -234,6 +234,16 @@ class TransportLayerServer(object): ) ) ) + self.server.register_path( + "POST", + re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + self._on_query_auth_request( + origin, content, event_id, + ) + ) + ) @defer.inlineCallbacks @log_function @@ -325,3 +335,12 @@ class TransportLayerServer(object): ) defer.returnValue((200, content)) + + @defer.inlineCallbacks + @log_function + def _on_query_auth_request(self, origin, content, event_id): + new_content = yield self.request_handler.on_query_auth_request( + origin, content, event_id + ) + + defer.returnValue((200, new_content)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 97e3c503b9..14c26d8cea 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -126,7 +126,7 @@ class FederationHandler(BaseHandler): if not state: state, auth_chain = yield replication.get_state_for_room( - origin, context=event.room_id, event_id=event.event_id, + origin, room_id=event.room_id, event_id=event.event_id, ) if not auth_chain: @@ -139,7 +139,7 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_auth_from=origin) + yield self._handle_new_event(origin, e) except: logger.exception( "Failed to handle auth event %s", @@ -152,7 +152,7 @@ class FederationHandler(BaseHandler): for e in state: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e) + yield self._handle_new_event(origin, e) except: logger.exception( "Failed to handle state event %s", @@ -161,6 +161,7 @@ class FederationHandler(BaseHandler): try: yield self._handle_new_event( + origin, event, state=state, backfilled=backfilled, @@ -363,7 +364,14 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + target_host, e, auth_events=auth + ) except: logger.exception( "Failed to handle auth event %s", @@ -374,8 +382,13 @@ class FederationHandler(BaseHandler): # FIXME: Auth these. e.internal_metadata.outlier = True try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } yield self._handle_new_event( - e, fetch_auth_from=target_host + target_host, e, auth_events=auth ) except: logger.exception( @@ -384,6 +397,7 @@ class FederationHandler(BaseHandler): ) yield self._handle_new_event( + target_host, new_event, state=state, current_state=state, @@ -450,7 +464,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(event) + context = yield self._handle_new_event(origin, event) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -651,11 +665,12 @@ class FederationHandler(BaseHandler): waiters.pop().callback(None) @defer.inlineCallbacks - def _handle_new_event(self, event, state=None, backfilled=False, - current_state=None, fetch_auth_from=None): + @log_function + def _handle_new_event(self, origin, event, state=None, backfilled=False, + current_state=None, auth_events=None): logger.debug( - "_handle_new_event: Before annotate: %s, sigs: %s", + "_handle_new_event: %s, sigs: %s", event.event_id, event.signatures, ) @@ -663,62 +678,34 @@ class FederationHandler(BaseHandler): event, old_state=state ) + if not auth_events: + auth_events = context.auth_events + logger.debug( - "_handle_new_event: Before auth fetch: %s, sigs: %s", - event.event_id, event.signatures, + "_handle_new_event: %s, auth_events: %s", + event.event_id, auth_events, ) is_new_state = not event.internal_metadata.is_outlier() - known_ids = set( - [s.event_id for s in context.auth_events.values()] - ) - - for e_id, _ in event.auth_events: - if e_id not in known_ids: - e = yield self.store.get_event(e_id, allow_none=True) - - if not e and fetch_auth_from is not None: - # Grab the auth_chain over federation if we are missing - # auth events. - auth_chain = yield self.replication_layer.get_event_auth( - fetch_auth_from, event.event_id, event.room_id - ) - for auth_event in auth_chain: - yield self._handle_new_event(auth_event) - e = yield self.store.get_event(e_id, allow_none=True) - - if not e: - # TODO: Do some conflict res to make sure that we're - # not the ones who are wrong. - logger.info( - "Rejecting %s as %s not in db or %s", - event.event_id, e_id, known_ids, - ) - # FIXME: How does raising AuthError work with federation? - raise AuthError(403, "Cannot find auth event") - - context.auth_events[(e.type, e.state_key)] = e - - logger.debug( - "_handle_new_event: Before hack: %s, sigs: %s", - event.event_id, event.signatures, - ) - + # This is a hack to fix some old rooms where the initial join event + # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: if len(event.prev_events) == 1: c = yield self.store.get_event(event.prev_events[0][0]) if c.type == EventTypes.Create: - context.auth_events[(c.type, c.state_key)] = c - - logger.debug( - "_handle_new_event: Before auth check: %s, sigs: %s", - event.event_id, event.signatures, - ) + auth_events[(c.type, c.state_key)] = c try: - self.auth.check(event, auth_events=context.auth_events) - except AuthError: + yield self.do_auth( + origin, event, context, auth_events=auth_events + ) + except AuthError as e: + logger.warn( + "Rejecting %s because %s", + event.event_id, e.msg + ) + # TODO: Store rejection. context.rejected = RejectedReason.AUTH_ERROR @@ -731,11 +718,6 @@ class FederationHandler(BaseHandler): ) raise - logger.debug( - "_handle_new_event: Before persist_event: %s, sigs: %s", - event.event_id, event.signatures, - ) - yield self.store.persist_event( event, context=context, @@ -744,25 +726,73 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - logger.debug( - "_handle_new_event: After persist_event: %s, sigs: %s", - event.event_id, event.signatures, + defer.returnValue(context) + + @defer.inlineCallbacks + def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, + missing): + # Just go through and process each event in `remote_auth_chain`. We + # don't want to fall into the trap of `missing` being wrong. + for e in remote_auth_chain: + try: + yield self._handle_new_event(origin, e) + except AuthError: + pass + + # Now get the current auth_chain for the event. + local_auth_chain = yield self.store.get_auth_chain([event_id]) + + # TODO: Check if we would now reject event_id. If so we need to tell + # everyone. + + ret = yield self.construct_auth_difference( + local_auth_chain, remote_auth_chain ) - defer.returnValue(context) + logger.debug("on_query_auth reutrning: %s", ret) + + defer.returnValue(ret) @defer.inlineCallbacks - def do_auth(self, origin, event, context): - for e_id, _ in event.auth_events: - pass + @log_function + def do_auth(self, origin, event, context, auth_events): + # Check if we have all the auth events. + res = yield self.store.have_events( + [e_id for e_id, _ in event.auth_events] + ) - auth_events = set(e_id for e_id, _ in event.auth_events) - current_state = set(e.event_id for e in context.auth_events.values()) + event_auth_events = set(e_id for e_id, _ in event.auth_events) + seen_events = set(res.keys()) - missing_auth = auth_events - current_state + missing_auth = event_auth_events - seen_events if missing_auth: + logger.debug("Missing auth: %s", missing_auth) + # If we don't have all the auth events, we need to get them. + remote_auth_chain = yield self.replication_layer.get_event_auth( + origin, event.room_id, event.event_id + ) + + for e in remote_auth_chain: + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in remote_auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + + current_state = set(e.event_id for e in auth_events.values()) + different_auth = event_auth_events - current_state + + if different_auth and not event.internal_metadata.is_outlier(): # Do auth conflict res. + logger.debug("Different auth: %s", different_auth) # 1. Get what we think is the auth chain. auth_ids = self.auth.compute_auth_events(event, context) @@ -778,14 +808,24 @@ class FederationHandler(BaseHandler): # 3. Process any remote auth chain events we haven't seen. for e in result.get("missing", []): - # TODO. - pass + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in result["auth_chain"] + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass # 4. Look at rejects and their proofs. # TODO. try: - self.auth.check(event, auth_events=context.auth_events) + self.auth.check(event, auth_events=auth_events) except AuthError: raise @@ -802,12 +842,16 @@ class FederationHandler(BaseHandler): dict """ + logger.debug("construct_auth_difference Start!") + # TODO: Make sure we are OK with local_auth or remote_auth having more # auth events in them than strictly necessary. def sort_fun(ev): return ev.depth, ev.event_id + logger.debug("construct_auth_difference after sort_fun!") + # We find the differences by starting at the "bottom" of each list # and iterating up on both lists. The lists are ordered by depth and # then event_id, we iterate up both lists until we find the event ids @@ -823,11 +867,18 @@ class FederationHandler(BaseHandler): local_iter = iter(local_list) remote_iter = iter(remote_list) - current_local = local_iter.next() - current_remote = remote_iter.next() + logger.debug("construct_auth_difference before get_next!") def get_next(it, opt=None): - return it.next() if it.has_next() else opt + try: + return it.next() + except: + return opt + + current_local = get_next(local_iter) + current_remote = get_next(remote_iter) + + logger.debug("construct_auth_difference before while") missing_remotes = [] missing_locals = [] @@ -867,6 +918,8 @@ class FederationHandler(BaseHandler): current_remote = get_next(remote_iter) continue + logger.debug("construct_auth_difference after while") + # missing locals should be sent to the server # We should find why we are missing remotes, as they will have been # rejected. @@ -886,6 +939,7 @@ class FederationHandler(BaseHandler): reason = yield self.store.get_rejection_reason(e.event_id) if reason is None: # FIXME: ERRR?! + logger.warn("Could not find reason for %s", e.event_id) raise RuntimeError("") reason_map[e.event_id] = reason @@ -899,7 +953,10 @@ class FederationHandler(BaseHandler): # TODO: Get proof. pass + logger.debug("construct_auth_difference returning") + defer.returnValue({ + "auth_chain": local_auth, "rejects": { e.event_id: { "reason": reason_map[e.event_id], diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index b7249700d7..4e1a9a2783 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -28,12 +28,12 @@ class RejectionsStore(SQLBaseStore): values={ "event_id": event_id, "reason": reason, - "last_failure": self._clock.time_msec(), + "last_check": self._clock.time_msec(), } ) def get_rejection_reason(self, event_id): - self._simple_select_one_onecol( + return self._simple_select_one_onecol( table="rejections", retcol="reason", keyvalues={ diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index ed21defd13..44dbce6bea 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -52,6 +52,7 @@ class FederationTestCase(unittest.TestCase): "get_room", "get_destination_retry_timings", "set_destination_retry_timings", + "have_events", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -90,6 +91,7 @@ class FederationTestCase(unittest.TestCase): self.datastore.persist_event.return_value = defer.succeed(None) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) + self.datastore.have_events.return_value = defer.succeed({}) def annotate(ev, old_state=None): context = Mock() -- cgit 1.4.1 From c1d860870b418b9583078022d0babe557b73760f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 10:48:47 +0000 Subject: Fix regression where we no longer correctly handled the case of gaps in our event graph --- synapse/federation/federation_server.py | 3 +++ synapse/handlers/federation.py | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8cff4e6472..845a07a3a3 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -366,10 +366,13 @@ class FederationServer(object): logger.debug("Processed pdu %s", event_id) else: logger.warn("Failed to get PDU %s", event_id) + fetch_state = True except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") fetch_state = True + else: + fetch_state = True else: fetch_state = True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 14c26d8cea..de47a97e6b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -119,7 +119,7 @@ class FederationHandler(BaseHandler): event.room_id, self.server_name ) - if not is_in_room and not event.internal_metadata.outlier: + if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") replication = self.replication_layer @@ -780,6 +780,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in remote_auth_chain if e.event_id in auth_ids } + e.internal_metadata.outlier = True yield self._handle_new_event( origin, e, auth_events=auth ) @@ -787,6 +788,8 @@ class FederationHandler(BaseHandler): except AuthError: pass + # FIXME: Assumes we have and stored all the state for all the + # prev_events current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state @@ -814,6 +817,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } + e.internal_metadata.outlier = True yield self._handle_new_event( origin, e, auth_events=auth ) @@ -882,7 +886,7 @@ class FederationHandler(BaseHandler): missing_remotes = [] missing_locals = [] - while current_local and current_remote: + while current_local or current_remote: if current_remote is None: missing_locals.append(current_local) current_local = get_next(local_iter) -- cgit 1.4.1 From a70a801184814d116ed5b10a952e17c45df7bfc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 13:34:01 +0000 Subject: Fix bug where we superfluously asked for current state. Change API of /query_auth/ so that we don't duplicate events in the response. --- synapse/api/auth.py | 2 ++ synapse/federation/federation_client.py | 7 +---- synapse/federation/federation_server.py | 12 ++++---- synapse/handlers/federation.py | 51 ++++++++++++--------------------- synapse/state.py | 20 ++++++++++--- 5 files changed, 43 insertions(+), 49 deletions(-) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3471afd7e7..37e31d2b6f 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -102,6 +102,8 @@ class Auth(object): def check_host_in_room(self, room_id, host): curr_state = yield self.state.get_current_state(room_id) + logger.debug("Got curr_state %s", curr_state) + for event in curr_state: if event.type == EventTypes.Member: try: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 1173ca817b..e1539bd0e0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -357,15 +357,10 @@ class FederationClient(object): for e in content["auth_chain"] ] - missing = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) - for e in content.get("missing", []) - ] - ret = { "auth_chain": auth_chain, "rejects": content.get("rejects", []), - "missing": missing, + "missing": content.get("missing", []), } defer.returnValue(ret) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 845a07a3a3..84ed0a0ba0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -252,11 +252,8 @@ class FederationServer(object): e.get_pdu_json(time_now) for e in ret["auth_chain"] ], - "rejects": content.get("rejects", []), - "missing": [ - e.get_pdu_json(time_now) - for e in ret.get("missing", []) - ], + "rejects": ret.get("rejects", []), + "missing": ret.get("missing", []), } defer.returnValue( @@ -372,7 +369,10 @@ class FederationServer(object): logger.exception("Failed to get PDU") fetch_state = True else: - fetch_state = True + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + fetch_state = True else: fetch_state = True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cc22f21cd1..35cad4182a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -121,38 +121,18 @@ class FederationHandler(BaseHandler): ) if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") - - replication = self.replication_layer - - if not state: - state, auth_chain = yield replication.get_state_for_room( - origin, room_id=event.room_id, event_id=event.event_id, - ) - - if not auth_chain: - auth_chain = yield replication.get_event_auth( - origin, - context=event.room_id, - event_id=event.event_id, - ) - - for e in auth_chain: - e.internal_metadata.outlier = True - try: - yield self._handle_new_event(origin, e) - except: - logger.exception( - "Failed to handle auth event %s", - e.event_id, - ) - current_state = state - if state: + if state and auth_chain is not None: for e in state: e.internal_metadata.outlier = True try: - yield self._handle_new_event(origin, e) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event(origin, e, auth_events=auth) except: logger.exception( "Failed to handle state event %s", @@ -809,18 +789,23 @@ class FederationHandler(BaseHandler): ) # 3. Process any remote auth chain events we haven't seen. - for e in result.get("missing", []): + for missing_id in result.get("missing", []): try: - auth_ids = [e_id for e_id, _ in e.auth_events] + for e in result["auth_chain"]: + if e.event_id == missing_id: + ev = e + break + + auth_ids = [e_id for e_id, _ in ev.auth_events] auth = { (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } - e.internal_metadata.outlier = True + ev.internal_metadata.outlier = True yield self._handle_new_event( - origin, e, auth_events=auth + origin, ev, auth_events=auth ) - auth_events[(e.type, e.state_key)] = e + auth_events[(ev.type, ev.state_key)] = ev except AuthError: pass @@ -970,5 +955,5 @@ class FederationHandler(BaseHandler): } for e in base_remote_rejected }, - "missing": missing_locals, + "missing": [e.event_id for e in missing_locals], }) diff --git a/synapse/state.py b/synapse/state.py index d9fdfb34be..e6632978b5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -166,10 +166,17 @@ class StateHandler(object): first is the name of a state group if one and only one is involved, otherwise `None`. """ + logger.debug("resolve_state_groups event_ids %s", event_ids) + state_groups = yield self.store.get_state_groups( event_ids ) + logger.debug( + "resolve_state_groups state_groups %s", + state_groups.keys() + ) + group_names = set(state_groups.keys()) if len(group_names) == 1: name, state_list = state_groups.items().pop() @@ -205,6 +212,15 @@ class StateHandler(object): if len(v.values()) > 1 } + logger.debug( + "resolve_state_groups Unconflicted state: %s", + unconflicted_state.values(), + ) + logger.debug( + "resolve_state_groups Conflicted state: %s", + conflicted_state.values(), + ) + if event_type: prev_states_events = conflicted_state.get( (event_type, state_key), [] @@ -240,10 +256,6 @@ class StateHandler(object): 1. power levels 2. memberships 3. other events. - - :param conflicted_state: - :param auth_events: - :return: """ resolved_state = {} power_key = (EventTypes.PowerLevels, "") -- cgit 1.4.1 From 776ac820f9bb7f0e9c2fae9facbee05b0132079e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 15:58:28 +0000 Subject: Briefly doc structure of query_auth API. --- synapse/federation/federation_server.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'synapse/federation/federation_server.py') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 84ed0a0ba0..5fbd8b19de 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -232,6 +232,24 @@ class FederationServer(object): @defer.inlineCallbacks def on_query_auth_request(self, origin, content, event_id): + """ + Content is a dict with keys:: + auth_chain (list): A list of events that give the auth chain. + missing (list): A list of event_ids indicating what the other + side (`origin`) think we're missing. + rejects (dict): A mapping from event_id to a 2-tuple of reason + string and a proof (or None) of why the event was rejected. + The keys of this dict give the list of events the `origin` has + rejected. + + Args: + origin (str) + content (dict) + event_id (str) + + Returns: + Deferred: Results in `dict` with the same format as `content` + """ auth_chain = [ (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) for e in content["auth_chain"] -- cgit 1.4.1