From ca65a9d03e4e6e8bc9b42a3ff5a83c1e5294fc7b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 22 Jan 2015 16:37:08 +0000 Subject: Split out TransactionQueue from replication layer --- synapse/federation/replication.py | 291 +---------------------------- synapse/federation/transaction_queue.py | 314 ++++++++++++++++++++++++++++++++ 2 files changed, 316 insertions(+), 289 deletions(-) create mode 100644 synapse/federation/transaction_queue.py (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 6620532a60..accf95e406 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -20,8 +20,8 @@ a given transport. from twisted.internet import defer from .units import Transaction, Edu - from .persistence import TransactionActions +from .transaction_queue import TransactionQueue from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext @@ -62,7 +62,7 @@ class ReplicationLayer(object): # self.pdu_actions = PduActions(self.store) self.transaction_actions = TransactionActions(self.store) - self._transaction_queue = _TransactionQueue( + self._transaction_queue = TransactionQueue( hs, self.transaction_actions, transport_layer ) @@ -662,290 +662,3 @@ class ReplicationLayer(object): event.internal_metadata.outlier = outlier return event - - -class _TransactionQueue(object): - """This class makes sure we only have one transaction in flight at - a time for a given destination. - - It batches pending PDUs into single transactions. - """ - - def __init__(self, hs, transaction_actions, transport_layer): - self.server_name = hs.hostname - self.transaction_actions = transaction_actions - self.transport_layer = transport_layer - - self._clock = hs.get_clock() - self.store = hs.get_datastore() - - # Is a mapping from destinations -> deferreds. Used to keep track - # of which destinations have transactions in flight and when they are - # done - self.pending_transactions = {} - - # Is a mapping from destination -> list of - # tuple(pending pdus, deferred, order) - self.pending_pdus_by_dest = {} - # destination -> list of tuple(edu, deferred) - self.pending_edus_by_dest = {} - - # destination -> list of tuple(failure, deferred) - self.pending_failures_by_dest = {} - - # HACK to get unique tx id - self._next_txn_id = int(self._clock.time_msec()) - - @defer.inlineCallbacks - @log_function - def enqueue_pdu(self, pdu, destinations, order): - # We loop through all destinations to see whether we already have - # a transaction in progress. If we do, stick it in the pending_pdus - # table and we'll get back to it later. - - destinations = set(destinations) - destinations.discard(self.server_name) - destinations.discard("localhost") - - logger.debug("Sending to: %s", str(destinations)) - - if not destinations: - return - - deferreds = [] - - for destination in destinations: - deferred = defer.Deferred() - self.pending_pdus_by_dest.setdefault(destination, []).append( - (pdu, deferred, order) - ) - - def eb(failure): - if not deferred.called: - deferred.errback(failure) - else: - logger.warn("Failed to send pdu", failure) - - with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) - - deferreds.append(deferred) - - yield defer.DeferredList(deferreds) - - # NO inlineCallbacks - def enqueue_edu(self, edu): - destination = edu.destination - - if destination == self.server_name: - return - - deferred = defer.Deferred() - self.pending_edus_by_dest.setdefault(destination, []).append( - (edu, deferred) - ) - - def eb(failure): - if not deferred.called: - deferred.errback(failure) - else: - logger.warn("Failed to send edu", failure) - - with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) - - return deferred - - @defer.inlineCallbacks - def enqueue_failure(self, failure, destination): - deferred = defer.Deferred() - - self.pending_failures_by_dest.setdefault( - destination, [] - ).append( - (failure, deferred) - ) - - yield deferred - - @defer.inlineCallbacks - @log_function - def _attempt_new_transaction(self, destination): - - (retry_last_ts, retry_interval) = (0, 0) - retry_timings = yield self.store.get_destination_retry_timings( - destination - ) - if retry_timings: - (retry_last_ts, retry_interval) = ( - retry_timings.retry_last_ts, retry_timings.retry_interval - ) - if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - return - else: - logger.info("TX [%s] is ready for retry", destination) - - logger.info("TX [%s] _attempt_new_transaction", destination) - - if destination in self.pending_transactions: - # XXX: pending_transactions can get stuck on by a never-ending - # request at which point pending_pdus_by_dest just keeps growing. - # we need application-layer timeouts of some flavour of these - # requests - return - - # list of (pending_pdu, deferred, order) - pending_pdus = self.pending_pdus_by_dest.pop(destination, []) - pending_edus = self.pending_edus_by_dest.pop(destination, []) - pending_failures = self.pending_failures_by_dest.pop(destination, []) - - if pending_pdus: - logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", - destination, len(pending_pdus)) - - if not pending_pdus and not pending_edus and not pending_failures: - return - - logger.debug( - "TX [%s] Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, - len(pending_pdus), - len(pending_edus), - len(pending_failures) - ) - - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[2]) - - pdus = [x[0] for x in pending_pdus] - edus = [x[0] for x in pending_edus] - failures = [x[0].get_dict() for x in pending_failures] - deferreds = [ - x[1] - for x in pending_pdus + pending_edus + pending_failures - ] - - try: - self.pending_transactions[destination] = 1 - - logger.debug("TX [%s] Persisting transaction...", destination) - - transaction = Transaction.create_new( - origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) - - self._next_txn_id += 1 - - yield self.transaction_actions.prepare_to_send(transaction) - - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] Sending transaction [%s]", - destination, - transaction.transaction_id, - ) - - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self._clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - code, response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) - - logger.info("TX [%s] got %d response", destination, code) - - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) - - yield self.transaction_actions.delivered( - transaction, code, response - ) - - logger.debug("TX [%s] Marked as delivered", destination) - logger.debug("TX [%s] Yielding to callbacks...", destination) - - for deferred in deferreds: - if code == 200: - if retry_last_ts: - # this host is alive! reset retry schedule - yield self.store.set_destination_retry_timings( - destination, 0, 0 - ) - deferred.callback(None) - else: - self.set_retrying(destination, retry_interval) - deferred.errback(RuntimeError("Got status %d" % code)) - - # Ensures we don't continue until all callbacks on that - # deferred have fired - try: - yield deferred - except: - pass - - logger.debug("TX [%s] Yielded to callbacks", destination) - - except Exception as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) - - self.set_retrying(destination, retry_interval) - - for deferred in deferreds: - if not deferred.called: - deferred.errback(e) - - finally: - # We want to be *very* sure we delete this after we stop processing - self.pending_transactions.pop(destination, None) - - # Check to see if there is anything else to send. - self._attempt_new_transaction(destination) - - @defer.inlineCallbacks - def set_retrying(self, destination, retry_interval): - # track that this destination is having problems and we should - # give it a chance to recover before trying it again - - if retry_interval: - retry_interval *= 2 - # plateau at hourly retries for now - if retry_interval >= 60 * 60 * 1000: - retry_interval = 60 * 60 * 1000 - else: - retry_interval = 2000 # try again at first after 2 seconds - - yield self.store.set_destination_retry_timings( - destination, - int(self._clock.time_msec()), - retry_interval - ) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py new file mode 100644 index 0000000000..c2cb4a1c49 --- /dev/null +++ b/synapse/federation/transaction_queue.py @@ -0,0 +1,314 @@ +# -*- coding: utf-8 -*- +# Copyright 2014, 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from .units import Transaction + +from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext + +import logging + + +logger = logging.getLogger(__name__) + + +class TransactionQueue(object): + """This class makes sure we only have one transaction in flight at + a time for a given destination. + + It batches pending PDUs into single transactions. + """ + + def __init__(self, hs, transaction_actions, transport_layer): + self.server_name = hs.hostname + self.transaction_actions = transaction_actions + self.transport_layer = transport_layer + + self._clock = hs.get_clock() + self.store = hs.get_datastore() + + # Is a mapping from destinations -> deferreds. Used to keep track + # of which destinations have transactions in flight and when they are + # done + self.pending_transactions = {} + + # Is a mapping from destination -> list of + # tuple(pending pdus, deferred, order) + self.pending_pdus_by_dest = {} + # destination -> list of tuple(edu, deferred) + self.pending_edus_by_dest = {} + + # destination -> list of tuple(failure, deferred) + self.pending_failures_by_dest = {} + + # HACK to get unique tx id + self._next_txn_id = int(self._clock.time_msec()) + + @defer.inlineCallbacks + @log_function + def enqueue_pdu(self, pdu, destinations, order): + # We loop through all destinations to see whether we already have + # a transaction in progress. If we do, stick it in the pending_pdus + # table and we'll get back to it later. + + destinations = set(destinations) + destinations.discard(self.server_name) + destinations.discard("localhost") + + logger.debug("Sending to: %s", str(destinations)) + + if not destinations: + return + + deferreds = [] + + for destination in destinations: + deferred = defer.Deferred() + self.pending_pdus_by_dest.setdefault(destination, []).append( + (pdu, deferred, order) + ) + + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send pdu", failure) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(eb) + + deferreds.append(deferred) + + yield defer.DeferredList(deferreds) + + # NO inlineCallbacks + def enqueue_edu(self, edu): + destination = edu.destination + + if destination == self.server_name: + return + + deferred = defer.Deferred() + self.pending_edus_by_dest.setdefault(destination, []).append( + (edu, deferred) + ) + + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send edu", failure) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(eb) + + return deferred + + @defer.inlineCallbacks + def enqueue_failure(self, failure, destination): + deferred = defer.Deferred() + + self.pending_failures_by_dest.setdefault( + destination, [] + ).append( + (failure, deferred) + ) + + yield deferred + + @defer.inlineCallbacks + @log_function + def _attempt_new_transaction(self, destination): + + (retry_last_ts, retry_interval) = (0, 0) + retry_timings = yield self.store.get_destination_retry_timings( + destination + ) + if retry_timings: + (retry_last_ts, retry_interval) = ( + retry_timings.retry_last_ts, retry_timings.retry_interval + ) + if retry_last_ts + retry_interval > int(self._clock.time_msec()): + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) + return + else: + logger.info("TX [%s] is ready for retry", destination) + + logger.info("TX [%s] _attempt_new_transaction", destination) + + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending + # request at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these + # requests + return + + # list of (pending_pdu, deferred, order) + pending_pdus = self.pending_pdus_by_dest.pop(destination, []) + pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_failures = self.pending_failures_by_dest.pop(destination, []) + + if pending_pdus: + logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d", + destination, len(pending_pdus)) + + if not pending_pdus and not pending_edus and not pending_failures: + return + + logger.debug( + "TX [%s] Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) + + # Sort based on the order field + pending_pdus.sort(key=lambda t: t[2]) + + pdus = [x[0] for x in pending_pdus] + edus = [x[0] for x in pending_edus] + failures = [x[0].get_dict() for x in pending_failures] + deferreds = [ + x[1] + for x in pending_pdus + pending_edus + pending_failures + ] + + try: + self.pending_transactions[destination] = 1 + + logger.debug("TX [%s] Persisting transaction...", destination) + + transaction = Transaction.create_new( + origin_server_ts=int(self._clock.time_msec()), + transaction_id=str(self._next_txn_id), + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) + + self._next_txn_id += 1 + + yield self.transaction_actions.prepare_to_send(transaction) + + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) + + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self._clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + code, response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + + logger.info("TX [%s] got %d response", destination, code) + + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) + + yield self.transaction_actions.delivered( + transaction, code, response + ) + + logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Yielding to callbacks...", destination) + + for deferred in deferreds: + if code == 200: + if retry_last_ts: + # this host is alive! reset retry schedule + yield self.store.set_destination_retry_timings( + destination, 0, 0 + ) + deferred.callback(None) + else: + self.set_retrying(destination, retry_interval) + deferred.errback(RuntimeError("Got status %d" % code)) + + # Ensures we don't continue until all callbacks on that + # deferred have fired + try: + yield deferred + except: + pass + + logger.debug("TX [%s] Yielded to callbacks", destination) + + except Exception as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.warn( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) + + self.set_retrying(destination, retry_interval) + + for deferred in deferreds: + if not deferred.called: + deferred.errback(e) + + finally: + # We want to be *very* sure we delete this after we stop processing + self.pending_transactions.pop(destination, None) + + # Check to see if there is anything else to send. + self._attempt_new_transaction(destination) + + @defer.inlineCallbacks + def set_retrying(self, destination, retry_interval): + # track that this destination is having problems and we should + # give it a chance to recover before trying it again + + if retry_interval: + retry_interval *= 2 + # plateau at hourly retries for now + if retry_interval >= 60 * 60 * 1000: + retry_interval = 60 * 60 * 1000 + else: + retry_interval = 2000 # try again at first after 2 seconds + + yield self.store.set_destination_retry_timings( + destination, + int(self._clock.time_msec()), + retry_interval + ) -- cgit 1.5.1 From 7b8861924130821c1bbd05ce65260209a993f759 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Jan 2015 10:45:24 +0000 Subject: Split up replication_layer module into client, server and transaction queue --- synapse/federation/federation_client.py | 293 +++++++++++++++ synapse/federation/federation_server.py | 345 ++++++++++++++++++ synapse/federation/replication.py | 608 +------------------------------- synapse/federation/transaction_queue.py | 9 +- synapse/storage/__init__.py | 2 +- 5 files changed, 654 insertions(+), 603 deletions(-) create mode 100644 synapse/federation/federation_client.py create mode 100644 synapse/federation/federation_server.py (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py new file mode 100644 index 0000000000..c80f4c61bc --- /dev/null +++ b/synapse/federation/federation_client.py @@ -0,0 +1,293 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from .units import Edu + +from synapse.util.logutils import log_function +from synapse.events import FrozenEvent + +import logging + + +logger = logging.getLogger(__name__) + + +class FederationClient(object): + @log_function + def send_pdu(self, pdu, destinations): + """Informs the replication layer about a new PDU generated within the + home server that should be transmitted to others. + + TODO: Figure out when we should actually resolve the deferred. + + Args: + pdu (Pdu): The new Pdu. + + Returns: + Deferred: Completes when we have successfully processed the PDU + and replicated it to any interested remote home servers. + """ + order = self._order + self._order += 1 + + logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) + + # TODO, add errback, etc. + self._transaction_queue.enqueue_pdu(pdu, destinations, order) + + logger.debug( + "[%s] transaction_layer.enqueue_pdu... done", + pdu.event_id + ) + + @log_function + def send_edu(self, destination, edu_type, content): + edu = Edu( + origin=self.server_name, + destination=destination, + edu_type=edu_type, + content=content, + ) + + # TODO, add errback, etc. + self._transaction_queue.enqueue_edu(edu) + return defer.succeed(None) + + @log_function + def send_failure(self, failure, destination): + self._transaction_queue.enqueue_failure(failure, destination) + return defer.succeed(None) + + @log_function + def make_query(self, destination, query_type, args, + retry_on_dns_fail=True): + """Sends a federation Query to a remote homeserver of the given type + and arguments. + + Args: + destination (str): Domain name of the remote homeserver + query_type (str): Category of the query type; should match the + handler name used in register_query_handler(). + args (dict): Mapping of strings to strings containing the details + of the query request. + + Returns: + a Deferred which will eventually yield a JSON object from the + response + """ + return self.transport_layer.make_query( + destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail + ) + + @defer.inlineCallbacks + @log_function + def backfill(self, dest, context, limit, extremities): + """Requests some more historic PDUs for the given context from the + given destination server. + + Args: + dest (str): The remote home server to ask. + context (str): The context to backfill. + limit (int): The maximum number of PDUs to return. + extremities (list): List of PDU id and origins of the first pdus + we have seen from the context + + Returns: + Deferred: Results in the received PDUs. + """ + logger.debug("backfill extrem=%s", extremities) + + # If there are no extremeties then we've (probably) reached the start. + if not extremities: + return + + transaction_data = yield self.transport_layer.backfill( + dest, context, extremities, limit) + + logger.debug("backfill transaction_data=%s", repr(transaction_data)) + + pdus = [ + self.event_from_pdu_json(p, outlier=False) + for p in transaction_data["pdus"] + ] + + defer.returnValue(pdus) + + @defer.inlineCallbacks + @log_function + def get_pdu(self, destinations, event_id, outlier=False): + """Requests the PDU with given origin and ID from the remote home + servers. + + Will attempt to get the PDU from each destination in the list until + one succeeds. + + This will persist the PDU locally upon receipt. + + Args: + destinations (list): Which home servers to query + pdu_origin (str): The home server that originally sent the pdu. + event_id (str) + outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if + it's from an arbitary point in the context as opposed to part + of the current block of PDUs. Defaults to `False` + + Returns: + Deferred: Results in the requested PDU. + """ + + # TODO: Rate limit the number of times we try and get the same event. + + pdu = None + for destination in destinations: + try: + transaction_data = yield self.transport_layer.get_event( + destination, event_id + ) + except Exception as e: + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue + + logger.debug("transaction_data %r", transaction_data) + + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] + + if pdu_list: + pdu = pdu_list[0] + # TODO: We need to check signatures here + break + + defer.returnValue(pdu) + + @defer.inlineCallbacks + @log_function + def get_state_for_room(self, destination, room_id, event_id): + """Requests all of the `current` state PDUs for a given room from + a remote home server. + + Args: + destination (str): The remote homeserver to query for the state. + room_id (str): The id of the room we're interested in. + event_id (str): The id of the event we want the state at. + + Returns: + Deferred: Results in a list of PDUs. + """ + + result = yield self.transport_layer.get_room_state( + destination, room_id, event_id=event_id, + ) + + pdus = [ + self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] + ] + + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in result.get("auth_chain", []) + ] + + defer.returnValue((pdus, auth_chain)) + + @defer.inlineCallbacks + @log_function + def get_event_auth(self, destination, room_id, event_id): + res = yield self.transport_layer.get_event_auth( + destination, room_id, event_id, + ) + + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in res["auth_chain"] + ] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue(auth_chain) + + @defer.inlineCallbacks + def make_join(self, destination, room_id, user_id): + ret = yield self.transport_layer.make_join( + destination, room_id, user_id + ) + + pdu_dict = ret["event"] + + logger.debug("Got response to make_join: %s", pdu_dict) + + defer.returnValue(self.event_from_pdu_json(pdu_dict)) + + @defer.inlineCallbacks + def send_join(self, destination, pdu): + time_now = self._clock.time_msec() + _, content = yield self.transport_layer.send_join( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) + + logger.debug("Got content: %s", content) + + state = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("state", []) + ] + + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("auth_chain", []) + ] + + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue({ + "state": state, + "auth_chain": auth_chain, + }) + + @defer.inlineCallbacks + def send_invite(self, destination, room_id, event_id, pdu): + time_now = self._clock.time_msec() + code, content = yield self.transport_layer.send_invite( + destination=destination, + room_id=room_id, + event_id=event_id, + content=pdu.get_pdu_json(time_now), + ) + + pdu_dict = content["event"] + + logger.debug("Got response to send_invite: %s", pdu_dict) + + defer.returnValue(self.event_from_pdu_json(pdu_dict)) + + def event_from_pdu_json(self, pdu_json, outlier=False): + event = FrozenEvent( + pdu_json + ) + + event.internal_metadata.outlier = outlier + + return event diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py new file mode 100644 index 0000000000..0597725ce7 --- /dev/null +++ b/synapse/federation/federation_server.py @@ -0,0 +1,345 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from .units import Transaction, Edu + +from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext +from synapse.events import FrozenEvent + +import logging + + +logger = logging.getLogger(__name__) + + +class FederationServer(object): + def set_handler(self, handler): + """Sets the handler that the replication layer will use to communicate + receipt of new PDUs from other home servers. The required methods are + documented on :py:class:`.ReplicationHandler`. + """ + self.handler = handler + + def register_edu_handler(self, edu_type, handler): + if edu_type in self.edu_handlers: + raise KeyError("Already have an EDU handler for %s" % (edu_type,)) + + self.edu_handlers[edu_type] = handler + + def register_query_handler(self, query_type, handler): + """Sets the handler callable that will be used to handle an incoming + federation Query of the given type. + + Args: + query_type (str): Category name of the query, which should match + the string used by make_query. + handler (callable): Invoked to handle incoming queries of this type + + handler is invoked as: + result = handler(args) + + where 'args' is a dict mapping strings to strings of the query + arguments. It should return a Deferred that will eventually yield an + object to encode as JSON. + """ + if query_type in self.query_handlers: + raise KeyError( + "Already have a Query handler for %s" % (query_type,) + ) + + self.query_handlers[query_type] = handler + + @defer.inlineCallbacks + @log_function + def on_backfill_request(self, origin, room_id, versions, limit): + pdus = yield self.handler.on_backfill_request( + origin, room_id, versions, limit + ) + + defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) + + @defer.inlineCallbacks + @log_function + def on_incoming_transaction(self, transaction_data): + transaction = Transaction(**transaction_data) + + for p in transaction.pdus: + if "unsigned" in p: + unsigned = p["unsigned"] + if "age" in unsigned: + p["age"] = unsigned["age"] + if "age" in p: + p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) + del p["age"] + + pdu_list = [ + self.event_from_pdu_json(p) for p in transaction.pdus + ] + + logger.debug("[%s] Got transaction", transaction.transaction_id) + + response = yield self.transaction_actions.have_responded(transaction) + + if response: + logger.debug("[%s] We've already responed to this request", + transaction.transaction_id) + defer.returnValue(response) + return + + logger.debug("[%s] Transaction is new", transaction.transaction_id) + + with PreserveLoggingContext(): + dl = [] + for pdu in pdu_list: + dl.append(self._handle_new_pdu(transaction.origin, pdu)) + + if hasattr(transaction, "edus"): + for edu in [Edu(**x) for x in transaction.edus]: + self.received_edu( + transaction.origin, + edu.edu_type, + edu.content + ) + + results = yield defer.DeferredList(dl) + + ret = [] + for r in results: + if r[0]: + ret.append({}) + else: + logger.exception(r[1]) + ret.append({"error": str(r[1])}) + + logger.debug("Returning: %s", str(ret)) + + yield self.transaction_actions.set_response( + transaction, + 200, response + ) + defer.returnValue((200, response)) + + def received_edu(self, origin, edu_type, content): + if edu_type in self.edu_handlers: + self.edu_handlers[edu_type](origin, content) + else: + logger.warn("Received EDU of type %s with no handler", edu_type) + + @defer.inlineCallbacks + @log_function + def on_context_state_request(self, origin, room_id, event_id): + if event_id: + pdus = yield self.handler.get_state_for_pdu( + origin, room_id, event_id, + ) + auth_chain = yield self.store.get_auth_chain( + [pdu.event_id for pdu in pdus] + ) + else: + raise NotImplementedError("Specify an event") + + defer.returnValue((200, { + "pdus": [pdu.get_pdu_json() for pdu in pdus], + "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], + })) + + @defer.inlineCallbacks + @log_function + def on_pdu_request(self, origin, event_id): + pdu = yield self._get_persisted_pdu(origin, event_id) + + if pdu: + defer.returnValue( + (200, self._transaction_from_pdus([pdu]).get_dict()) + ) + else: + defer.returnValue((404, "")) + + @defer.inlineCallbacks + @log_function + def on_pull_request(self, origin, versions): + raise NotImplementedError("Pull transactions not implemented") + + @defer.inlineCallbacks + def on_query_request(self, query_type, args): + if query_type in self.query_handlers: + response = yield self.query_handlers[query_type](args) + defer.returnValue((200, response)) + else: + defer.returnValue( + (404, "No handler for Query type '%s'" % (query_type,)) + ) + + @defer.inlineCallbacks + def on_make_join_request(self, room_id, user_id): + pdu = yield self.handler.on_make_join_request(room_id, user_id) + time_now = self._clock.time_msec() + defer.returnValue({"event": pdu.get_pdu_json(time_now)}) + + @defer.inlineCallbacks + def on_invite_request(self, origin, content): + pdu = self.event_from_pdu_json(content) + ret_pdu = yield self.handler.on_invite_request(origin, pdu) + time_now = self._clock.time_msec() + defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) + + @defer.inlineCallbacks + def on_send_join_request(self, origin, content): + logger.debug("on_send_join_request: content: %s", content) + pdu = self.event_from_pdu_json(content) + logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) + res_pdus = yield self.handler.on_send_join_request(origin, pdu) + time_now = self._clock.time_msec() + defer.returnValue((200, { + "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]], + "auth_chain": [ + p.get_pdu_json(time_now) for p in res_pdus["auth_chain"] + ], + })) + + @defer.inlineCallbacks + def on_event_auth(self, origin, room_id, event_id): + time_now = self._clock.time_msec() + auth_pdus = yield self.handler.on_event_auth(event_id) + defer.returnValue((200, { + "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], + })) + + @log_function + def _get_persisted_pdu(self, origin, event_id, do_auth=True): + """ Get a PDU from the database with given origin and id. + + Returns: + Deferred: Results in a `Pdu`. + """ + return self.handler.get_persisted_pdu( + origin, event_id, do_auth=do_auth + ) + + def _transaction_from_pdus(self, pdu_list): + """Returns a new Transaction containing the given PDUs suitable for + transmission. + """ + time_now = self._clock.time_msec() + pdus = [p.get_pdu_json(time_now) for p in pdu_list] + return Transaction( + origin=self.server_name, + pdus=pdus, + origin_server_ts=int(time_now), + destination=None, + ) + + @defer.inlineCallbacks + @log_function + def _handle_new_pdu(self, origin, pdu, max_recursion=10): + # We reprocess pdus when we have seen them only as outliers + existing = yield self._get_persisted_pdu( + origin, pdu.event_id, do_auth=False + ) + + already_seen = ( + existing and ( + not existing.internal_metadata.is_outlier() + or pdu.internal_metadata.is_outlier() + ) + ) + if already_seen: + logger.debug("Already seen pdu %s", pdu.event_id) + defer.returnValue({}) + return + + state = None + + auth_chain = [] + + have_seen = yield self.store.have_events( + [e for e, _ in pdu.prev_events] + ) + + # Get missing pdus if necessary. + if not pdu.internal_metadata.is_outlier(): + # We only backfill backwards to the min depth. + min_depth = yield self.handler.get_min_depth_for_context( + pdu.room_id + ) + + logger.debug( + "_handle_new_pdu min_depth for %s: %d", + pdu.room_id, min_depth + ) + + if min_depth and pdu.depth > min_depth and max_recursion > 0: + for event_id, hashes in pdu.prev_events: + if event_id not in have_seen: + logger.debug( + "_handle_new_pdu requesting pdu %s", + event_id + ) + + try: + new_pdu = yield self.federation_client.get_pdu( + [origin, pdu.origin], + event_id=event_id, + ) + + if new_pdu: + yield self._handle_new_pdu( + origin, + new_pdu, + max_recursion=max_recursion-1 + ) + + logger.debug("Processed pdu %s", event_id) + else: + logger.warn("Failed to get PDU %s", event_id) + except: + # TODO(erikj): Do some more intelligent retries. + logger.exception("Failed to get PDU") + else: + # We need to get the state at this event, since we have reached + # a backward extremity edge. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) + state, auth_chain = yield self.get_state_for_room( + origin, pdu.room_id, pdu.event_id, + ) + + ret = yield self.handler.on_receive_pdu( + origin, + pdu, + backfilled=False, + state=state, + auth_chain=auth_chain, + ) + + defer.returnValue(ret) + + def __str__(self): + return "" % self.server_name + + def event_from_pdu_json(self, pdu_json, outlier=False): + event = FrozenEvent( + pdu_json + ) + + event.internal_metadata.outlier = outlier + + return event diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index accf95e406..9ef4834927 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -17,15 +17,12 @@ a given transport. """ -from twisted.internet import defer +from .federation_client import FederationClient +from .federation_server import FederationServer -from .units import Transaction, Edu -from .persistence import TransactionActions from .transaction_queue import TransactionQueue -from synapse.util.logutils import log_function -from synapse.util.logcontext import PreserveLoggingContext -from synapse.events import FrozenEvent +from .persistence import TransactionActions import logging @@ -33,7 +30,7 @@ import logging logger = logging.getLogger(__name__) -class ReplicationLayer(object): +class ReplicationLayer(FederationClient, FederationServer): """This layer is responsible for replicating with remote home servers over the given transport. I.e., does the sending and receiving of PDUs to remote home servers. @@ -58,607 +55,20 @@ class ReplicationLayer(object): self.transport_layer.register_received_handler(self) self.transport_layer.register_request_handler(self) - self.store = hs.get_datastore() - # self.pdu_actions = PduActions(self.store) - self.transaction_actions = TransactionActions(self.store) + self.federation_client = self - self._transaction_queue = TransactionQueue( - hs, self.transaction_actions, transport_layer - ) + self.store = hs.get_datastore() self.handler = None self.edu_handlers = {} self.query_handlers = {} - self._order = 0 - self._clock = hs.get_clock() - self.event_builder_factory = hs.get_event_builder_factory() - - def set_handler(self, handler): - """Sets the handler that the replication layer will use to communicate - receipt of new PDUs from other home servers. The required methods are - documented on :py:class:`.ReplicationHandler`. - """ - self.handler = handler - - def register_edu_handler(self, edu_type, handler): - if edu_type in self.edu_handlers: - raise KeyError("Already have an EDU handler for %s" % (edu_type,)) - - self.edu_handlers[edu_type] = handler - - def register_query_handler(self, query_type, handler): - """Sets the handler callable that will be used to handle an incoming - federation Query of the given type. - - Args: - query_type (str): Category name of the query, which should match - the string used by make_query. - handler (callable): Invoked to handle incoming queries of this type - - handler is invoked as: - result = handler(args) - - where 'args' is a dict mapping strings to strings of the query - arguments. It should return a Deferred that will eventually yield an - object to encode as JSON. - """ - if query_type in self.query_handlers: - raise KeyError( - "Already have a Query handler for %s" % (query_type,) - ) - - self.query_handlers[query_type] = handler - - @log_function - def send_pdu(self, pdu, destinations): - """Informs the replication layer about a new PDU generated within the - home server that should be transmitted to others. - - TODO: Figure out when we should actually resolve the deferred. - - Args: - pdu (Pdu): The new Pdu. - - Returns: - Deferred: Completes when we have successfully processed the PDU - and replicated it to any interested remote home servers. - """ - order = self._order - self._order += 1 - - logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id) - - # TODO, add errback, etc. - self._transaction_queue.enqueue_pdu(pdu, destinations, order) - - logger.debug( - "[%s] transaction_layer.enqueue_pdu... done", - pdu.event_id - ) - - @log_function - def send_edu(self, destination, edu_type, content): - edu = Edu( - origin=self.server_name, - destination=destination, - edu_type=edu_type, - content=content, - ) - - # TODO, add errback, etc. - self._transaction_queue.enqueue_edu(edu) - return defer.succeed(None) - - @log_function - def send_failure(self, failure, destination): - self._transaction_queue.enqueue_failure(failure, destination) - return defer.succeed(None) - - @log_function - def make_query(self, destination, query_type, args, - retry_on_dns_fail=True): - """Sends a federation Query to a remote homeserver of the given type - and arguments. - - Args: - destination (str): Domain name of the remote homeserver - query_type (str): Category of the query type; should match the - handler name used in register_query_handler(). - args (dict): Mapping of strings to strings containing the details - of the query request. - - Returns: - a Deferred which will eventually yield a JSON object from the - response - """ - return self.transport_layer.make_query( - destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail - ) - - @defer.inlineCallbacks - @log_function - def backfill(self, dest, context, limit, extremities): - """Requests some more historic PDUs for the given context from the - given destination server. - - Args: - dest (str): The remote home server to ask. - context (str): The context to backfill. - limit (int): The maximum number of PDUs to return. - extremities (list): List of PDU id and origins of the first pdus - we have seen from the context - - Returns: - Deferred: Results in the received PDUs. - """ - logger.debug("backfill extrem=%s", extremities) - - # If there are no extremeties then we've (probably) reached the start. - if not extremities: - return - - transaction_data = yield self.transport_layer.backfill( - dest, context, extremities, limit) - - logger.debug("backfill transaction_data=%s", repr(transaction_data)) - - transaction = Transaction(**transaction_data) - - pdus = [ - self.event_from_pdu_json(p, outlier=False) - for p in transaction.pdus - ] - for pdu in pdus: - yield self._handle_new_pdu(dest, pdu, backfilled=True) - - defer.returnValue(pdus) - - @defer.inlineCallbacks - @log_function - def get_pdu(self, destination, event_id, outlier=False): - """Requests the PDU with given origin and ID from the remote home - server. - - This will persist the PDU locally upon receipt. - - Args: - destination (str): Which home server to query - pdu_origin (str): The home server that originally sent the pdu. - event_id (str) - outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if - it's from an arbitary point in the context as opposed to part - of the current block of PDUs. Defaults to `False` - - Returns: - Deferred: Results in the requested PDU. - """ - - transaction_data = yield self.transport_layer.get_event( - destination, event_id - ) - - transaction = Transaction(**transaction_data) - - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction.pdus - ] - - pdu = None - if pdu_list: - pdu = pdu_list[0] - yield self._handle_new_pdu(destination, pdu) - - defer.returnValue(pdu) - - @defer.inlineCallbacks - @log_function - def get_state_for_room(self, destination, room_id, event_id): - """Requests all of the `current` state PDUs for a given room from - a remote home server. - - Args: - destination (str): The remote homeserver to query for the state. - room_id (str): The id of the room we're interested in. - event_id (str): The id of the event we want the state at. - - Returns: - Deferred: Results in a list of PDUs. - """ - - result = yield self.transport_layer.get_room_state( - destination, room_id, event_id=event_id, - ) - - pdus = [ - self.event_from_pdu_json(p, outlier=True) for p in result["pdus"] - ] - - auth_chain = [ - self.event_from_pdu_json(p, outlier=True) - for p in result.get("auth_chain", []) - ] - - defer.returnValue((pdus, auth_chain)) - - @defer.inlineCallbacks - @log_function - def get_event_auth(self, destination, room_id, event_id): - res = yield self.transport_layer.get_event_auth( - destination, room_id, event_id, - ) - - auth_chain = [ - self.event_from_pdu_json(p, outlier=True) - for p in res["auth_chain"] - ] - - auth_chain.sort(key=lambda e: e.depth) - - defer.returnValue(auth_chain) - - @defer.inlineCallbacks - @log_function - def on_backfill_request(self, origin, room_id, versions, limit): - pdus = yield self.handler.on_backfill_request( - origin, room_id, versions, limit - ) - - defer.returnValue((200, self._transaction_from_pdus(pdus).get_dict())) - - @defer.inlineCallbacks - @log_function - def on_incoming_transaction(self, transaction_data): - transaction = Transaction(**transaction_data) - - for p in transaction.pdus: - if "unsigned" in p: - unsigned = p["unsigned"] - if "age" in unsigned: - p["age"] = unsigned["age"] - if "age" in p: - p["age_ts"] = int(self._clock.time_msec()) - int(p["age"]) - del p["age"] - - pdu_list = [ - self.event_from_pdu_json(p) for p in transaction.pdus - ] - - logger.debug("[%s] Got transaction", transaction.transaction_id) - - response = yield self.transaction_actions.have_responded(transaction) - - if response: - logger.debug("[%s] We've already responed to this request", - transaction.transaction_id) - defer.returnValue(response) - return - - logger.debug("[%s] Transaction is new", transaction.transaction_id) - - with PreserveLoggingContext(): - dl = [] - for pdu in pdu_list: - dl.append(self._handle_new_pdu(transaction.origin, pdu)) - - if hasattr(transaction, "edus"): - for edu in [Edu(**x) for x in transaction.edus]: - self.received_edu( - transaction.origin, - edu.edu_type, - edu.content - ) - - results = yield defer.DeferredList(dl) - - ret = [] - for r in results: - if r[0]: - ret.append({}) - else: - logger.exception(r[1]) - ret.append({"error": str(r[1])}) - - logger.debug("Returning: %s", str(ret)) - - yield self.transaction_actions.set_response( - transaction, - 200, response - ) - defer.returnValue((200, response)) - - def received_edu(self, origin, edu_type, content): - if edu_type in self.edu_handlers: - self.edu_handlers[edu_type](origin, content) - else: - logger.warn("Received EDU of type %s with no handler", edu_type) - - @defer.inlineCallbacks - @log_function - def on_context_state_request(self, origin, room_id, event_id): - if event_id: - pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event_id, - ) - auth_chain = yield self.store.get_auth_chain( - [pdu.event_id for pdu in pdus] - ) - else: - raise NotImplementedError("Specify an event") - - defer.returnValue((200, { - "pdus": [pdu.get_pdu_json() for pdu in pdus], - "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], - })) - - @defer.inlineCallbacks - @log_function - def on_pdu_request(self, origin, event_id): - pdu = yield self._get_persisted_pdu(origin, event_id) - - if pdu: - defer.returnValue( - (200, self._transaction_from_pdus([pdu]).get_dict()) - ) - else: - defer.returnValue((404, "")) - - @defer.inlineCallbacks - @log_function - def on_pull_request(self, origin, versions): - raise NotImplementedError("Pull transactions not implemented") - - @defer.inlineCallbacks - def on_query_request(self, query_type, args): - if query_type in self.query_handlers: - response = yield self.query_handlers[query_type](args) - defer.returnValue((200, response)) - else: - defer.returnValue( - (404, "No handler for Query type '%s'" % (query_type,)) - ) - - @defer.inlineCallbacks - def on_make_join_request(self, room_id, user_id): - pdu = yield self.handler.on_make_join_request(room_id, user_id) - time_now = self._clock.time_msec() - defer.returnValue({"event": pdu.get_pdu_json(time_now)}) - - @defer.inlineCallbacks - def on_invite_request(self, origin, content): - pdu = self.event_from_pdu_json(content) - ret_pdu = yield self.handler.on_invite_request(origin, pdu) - time_now = self._clock.time_msec() - defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)})) - - @defer.inlineCallbacks - def on_send_join_request(self, origin, content): - logger.debug("on_send_join_request: content: %s", content) - pdu = self.event_from_pdu_json(content) - logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) - res_pdus = yield self.handler.on_send_join_request(origin, pdu) - time_now = self._clock.time_msec() - defer.returnValue((200, { - "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]], - "auth_chain": [ - p.get_pdu_json(time_now) for p in res_pdus["auth_chain"] - ], - })) - - @defer.inlineCallbacks - def on_event_auth(self, origin, room_id, event_id): - time_now = self._clock.time_msec() - auth_pdus = yield self.handler.on_event_auth(event_id) - defer.returnValue((200, { - "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], - })) - - @defer.inlineCallbacks - def make_join(self, destination, room_id, user_id): - ret = yield self.transport_layer.make_join( - destination, room_id, user_id - ) - - pdu_dict = ret["event"] - - logger.debug("Got response to make_join: %s", pdu_dict) - - defer.returnValue(self.event_from_pdu_json(pdu_dict)) - - @defer.inlineCallbacks - def send_join(self, destination, pdu): - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_join( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), - ) - - logger.debug("Got content: %s", content) - - state = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("state", []) - ] - - auth_chain = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("auth_chain", []) - ] - - auth_chain.sort(key=lambda e: e.depth) - - defer.returnValue({ - "state": state, - "auth_chain": auth_chain, - }) - - @defer.inlineCallbacks - def send_invite(self, destination, room_id, event_id, pdu): - time_now = self._clock.time_msec() - code, content = yield self.transport_layer.send_invite( - destination=destination, - room_id=room_id, - event_id=event_id, - content=pdu.get_pdu_json(time_now), - ) - - pdu_dict = content["event"] - - logger.debug("Got response to send_invite: %s", pdu_dict) - - defer.returnValue(self.event_from_pdu_json(pdu_dict)) - - @log_function - def _get_persisted_pdu(self, origin, event_id, do_auth=True): - """ Get a PDU from the database with given origin and id. - - Returns: - Deferred: Results in a `Pdu`. - """ - return self.handler.get_persisted_pdu( - origin, event_id, do_auth=do_auth - ) - - def _transaction_from_pdus(self, pdu_list): - """Returns a new Transaction containing the given PDUs suitable for - transmission. - """ - time_now = self._clock.time_msec() - pdus = [p.get_pdu_json(time_now) for p in pdu_list] - return Transaction( - origin=self.server_name, - pdus=pdus, - origin_server_ts=int(time_now), - destination=None, - ) - - @defer.inlineCallbacks - @log_function - def _handle_new_pdu(self, origin, pdu, backfilled=False): - # We reprocess pdus when we have seen them only as outliers - existing = yield self._get_persisted_pdu( - origin, pdu.event_id, do_auth=False - ) - - already_seen = ( - existing and ( - not existing.internal_metadata.is_outlier() - or pdu.internal_metadata.is_outlier() - ) - ) - if already_seen: - logger.debug("Already seen pdu %s", pdu.event_id) - defer.returnValue({}) - return - - state = None - - auth_chain = [] - - # We need to make sure we have all the auth events. - # for e_id, _ in pdu.auth_events: - # exists = yield self._get_persisted_pdu( - # origin, - # e_id, - # do_auth=False - # ) - # - # if not exists: - # try: - # logger.debug( - # "_handle_new_pdu fetch missing auth event %s from %s", - # e_id, - # origin, - # ) - # - # yield self.get_pdu( - # origin, - # event_id=e_id, - # outlier=True, - # ) - # - # logger.debug("Processed pdu %s", e_id) - # except: - # logger.warn( - # "Failed to get auth event %s from %s", - # e_id, - # origin - # ) - - # Get missing pdus if necessary. - if not pdu.internal_metadata.is_outlier(): - # We only backfill backwards to the min depth. - min_depth = yield self.handler.get_min_depth_for_context( - pdu.room_id - ) - - logger.debug( - "_handle_new_pdu min_depth for %s: %d", - pdu.room_id, min_depth - ) - - if min_depth and pdu.depth > min_depth: - for event_id, hashes in pdu.prev_events: - exists = yield self._get_persisted_pdu( - origin, - event_id, - do_auth=False - ) - - if not exists: - logger.debug( - "_handle_new_pdu requesting pdu %s", - event_id - ) - - try: - yield self.get_pdu( - origin, - event_id=event_id, - ) - logger.debug("Processed pdu %s", event_id) - except: - # TODO(erikj): Do some more intelligent retries. - logger.exception("Failed to get PDU") - else: - # We need to get the state at this event, since we have reached - # a backward extremity edge. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - state, auth_chain = yield self.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) - - if not backfilled: - ret = yield self.handler.on_receive_pdu( - origin, - pdu, - backfilled=backfilled, - state=state, - auth_chain=auth_chain, - ) - else: - ret = None - - # yield self.pdu_actions.mark_as_processed(pdu) + self.transaction_actions = TransactionActions(self.store) + self._transaction_queue = TransactionQueue(hs, transport_layer) - defer.returnValue(ret) + self._order = 0 def __str__(self): return "" % self.server_name - - def event_from_pdu_json(self, pdu_json, outlier=False): - event = FrozenEvent( - pdu_json - ) - - event.internal_metadata.outlier = outlier - - return event diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c2cb4a1c49..9d4f2c09a2 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -16,6 +16,7 @@ from twisted.internet import defer +from .persistence import TransactionActions from .units import Transaction from synapse.util.logutils import log_function @@ -34,13 +35,15 @@ class TransactionQueue(object): It batches pending PDUs into single transactions. """ - def __init__(self, hs, transaction_actions, transport_layer): + def __init__(self, hs, transport_layer): self.server_name = hs.hostname - self.transaction_actions = transaction_actions + + self.store = hs.get_datastore() + self.transaction_actions = TransactionActions(self.store) + self.transport_layer = transport_layer self._clock = hs.get_clock() - self.store = hs.get_datastore() # Is a mapping from destinations -> deferreds. Used to keep track # of which destinations have transactions in flight and when they are diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 4f09909607..27d835db79 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -434,7 +434,7 @@ class DataStore(RoomMemberStore, RoomStore, sql = ( "SELECT e.event_id, reason FROM events as e " "LEFT JOIN rejections as r ON e.event_id = r.event_id " - "WHERE event_id = ?" + "WHERE e.event_id = ?" ) res = {} -- cgit 1.5.1 From c92d64a6c35713aabaed11e8ef1e62d2fb84a875 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 26 Jan 2015 14:33:11 +0000 Subject: Make it the responsibility of the replication layer to check signature and hashes. --- synapse/federation/federation_client.py | 108 ++++++++++++++++++++++++++++---- synapse/federation/federation_server.py | 89 ++++++++++++++++++++++---- synapse/federation/replication.py | 2 + 3 files changed, 173 insertions(+), 26 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c80f4c61bc..91b44cd8b3 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -20,6 +20,13 @@ from .units import Edu from synapse.util.logutils import log_function from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import SynapseError import logging @@ -126,6 +133,11 @@ class FederationClient(object): for p in transaction_data["pdus"] ] + for i, pdu in enumerate(pdus): + pdus[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + defer.returnValue(pdus) @defer.inlineCallbacks @@ -159,6 +171,22 @@ class FederationClient(object): transaction_data = yield self.transport_layer.get_event( destination, event_id ) + + logger.debug("transaction_data %r", transaction_data) + + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] + + if pdu_list: + pdu = pdu_list[0] + + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + break + except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -166,18 +194,6 @@ class FederationClient(object): ) continue - logger.debug("transaction_data %r", transaction_data) - - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] - - if pdu_list: - pdu = pdu_list[0] - # TODO: We need to check signatures here - break - defer.returnValue(pdu) @defer.inlineCallbacks @@ -208,6 +224,16 @@ class FederationClient(object): for p in result.get("auth_chain", []) ] + for i, pdu in enumerate(pdus): + pdus[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + + for i, pdu in enumerate(auth_chain): + auth_chain[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + defer.returnValue((pdus, auth_chain)) @defer.inlineCallbacks @@ -222,6 +248,11 @@ class FederationClient(object): for p in res["auth_chain"] ] + for i, pdu in enumerate(auth_chain): + auth_chain[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + auth_chain.sort(key=lambda e: e.depth) defer.returnValue(auth_chain) @@ -260,6 +291,16 @@ class FederationClient(object): for p in content.get("auth_chain", []) ] + for i, pdu in enumerate(state): + state[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + + for i, pdu in enumerate(auth_chain): + auth_chain[i] = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + auth_chain.sort(key=lambda e: e.depth) defer.returnValue({ @@ -281,7 +322,14 @@ class FederationClient(object): logger.debug("Got response to send_invite: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + pdu = self.event_from_pdu_json(pdu_dict) + + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + # FIXME: We should handle signature failures more gracefully. + + defer.returnValue(pdu) def event_from_pdu_json(self, pdu_json, outlier=False): event = FrozenEvent( @@ -291,3 +339,37 @@ class FederationClient(object): event.internal_metadata.outlier = outlier return event + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + try: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 0597725ce7..fc5342afaa 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -21,6 +21,13 @@ from .units import Transaction, Edu from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent +from synapse.events.utils import prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import FederationError, SynapseError import logging @@ -97,8 +104,10 @@ class FederationServer(object): response = yield self.transaction_actions.have_responded(transaction) if response: - logger.debug("[%s] We've already responed to this request", - transaction.transaction_id) + logger.debug( + "[%s] We've already responed to this request", + transaction.transaction_id + ) defer.returnValue(response) return @@ -253,6 +262,9 @@ class FederationServer(object): origin, pdu.event_id, do_auth=False ) + # FIXME: Currently we fetch an event again when we already have it + # if it has been marked as an outlier. + already_seen = ( existing and ( not existing.internal_metadata.is_outlier() @@ -264,14 +276,27 @@ class FederationServer(object): defer.returnValue({}) return + # Check signature. + try: + pdu = yield self._check_sigs_and_hash(pdu) + except SynapseError as e: + raise FederationError( + "ERROR", + e.code, + e.msg, + affected=pdu.event_id, + ) + state = None auth_chain = [] have_seen = yield self.store.have_events( - [e for e, _ in pdu.prev_events] + [ev for ev, _ in pdu.prev_events] ) + fetch_state = False + # Get missing pdus if necessary. if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. @@ -311,16 +336,20 @@ class FederationServer(object): except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") - else: - # We need to get the state at this event, since we have reached - # a backward extremity edge. - logger.debug( - "_handle_new_pdu getting state for %s", - pdu.room_id - ) - state, auth_chain = yield self.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) + fetch_state = True + else: + fetch_state = True + + if fetch_state: + # We need to get the state at this event, since we haven't + # processed all the prev events. + logger.debug( + "_handle_new_pdu getting state for %s", + pdu.room_id + ) + state, auth_chain = yield self.get_state_for_room( + origin, pdu.room_id, pdu.event_id, + ) ret = yield self.handler.on_receive_pdu( origin, @@ -343,3 +372,37 @@ class FederationServer(object): event.internal_metadata.outlier = outlier return event + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + try: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 9ef4834927..e442c6c5d5 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -51,6 +51,8 @@ class ReplicationLayer(FederationClient, FederationServer): def __init__(self, hs, transport_layer): self.server_name = hs.hostname + self.keyring = hs.get_keyring() + self.transport_layer = transport_layer self.transport_layer.register_received_handler(self) self.transport_layer.register_request_handler(self) -- cgit 1.5.1 From 0ef5bfd6a9eaaae14e199997658b3d0006abd854 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Jan 2015 16:16:53 +0000 Subject: Start implementing auth conflict res --- synapse/api/auth.py | 38 +++--- synapse/api/constants.py | 6 + synapse/federation/federation_client.py | 39 ++++++ synapse/handlers/federation.py | 211 ++++++++++++++++++++++++++------ synapse/storage/rejections.py | 10 ++ synapse/storage/schema/im.sql | 1 + 6 files changed, 253 insertions(+), 52 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index a342a0e0da..461faa8c78 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -353,9 +353,23 @@ class Auth(object): def add_auth_events(self, builder, context): yield run_on_reactor() - if builder.type == EventTypes.Create: - builder.auth_events = [] - return + auth_ids = self.compute_auth_events(builder, context) + + auth_events_entries = yield self.store.add_event_hashes( + auth_ids + ) + + builder.auth_events = auth_events_entries + + context.auth_events = { + k: v + for k, v in context.current_state.items() + if v.event_id in auth_ids + } + + def compute_auth_events(self, event, context): + if event.type == EventTypes.Create: + return [] auth_ids = [] @@ -368,7 +382,7 @@ class Auth(object): key = (EventTypes.JoinRules, "", ) join_rule_event = context.current_state.get(key) - key = (EventTypes.Member, builder.user_id, ) + key = (EventTypes.Member, event.user_id, ) member_event = context.current_state.get(key) key = (EventTypes.Create, "", ) @@ -382,8 +396,8 @@ class Auth(object): else: is_public = False - if builder.type == EventTypes.Member: - e_type = builder.content["membership"] + if event.type == EventTypes.Member: + e_type = event.content["membership"] if e_type in [Membership.JOIN, Membership.INVITE]: if join_rule_event: auth_ids.append(join_rule_event.event_id) @@ -398,17 +412,7 @@ class Auth(object): if member_event.content["membership"] == Membership.JOIN: auth_ids.append(member_event.event_id) - auth_events_entries = yield self.store.add_event_hashes( - auth_ids - ) - - builder.auth_events = auth_events_entries - - context.auth_events = { - k: v - for k, v in context.current_state.items() - if v.event_id in auth_ids - } + return auth_ids @log_function def _can_send_event(self, event, auth_events): diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 7ee6dcc46e..0d3fc629af 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -74,3 +74,9 @@ class EventTypes(object): Message = "m.room.message" Topic = "m.room.topic" Name = "m.room.name" + + +class RejectedReason(object): + AUTH_ERROR = "auth_error" + REPLACED = "replaced" + NOT_ANCESTOR = "not_ancestor" diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 91b44cd8b3..ebcd593506 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -331,6 +331,45 @@ class FederationClient(object): defer.returnValue(pdu) + @defer.inlineCallbacks + def query_auth(self, destination, room_id, event_id, local_auth): + """ + Params: + destination (str) + event_it (str) + local_auth (list) + """ + time_now = self._clock.time_msec() + + send_content = { + "auth_chain": [e.get_pdu_json(time_now) for e in local_auth], + } + + code, content = yield self.transport_layer.send_invite( + destination=destination, + room_id=room_id, + event_id=event_id, + content=send_content, + ) + + auth_chain = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content["auth_chain"] + ] + + missing = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content.get("missing", []) + ] + + ret = { + "auth_chain": auth_chain, + "rejects": content.get("rejects", []), + "missing": missing, + } + + defer.returnValue(ret) + def event_from_pdu_json(self, pdu_json, outlier=False): event = FrozenEvent( pdu_json diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index bcdcc90a18..97e3c503b9 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,19 +17,16 @@ from ._base import BaseHandler -from synapse.events.utils import prune_event from synapse.api.errors import ( - AuthError, FederationError, SynapseError, StoreError, + AuthError, FederationError, StoreError, ) -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor from synapse.crypto.event_signing import ( - compute_event_signature, check_event_content_hash, - add_hashes_and_signatures, + compute_event_signature, add_hashes_and_signatures, ) from synapse.types import UserID -from syutil.jsonutil import encode_canonical_json from twisted.internet import defer @@ -113,33 +110,6 @@ class FederationHandler(BaseHandler): logger.debug("Processing event: %s", event.event_id) - redacted_event = prune_event(event) - - redacted_pdu_json = redacted_event.get_pdu_json() - try: - yield self.keyring.verify_json_for_server( - event.origin, redacted_pdu_json - ) - except SynapseError as e: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise FederationError( - "ERROR", - e.code, - e.msg, - affected=event.event_id, - ) - - if not check_event_content_hash(event): - logger.warn( - "Event content has been tampered, redacting %s, %s", - event.event_id, encode_canonical_json(event.get_dict()) - ) - event = redacted_event - logger.debug("Event: %s", event) # FIXME (erikj): Awful hack to make the case where we are not currently @@ -180,7 +150,6 @@ class FederationHandler(BaseHandler): if state: for e in state: - logging.info("A :) %r", e) e.internal_metadata.outlier = True try: yield self._handle_new_event(e) @@ -747,7 +716,20 @@ class FederationHandler(BaseHandler): event.event_id, event.signatures, ) - self.auth.check(event, auth_events=context.auth_events) + try: + self.auth.check(event, auth_events=context.auth_events) + except AuthError: + # TODO: Store rejection. + context.rejected = RejectedReason.AUTH_ERROR + + yield self.store.persist_event( + event, + context=context, + backfilled=backfilled, + is_new_state=False, + current_state=current_state, + ) + raise logger.debug( "_handle_new_event: Before persist_event: %s, sigs: %s", @@ -768,3 +750,162 @@ class FederationHandler(BaseHandler): ) defer.returnValue(context) + + @defer.inlineCallbacks + def do_auth(self, origin, event, context): + for e_id, _ in event.auth_events: + pass + + auth_events = set(e_id for e_id, _ in event.auth_events) + current_state = set(e.event_id for e in context.auth_events.values()) + + missing_auth = auth_events - current_state + + if missing_auth: + # Do auth conflict res. + + # 1. Get what we think is the auth chain. + auth_ids = self.auth.compute_auth_events(event, context) + local_auth_chain = yield self.store.get_auth_chain(auth_ids) + + # 2. Get remote difference. + result = yield self.replication_layer.query_auth( + origin, + event.room_id, + event.event_id, + local_auth_chain, + ) + + # 3. Process any remote auth chain events we haven't seen. + for e in result.get("missing", []): + # TODO. + pass + + # 4. Look at rejects and their proofs. + # TODO. + + try: + self.auth.check(event, auth_events=context.auth_events) + except AuthError: + raise + + @defer.inlineCallbacks + def construct_auth_difference(self, local_auth, remote_auth): + """ Given a local and remote auth chain, find the differences. This + assumes that we have already processed all events in remote_auth + + Params: + local_auth (list) + remote_auth (list) + + Returns: + dict + """ + + # TODO: Make sure we are OK with local_auth or remote_auth having more + # auth events in them than strictly necessary. + + def sort_fun(ev): + return ev.depth, ev.event_id + + # We find the differences by starting at the "bottom" of each list + # and iterating up on both lists. The lists are ordered by depth and + # then event_id, we iterate up both lists until we find the event ids + # don't match. Then we look at depth/event_id to see which side is + # missing that event, and iterate only up that list. Repeat. + + remote_list = list(remote_auth) + remote_list.sort(key=sort_fun) + + local_list = list(local_auth) + local_list.sort(key=sort_fun) + + local_iter = iter(local_list) + remote_iter = iter(remote_list) + + current_local = local_iter.next() + current_remote = remote_iter.next() + + def get_next(it, opt=None): + return it.next() if it.has_next() else opt + + missing_remotes = [] + missing_locals = [] + while current_local and current_remote: + if current_remote is None: + missing_locals.append(current_local) + current_local = get_next(local_iter) + continue + + if current_local is None: + missing_remotes.append(current_remote) + current_remote = get_next(remote_iter) + continue + + if current_local.event_id == current_remote.event_id: + current_local = get_next(local_iter) + current_remote = get_next(remote_iter) + continue + + if current_local.depth < current_remote.depth: + missing_locals.append(current_local) + current_local = get_next(local_iter) + continue + + if current_local.depth > current_remote.depth: + missing_remotes.append(current_remote) + current_remote = get_next(remote_iter) + continue + + # They have the same depth, so we fall back to the event_id order + if current_local.event_id < current_remote.event_id: + missing_locals.append(current_local) + current_local = get_next(local_iter) + + if current_local.event_id > current_remote.event_id: + missing_remotes.append(current_remote) + current_remote = get_next(remote_iter) + continue + + # missing locals should be sent to the server + # We should find why we are missing remotes, as they will have been + # rejected. + + # Remove events from missing_remotes if they are referencing a missing + # remote. We only care about the "root" rejected ones. + missing_remote_ids = [e.event_id for e in missing_remotes] + base_remote_rejected = list(missing_remotes) + for e in missing_remotes: + for e_id, _ in e.auth_events: + if e_id in missing_remote_ids: + base_remote_rejected.remove(e) + + reason_map = {} + + for e in base_remote_rejected: + reason = yield self.store.get_rejection_reason(e.event_id) + if reason is None: + # FIXME: ERRR?! + raise RuntimeError("") + + reason_map[e.event_id] = reason + + if reason == RejectedReason.AUTH_ERROR: + pass + elif reason == RejectedReason.REPLACED: + # TODO: Get proof + pass + elif reason == RejectedReason.NOT_ANCESTOR: + # TODO: Get proof. + pass + + defer.returnValue({ + "rejects": { + e.event_id: { + "reason": reason_map[e.event_id], + "proof": None, + } + for e in base_remote_rejected + }, + "missing": missing_locals, + }) diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index 7d38b31f44..b7249700d7 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -31,3 +31,13 @@ class RejectionsStore(SQLBaseStore): "last_failure": self._clock.time_msec(), } ) + + def get_rejection_reason(self, event_id): + self._simple_select_one_onecol( + table="rejections", + retcol="reason", + keyvalues={ + "event_id": event_id, + }, + allow_none=True, + ) diff --git a/synapse/storage/schema/im.sql b/synapse/storage/schema/im.sql index bc7c6b6ed5..5866a387f6 100644 --- a/synapse/storage/schema/im.sql +++ b/synapse/storage/schema/im.sql @@ -128,5 +128,6 @@ CREATE TABLE IF NOT EXISTS rejections( event_id TEXT NOT NULL, reason TEXT NOT NULL, last_check TEXT NOT NULL, + root_rejected TEXT, CONSTRAINT ev_id UNIQUE (event_id) ON CONFLICT REPLACE ); -- cgit 1.5.1 From 78015948a7febb18e000651f72f8f58830a55b93 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Jan 2015 16:50:23 +0000 Subject: Initial implementation of auth conflict resolution --- synapse/events/utils.py | 6 +- synapse/federation/federation_client.py | 2 +- synapse/federation/federation_server.py | 33 +++++ synapse/federation/transport/client.py | 16 +++ synapse/federation/transport/server.py | 21 +++- synapse/handlers/federation.py | 207 ++++++++++++++++++++------------ synapse/storage/rejections.py | 4 +- tests/handlers/test_federation.py | 2 + 8 files changed, 210 insertions(+), 81 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index bcb5457278..10a6b9f264 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -45,12 +45,14 @@ def prune_event(event): "membership", ] + event_dict = event.get_dict() + new_content = {} def add_fields(*fields): for field in fields: if field in event.content: - new_content[field] = event.content[field] + new_content[field] = event_dict["content"][field] if event_type == EventTypes.Member: add_fields("membership") @@ -75,7 +77,7 @@ def prune_event(event): allowed_fields = { k: v - for k, v in event.get_dict().items() + for k, v in event_dict.items() if k in allowed_keys } diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ebcd593506..1173ca817b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -345,7 +345,7 @@ class FederationClient(object): "auth_chain": [e.get_pdu_json(time_now) for e in local_auth], } - code, content = yield self.transport_layer.send_invite( + code, content = yield self.transport_layer.send_query_auth( destination=destination, room_id=room_id, event_id=event_id, diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fc5342afaa..8cff4e6472 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -230,6 +230,39 @@ class FederationServer(object): "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus], })) + @defer.inlineCallbacks + def on_query_auth_request(self, origin, content, event_id): + auth_chain = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content["auth_chain"] + ] + + missing = [ + (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + for e in content.get("missing", []) + ] + + ret = yield self.handler.on_query_auth( + origin, event_id, auth_chain, content.get("rejects", []), missing + ) + + time_now = self._clock.time_msec() + send_content = { + "auth_chain": [ + e.get_pdu_json(time_now) + for e in ret["auth_chain"] + ], + "rejects": content.get("rejects", []), + "missing": [ + e.get_pdu_json(time_now) + for e in ret.get("missing", []) + ], + } + + defer.returnValue( + (200, send_content) + ) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index e634a3a213..4cb1dea2de 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -213,3 +213,19 @@ class TransportLayerClient(object): ) defer.returnValue(response) + + @defer.inlineCallbacks + @log_function + def send_query_auth(self, destination, room_id, event_id, content): + path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) + + code, content = yield self.client.post_json( + destination=destination, + path=path, + data=content, + ) + + if not 200 <= code < 300: + raise RuntimeError("Got %d from send_invite", code) + + defer.returnValue(json.loads(content)) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a380a6910b..9c9f8d525b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -42,7 +42,7 @@ class TransportLayerServer(object): content = None origin = None - if request.method == "PUT": + if request.method in ["PUT", "POST"]: # TODO: Handle other method types? other content types? try: content_bytes = request.content.read() @@ -234,6 +234,16 @@ class TransportLayerServer(object): ) ) ) + self.server.register_path( + "POST", + re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), + self._with_authentication( + lambda origin, content, query, context, event_id: + self._on_query_auth_request( + origin, content, event_id, + ) + ) + ) @defer.inlineCallbacks @log_function @@ -325,3 +335,12 @@ class TransportLayerServer(object): ) defer.returnValue((200, content)) + + @defer.inlineCallbacks + @log_function + def _on_query_auth_request(self, origin, content, event_id): + new_content = yield self.request_handler.on_query_auth_request( + origin, content, event_id + ) + + defer.returnValue((200, new_content)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 97e3c503b9..14c26d8cea 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -126,7 +126,7 @@ class FederationHandler(BaseHandler): if not state: state, auth_chain = yield replication.get_state_for_room( - origin, context=event.room_id, event_id=event.event_id, + origin, room_id=event.room_id, event_id=event.event_id, ) if not auth_chain: @@ -139,7 +139,7 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e, fetch_auth_from=origin) + yield self._handle_new_event(origin, e) except: logger.exception( "Failed to handle auth event %s", @@ -152,7 +152,7 @@ class FederationHandler(BaseHandler): for e in state: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e) + yield self._handle_new_event(origin, e) except: logger.exception( "Failed to handle state event %s", @@ -161,6 +161,7 @@ class FederationHandler(BaseHandler): try: yield self._handle_new_event( + origin, event, state=state, backfilled=backfilled, @@ -363,7 +364,14 @@ class FederationHandler(BaseHandler): for e in auth_chain: e.internal_metadata.outlier = True try: - yield self._handle_new_event(e) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + target_host, e, auth_events=auth + ) except: logger.exception( "Failed to handle auth event %s", @@ -374,8 +382,13 @@ class FederationHandler(BaseHandler): # FIXME: Auth these. e.internal_metadata.outlier = True try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } yield self._handle_new_event( - e, fetch_auth_from=target_host + target_host, e, auth_events=auth ) except: logger.exception( @@ -384,6 +397,7 @@ class FederationHandler(BaseHandler): ) yield self._handle_new_event( + target_host, new_event, state=state, current_state=state, @@ -450,7 +464,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(event) + context = yield self._handle_new_event(origin, event) logger.debug( "on_send_join_request: After _handle_new_event: %s, sigs: %s", @@ -651,11 +665,12 @@ class FederationHandler(BaseHandler): waiters.pop().callback(None) @defer.inlineCallbacks - def _handle_new_event(self, event, state=None, backfilled=False, - current_state=None, fetch_auth_from=None): + @log_function + def _handle_new_event(self, origin, event, state=None, backfilled=False, + current_state=None, auth_events=None): logger.debug( - "_handle_new_event: Before annotate: %s, sigs: %s", + "_handle_new_event: %s, sigs: %s", event.event_id, event.signatures, ) @@ -663,62 +678,34 @@ class FederationHandler(BaseHandler): event, old_state=state ) + if not auth_events: + auth_events = context.auth_events + logger.debug( - "_handle_new_event: Before auth fetch: %s, sigs: %s", - event.event_id, event.signatures, + "_handle_new_event: %s, auth_events: %s", + event.event_id, auth_events, ) is_new_state = not event.internal_metadata.is_outlier() - known_ids = set( - [s.event_id for s in context.auth_events.values()] - ) - - for e_id, _ in event.auth_events: - if e_id not in known_ids: - e = yield self.store.get_event(e_id, allow_none=True) - - if not e and fetch_auth_from is not None: - # Grab the auth_chain over federation if we are missing - # auth events. - auth_chain = yield self.replication_layer.get_event_auth( - fetch_auth_from, event.event_id, event.room_id - ) - for auth_event in auth_chain: - yield self._handle_new_event(auth_event) - e = yield self.store.get_event(e_id, allow_none=True) - - if not e: - # TODO: Do some conflict res to make sure that we're - # not the ones who are wrong. - logger.info( - "Rejecting %s as %s not in db or %s", - event.event_id, e_id, known_ids, - ) - # FIXME: How does raising AuthError work with federation? - raise AuthError(403, "Cannot find auth event") - - context.auth_events[(e.type, e.state_key)] = e - - logger.debug( - "_handle_new_event: Before hack: %s, sigs: %s", - event.event_id, event.signatures, - ) - + # This is a hack to fix some old rooms where the initial join event + # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_events: if len(event.prev_events) == 1: c = yield self.store.get_event(event.prev_events[0][0]) if c.type == EventTypes.Create: - context.auth_events[(c.type, c.state_key)] = c - - logger.debug( - "_handle_new_event: Before auth check: %s, sigs: %s", - event.event_id, event.signatures, - ) + auth_events[(c.type, c.state_key)] = c try: - self.auth.check(event, auth_events=context.auth_events) - except AuthError: + yield self.do_auth( + origin, event, context, auth_events=auth_events + ) + except AuthError as e: + logger.warn( + "Rejecting %s because %s", + event.event_id, e.msg + ) + # TODO: Store rejection. context.rejected = RejectedReason.AUTH_ERROR @@ -731,11 +718,6 @@ class FederationHandler(BaseHandler): ) raise - logger.debug( - "_handle_new_event: Before persist_event: %s, sigs: %s", - event.event_id, event.signatures, - ) - yield self.store.persist_event( event, context=context, @@ -744,25 +726,73 @@ class FederationHandler(BaseHandler): current_state=current_state, ) - logger.debug( - "_handle_new_event: After persist_event: %s, sigs: %s", - event.event_id, event.signatures, + defer.returnValue(context) + + @defer.inlineCallbacks + def on_query_auth(self, origin, event_id, remote_auth_chain, rejects, + missing): + # Just go through and process each event in `remote_auth_chain`. We + # don't want to fall into the trap of `missing` being wrong. + for e in remote_auth_chain: + try: + yield self._handle_new_event(origin, e) + except AuthError: + pass + + # Now get the current auth_chain for the event. + local_auth_chain = yield self.store.get_auth_chain([event_id]) + + # TODO: Check if we would now reject event_id. If so we need to tell + # everyone. + + ret = yield self.construct_auth_difference( + local_auth_chain, remote_auth_chain ) - defer.returnValue(context) + logger.debug("on_query_auth reutrning: %s", ret) + + defer.returnValue(ret) @defer.inlineCallbacks - def do_auth(self, origin, event, context): - for e_id, _ in event.auth_events: - pass + @log_function + def do_auth(self, origin, event, context, auth_events): + # Check if we have all the auth events. + res = yield self.store.have_events( + [e_id for e_id, _ in event.auth_events] + ) - auth_events = set(e_id for e_id, _ in event.auth_events) - current_state = set(e.event_id for e in context.auth_events.values()) + event_auth_events = set(e_id for e_id, _ in event.auth_events) + seen_events = set(res.keys()) - missing_auth = auth_events - current_state + missing_auth = event_auth_events - seen_events if missing_auth: + logger.debug("Missing auth: %s", missing_auth) + # If we don't have all the auth events, we need to get them. + remote_auth_chain = yield self.replication_layer.get_event_auth( + origin, event.room_id, event.event_id + ) + + for e in remote_auth_chain: + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in remote_auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + + current_state = set(e.event_id for e in auth_events.values()) + different_auth = event_auth_events - current_state + + if different_auth and not event.internal_metadata.is_outlier(): # Do auth conflict res. + logger.debug("Different auth: %s", different_auth) # 1. Get what we think is the auth chain. auth_ids = self.auth.compute_auth_events(event, context) @@ -778,14 +808,24 @@ class FederationHandler(BaseHandler): # 3. Process any remote auth chain events we haven't seen. for e in result.get("missing", []): - # TODO. - pass + try: + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in result["auth_chain"] + if e.event_id in auth_ids + } + yield self._handle_new_event( + origin, e, auth_events=auth + ) + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass # 4. Look at rejects and their proofs. # TODO. try: - self.auth.check(event, auth_events=context.auth_events) + self.auth.check(event, auth_events=auth_events) except AuthError: raise @@ -802,12 +842,16 @@ class FederationHandler(BaseHandler): dict """ + logger.debug("construct_auth_difference Start!") + # TODO: Make sure we are OK with local_auth or remote_auth having more # auth events in them than strictly necessary. def sort_fun(ev): return ev.depth, ev.event_id + logger.debug("construct_auth_difference after sort_fun!") + # We find the differences by starting at the "bottom" of each list # and iterating up on both lists. The lists are ordered by depth and # then event_id, we iterate up both lists until we find the event ids @@ -823,11 +867,18 @@ class FederationHandler(BaseHandler): local_iter = iter(local_list) remote_iter = iter(remote_list) - current_local = local_iter.next() - current_remote = remote_iter.next() + logger.debug("construct_auth_difference before get_next!") def get_next(it, opt=None): - return it.next() if it.has_next() else opt + try: + return it.next() + except: + return opt + + current_local = get_next(local_iter) + current_remote = get_next(remote_iter) + + logger.debug("construct_auth_difference before while") missing_remotes = [] missing_locals = [] @@ -867,6 +918,8 @@ class FederationHandler(BaseHandler): current_remote = get_next(remote_iter) continue + logger.debug("construct_auth_difference after while") + # missing locals should be sent to the server # We should find why we are missing remotes, as they will have been # rejected. @@ -886,6 +939,7 @@ class FederationHandler(BaseHandler): reason = yield self.store.get_rejection_reason(e.event_id) if reason is None: # FIXME: ERRR?! + logger.warn("Could not find reason for %s", e.event_id) raise RuntimeError("") reason_map[e.event_id] = reason @@ -899,7 +953,10 @@ class FederationHandler(BaseHandler): # TODO: Get proof. pass + logger.debug("construct_auth_difference returning") + defer.returnValue({ + "auth_chain": local_auth, "rejects": { e.event_id: { "reason": reason_map[e.event_id], diff --git a/synapse/storage/rejections.py b/synapse/storage/rejections.py index b7249700d7..4e1a9a2783 100644 --- a/synapse/storage/rejections.py +++ b/synapse/storage/rejections.py @@ -28,12 +28,12 @@ class RejectionsStore(SQLBaseStore): values={ "event_id": event_id, "reason": reason, - "last_failure": self._clock.time_msec(), + "last_check": self._clock.time_msec(), } ) def get_rejection_reason(self, event_id): - self._simple_select_one_onecol( + return self._simple_select_one_onecol( table="rejections", retcol="reason", keyvalues={ diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index ed21defd13..44dbce6bea 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -52,6 +52,7 @@ class FederationTestCase(unittest.TestCase): "get_room", "get_destination_retry_timings", "set_destination_retry_timings", + "have_events", ]), resource_for_federation=NonCallableMock(), http_client=NonCallableMock(spec_set=[]), @@ -90,6 +91,7 @@ class FederationTestCase(unittest.TestCase): self.datastore.persist_event.return_value = defer.succeed(None) self.datastore.get_room.return_value = defer.succeed(True) self.auth.check_host_in_room.return_value = defer.succeed(True) + self.datastore.have_events.return_value = defer.succeed({}) def annotate(ev, old_state=None): context = Mock() -- cgit 1.5.1 From c1d860870b418b9583078022d0babe557b73760f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 10:48:47 +0000 Subject: Fix regression where we no longer correctly handled the case of gaps in our event graph --- synapse/federation/federation_server.py | 3 +++ synapse/handlers/federation.py | 8 ++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 8cff4e6472..845a07a3a3 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -366,10 +366,13 @@ class FederationServer(object): logger.debug("Processed pdu %s", event_id) else: logger.warn("Failed to get PDU %s", event_id) + fetch_state = True except: # TODO(erikj): Do some more intelligent retries. logger.exception("Failed to get PDU") fetch_state = True + else: + fetch_state = True else: fetch_state = True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 14c26d8cea..de47a97e6b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -119,7 +119,7 @@ class FederationHandler(BaseHandler): event.room_id, self.server_name ) - if not is_in_room and not event.internal_metadata.outlier: + if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") replication = self.replication_layer @@ -780,6 +780,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in remote_auth_chain if e.event_id in auth_ids } + e.internal_metadata.outlier = True yield self._handle_new_event( origin, e, auth_events=auth ) @@ -787,6 +788,8 @@ class FederationHandler(BaseHandler): except AuthError: pass + # FIXME: Assumes we have and stored all the state for all the + # prev_events current_state = set(e.event_id for e in auth_events.values()) different_auth = event_auth_events - current_state @@ -814,6 +817,7 @@ class FederationHandler(BaseHandler): (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } + e.internal_metadata.outlier = True yield self._handle_new_event( origin, e, auth_events=auth ) @@ -882,7 +886,7 @@ class FederationHandler(BaseHandler): missing_remotes = [] missing_locals = [] - while current_local and current_remote: + while current_local or current_remote: if current_remote is None: missing_locals.append(current_local) current_local = get_next(local_iter) -- cgit 1.5.1 From a70a801184814d116ed5b10a952e17c45df7bfc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 13:34:01 +0000 Subject: Fix bug where we superfluously asked for current state. Change API of /query_auth/ so that we don't duplicate events in the response. --- synapse/api/auth.py | 2 ++ synapse/federation/federation_client.py | 7 +---- synapse/federation/federation_server.py | 12 ++++---- synapse/handlers/federation.py | 51 ++++++++++++--------------------- synapse/state.py | 20 ++++++++++--- 5 files changed, 43 insertions(+), 49 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 3471afd7e7..37e31d2b6f 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -102,6 +102,8 @@ class Auth(object): def check_host_in_room(self, room_id, host): curr_state = yield self.state.get_current_state(room_id) + logger.debug("Got curr_state %s", curr_state) + for event in curr_state: if event.type == EventTypes.Member: try: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 1173ca817b..e1539bd0e0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -357,15 +357,10 @@ class FederationClient(object): for e in content["auth_chain"] ] - missing = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) - for e in content.get("missing", []) - ] - ret = { "auth_chain": auth_chain, "rejects": content.get("rejects", []), - "missing": missing, + "missing": content.get("missing", []), } defer.returnValue(ret) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 845a07a3a3..84ed0a0ba0 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -252,11 +252,8 @@ class FederationServer(object): e.get_pdu_json(time_now) for e in ret["auth_chain"] ], - "rejects": content.get("rejects", []), - "missing": [ - e.get_pdu_json(time_now) - for e in ret.get("missing", []) - ], + "rejects": ret.get("rejects", []), + "missing": ret.get("missing", []), } defer.returnValue( @@ -372,7 +369,10 @@ class FederationServer(object): logger.exception("Failed to get PDU") fetch_state = True else: - fetch_state = True + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + fetch_state = True else: fetch_state = True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index cc22f21cd1..35cad4182a 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -121,38 +121,18 @@ class FederationHandler(BaseHandler): ) if not is_in_room and not event.internal_metadata.is_outlier(): logger.debug("Got event for room we're not in.") - - replication = self.replication_layer - - if not state: - state, auth_chain = yield replication.get_state_for_room( - origin, room_id=event.room_id, event_id=event.event_id, - ) - - if not auth_chain: - auth_chain = yield replication.get_event_auth( - origin, - context=event.room_id, - event_id=event.event_id, - ) - - for e in auth_chain: - e.internal_metadata.outlier = True - try: - yield self._handle_new_event(origin, e) - except: - logger.exception( - "Failed to handle auth event %s", - e.event_id, - ) - current_state = state - if state: + if state and auth_chain is not None: for e in state: e.internal_metadata.outlier = True try: - yield self._handle_new_event(origin, e) + auth_ids = [e_id for e_id, _ in e.auth_events] + auth = { + (e.type, e.state_key): e for e in auth_chain + if e.event_id in auth_ids + } + yield self._handle_new_event(origin, e, auth_events=auth) except: logger.exception( "Failed to handle state event %s", @@ -809,18 +789,23 @@ class FederationHandler(BaseHandler): ) # 3. Process any remote auth chain events we haven't seen. - for e in result.get("missing", []): + for missing_id in result.get("missing", []): try: - auth_ids = [e_id for e_id, _ in e.auth_events] + for e in result["auth_chain"]: + if e.event_id == missing_id: + ev = e + break + + auth_ids = [e_id for e_id, _ in ev.auth_events] auth = { (e.type, e.state_key): e for e in result["auth_chain"] if e.event_id in auth_ids } - e.internal_metadata.outlier = True + ev.internal_metadata.outlier = True yield self._handle_new_event( - origin, e, auth_events=auth + origin, ev, auth_events=auth ) - auth_events[(e.type, e.state_key)] = e + auth_events[(ev.type, ev.state_key)] = ev except AuthError: pass @@ -970,5 +955,5 @@ class FederationHandler(BaseHandler): } for e in base_remote_rejected }, - "missing": missing_locals, + "missing": [e.event_id for e in missing_locals], }) diff --git a/synapse/state.py b/synapse/state.py index d9fdfb34be..e6632978b5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -166,10 +166,17 @@ class StateHandler(object): first is the name of a state group if one and only one is involved, otherwise `None`. """ + logger.debug("resolve_state_groups event_ids %s", event_ids) + state_groups = yield self.store.get_state_groups( event_ids ) + logger.debug( + "resolve_state_groups state_groups %s", + state_groups.keys() + ) + group_names = set(state_groups.keys()) if len(group_names) == 1: name, state_list = state_groups.items().pop() @@ -205,6 +212,15 @@ class StateHandler(object): if len(v.values()) > 1 } + logger.debug( + "resolve_state_groups Unconflicted state: %s", + unconflicted_state.values(), + ) + logger.debug( + "resolve_state_groups Conflicted state: %s", + conflicted_state.values(), + ) + if event_type: prev_states_events = conflicted_state.get( (event_type, state_key), [] @@ -240,10 +256,6 @@ class StateHandler(object): 1. power levels 2. memberships 3. other events. - - :param conflicted_state: - :param auth_events: - :return: """ resolved_state = {} power_key = (EventTypes.PowerLevels, "") -- cgit 1.5.1 From 776ac820f9bb7f0e9c2fae9facbee05b0132079e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 30 Jan 2015 15:58:28 +0000 Subject: Briefly doc structure of query_auth API. --- synapse/federation/federation_server.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 84ed0a0ba0..5fbd8b19de 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -232,6 +232,24 @@ class FederationServer(object): @defer.inlineCallbacks def on_query_auth_request(self, origin, content, event_id): + """ + Content is a dict with keys:: + auth_chain (list): A list of events that give the auth chain. + missing (list): A list of event_ids indicating what the other + side (`origin`) think we're missing. + rejects (dict): A mapping from event_id to a 2-tuple of reason + string and a proof (or None) of why the event was rejected. + The keys of this dict give the list of events the `origin` has + rejected. + + Args: + origin (str) + content (dict) + event_id (str) + + Returns: + Deferred: Results in `dict` with the same format as `content` + """ auth_chain = [ (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) for e in content["auth_chain"] -- cgit 1.5.1 From 941f59101b51e9225dbdc38b22110a01de194242 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Feb 2015 16:56:01 +0000 Subject: Don't fail an entire request if one of the returned events fails a signature check. If an event does fail a signature check, look in the local database and request it from the originator. --- synapse/federation/federation_client.py | 107 ++++++++++++++++++++++++-------- synapse/storage/__init__.py | 21 ++++--- 2 files changed, 94 insertions(+), 34 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index e1539bd0e0..b809e935a0 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -224,17 +224,17 @@ class FederationClient(object): for p in result.get("auth_chain", []) ] - for i, pdu in enumerate(pdus): - pdus[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + signed_pdus = yield self._check_sigs_and_hash_and_fetch( + pdus, outlier=True + ) - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + signed_auth.sort(key=lambda e: e.depth) - defer.returnValue((pdus, auth_chain)) + defer.returnValue((signed_pdus, signed_auth)) @defer.inlineCallbacks @log_function @@ -248,14 +248,13 @@ class FederationClient(object): for p in res["auth_chain"] ] - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) - auth_chain.sort(key=lambda e: e.depth) + signed_auth.sort(key=lambda e: e.depth) - defer.returnValue(auth_chain) + defer.returnValue(signed_auth) @defer.inlineCallbacks def make_join(self, destination, room_id, user_id): @@ -291,21 +290,19 @@ class FederationClient(object): for p in content.get("auth_chain", []) ] - for i, pdu in enumerate(state): - state[i] = yield self._check_sigs_and_hash(pdu) - - # FIXME: We should handle signature failures more gracefully. - - for i, pdu in enumerate(auth_chain): - auth_chain[i] = yield self._check_sigs_and_hash(pdu) + signed_state = yield self._check_sigs_and_hash_and_fetch( + state, outlier=True + ) - # FIXME: We should handle signature failures more gracefully. + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) auth_chain.sort(key=lambda e: e.depth) defer.returnValue({ - "state": state, - "auth_chain": auth_chain, + "state": signed_state, + "auth_chain": signed_auth, }) @defer.inlineCallbacks @@ -353,12 +350,18 @@ class FederationClient(object): ) auth_chain = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + self.event_from_pdu_json(e) for e in content["auth_chain"] ] + signed_auth = yield self._check_sigs_and_hash_and_fetch( + auth_chain, outlier=True + ) + + signed_auth.sort(key=lambda e: e.depth) + ret = { - "auth_chain": auth_chain, + "auth_chain": signed_auth, "rejects": content.get("rejects", []), "missing": content.get("missing", []), } @@ -374,6 +377,58 @@ class FederationClient(object): return event + @defer.inlineCallbacks + def _check_sigs_and_hash_and_fetch(self, pdus, outlier=False): + """Takes a list of PDUs and checks the signatures and hashs of each + one. If a PDU fails its signature check then we check if we have it in + the database and if not then request if from the originating server of + that PDU. + + If a PDU fails its content hash check then it is redacted. + + The given list of PDUs are not modified, instead the function returns + a new list. + + Args: + pdu (list) + outlier (bool) + + Returns: + Deferred : A list of PDUs that have valid signatures and hashes. + """ + signed_pdus = [] + for pdu in pdus: + try: + new_pdu = yield self._check_sigs_and_hash(pdu) + signed_pdus.append(new_pdu) + except SynapseError: + # FIXME: We should handle signature failures more gracefully. + + # Check local db. + new_pdu = yield self.store.get_event( + pdu.event_id, + allow_rejected=True + ) + if new_pdu: + signed_pdus.append(new_pdu) + continue + + # Check pdu.origin + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue + + logger.warn("Failed to find copy of %s with valid signature") + + defer.returnValue(signed_pdus) + + @defer.inlineCallbacks def _check_sigs_and_hash(self, pdu): """Throws a SynapseError if the PDU does not have the correct diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 7c54b1b9d3..b4a7a3f068 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -128,16 +128,21 @@ class DataStore(RoomMemberStore, RoomStore, pass @defer.inlineCallbacks - def get_event(self, event_id, allow_none=False): - events = yield self._get_events([event_id]) + def get_event(self, event_id, check_redacted=True, + get_prev_content=False, allow_rejected=False, + allow_none=False): + event = yield self.runInteraction( + "get_event", self._get_event_txn, + event_id, + check_redacted=check_redacted, + get_prev_content=get_prev_content, + allow_rejected=allow_rejected, + ) - if not events: - if allow_none: - defer.returnValue(None) - else: - raise RuntimeError("Could not find event %s" % (event_id,)) + if not event and not allow_none: + raise RuntimeError("Could not find event %s" % (event_id,)) - defer.returnValue(events[0]) + defer.returnValue(event) @log_function def _persist_event_txn(self, txn, event, context, backfilled, -- cgit 1.5.1 From 40c6fe1b81e4d92cba797b0c966fd774e2a60a28 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Feb 2015 17:06:37 +0000 Subject: Don't bother requesting PDUs with bad signatures from the same server --- synapse/federation/federation_client.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b809e935a0..f87e84db73 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -225,11 +225,11 @@ class FederationClient(object): ] signed_pdus = yield self._check_sigs_and_hash_and_fetch( - pdus, outlier=True + destination, pdus, outlier=True ) signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) signed_auth.sort(key=lambda e: e.depth) @@ -249,7 +249,7 @@ class FederationClient(object): ] signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) signed_auth.sort(key=lambda e: e.depth) @@ -291,11 +291,11 @@ class FederationClient(object): ] signed_state = yield self._check_sigs_and_hash_and_fetch( - state, outlier=True + destination, state, outlier=True ) signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) auth_chain.sort(key=lambda e: e.depth) @@ -355,7 +355,7 @@ class FederationClient(object): ] signed_auth = yield self._check_sigs_and_hash_and_fetch( - auth_chain, outlier=True + destination, auth_chain, outlier=True ) signed_auth.sort(key=lambda e: e.depth) @@ -378,7 +378,7 @@ class FederationClient(object): return event @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, pdus, outlier=False): + def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): """Takes a list of PDUs and checks the signatures and hashs of each one. If a PDU fails its signature check then we check if we have it in the database and if not then request if from the originating server of @@ -414,15 +414,16 @@ class FederationClient(object): continue # Check pdu.origin - new_pdu = yield self.get_pdu( - destinations=[pdu.origin], - event_id=pdu.event_id, - outlier=outlier, - ) - - if new_pdu: - signed_pdus.append(new_pdu) - continue + if pdu.origin != origin: + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue logger.warn("Failed to find copy of %s with valid signature") -- cgit 1.5.1 From 0f48e22ef66ff8a34d4af13c25e20a461c8a8390 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 10:43:29 +0000 Subject: PEP8 --- synapse/federation/federation_client.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f87e84db73..9ceb66e6f4 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -429,7 +429,6 @@ class FederationClient(object): defer.returnValue(signed_pdus) - @defer.inlineCallbacks def _check_sigs_and_hash(self, pdu): """Throws a SynapseError if the PDU does not have the correct -- cgit 1.5.1 From 0dd3aea319c13e66eb1d75b5b8a196032ee332b7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 14:58:30 +0000 Subject: Keep around the old (buggy) version of the prune_event function so that we can use it to check signatures for events on old servers --- synapse/api/auth.py | 2 - synapse/events/utils.py | 79 +++++++++++++++++++++++++++ synapse/federation/federation_client.py | 96 +-------------------------------- synapse/federation/federation_server.py | 52 ++++-------------- 4 files changed, 92 insertions(+), 137 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 37e31d2b6f..3471afd7e7 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -102,8 +102,6 @@ class Auth(object): def check_host_in_room(self, room_id, host): curr_state = yield self.state.get_current_state(room_id) - logger.debug("Got curr_state %s", curr_state) - for event in curr_state: if event.type == EventTypes.Member: try: diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 1aa952150e..65a9f70982 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -94,6 +94,85 @@ def prune_event(event): ) +def old_prune_event(event): + """This is an old and buggy version of the prune event function. The + difference between this and the new version is that when including dicts + in the content they were included as frozen_dicts rather than dicts. This + caused the JSON encoder to encode as a list of the keys rather than the + dict. + """ + event_type = event.type + + allowed_keys = [ + "event_id", + "sender", + "room_id", + "hashes", + "signatures", + "content", + "type", + "state_key", + "depth", + "prev_events", + "prev_state", + "auth_events", + "origin", + "origin_server_ts", + "membership", + ] + + event_dict = event.get_dict() + + new_content = {} + + def add_fields(*fields): + for field in fields: + if field in event.content: + # This is the line that is buggy: event.content may return + # a frozen_dict which the json encoders encode as lists rather + # than dicts. + new_content[field] = event.content[field] + + if event_type == EventTypes.Member: + add_fields("membership") + elif event_type == EventTypes.Create: + add_fields("creator") + elif event_type == EventTypes.JoinRules: + add_fields("join_rule") + elif event_type == EventTypes.PowerLevels: + add_fields( + "users", + "users_default", + "events", + "events_default", + "events_default", + "state_default", + "ban", + "kick", + "redact", + ) + elif event_type == EventTypes.Aliases: + add_fields("aliases") + + allowed_fields = { + k: v + for k, v in event_dict.items() + if k in allowed_keys + } + + allowed_fields["content"] = new_content + + allowed_fields["unsigned"] = {} + + if "age_ts" in event.unsigned: + allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"] + + return type(event)( + allowed_fields, + internal_metadata_dict=event.internal_metadata.get_dict() + ) + + def format_event_raw(d): return d diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9ceb66e6f4..5fac629709 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -16,17 +16,11 @@ from twisted.internet import defer +from .federation_base import FederationBase from .units import Edu from synapse.util.logutils import log_function from synapse.events import FrozenEvent -from synapse.events.utils import prune_event - -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import check_event_content_hash - -from synapse.api.errors import SynapseError import logging @@ -34,7 +28,7 @@ import logging logger = logging.getLogger(__name__) -class FederationClient(object): +class FederationClient(FederationBase): @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -376,89 +370,3 @@ class FederationClient(object): event.internal_metadata.outlier = outlier return event - - @defer.inlineCallbacks - def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): - """Takes a list of PDUs and checks the signatures and hashs of each - one. If a PDU fails its signature check then we check if we have it in - the database and if not then request if from the originating server of - that PDU. - - If a PDU fails its content hash check then it is redacted. - - The given list of PDUs are not modified, instead the function returns - a new list. - - Args: - pdu (list) - outlier (bool) - - Returns: - Deferred : A list of PDUs that have valid signatures and hashes. - """ - signed_pdus = [] - for pdu in pdus: - try: - new_pdu = yield self._check_sigs_and_hash(pdu) - signed_pdus.append(new_pdu) - except SynapseError: - # FIXME: We should handle signature failures more gracefully. - - # Check local db. - new_pdu = yield self.store.get_event( - pdu.event_id, - allow_rejected=True - ) - if new_pdu: - signed_pdus.append(new_pdu) - continue - - # Check pdu.origin - if pdu.origin != origin: - new_pdu = yield self.get_pdu( - destinations=[pdu.origin], - event_id=pdu.event_id, - outlier=outlier, - ) - - if new_pdu: - signed_pdus.append(new_pdu) - continue - - logger.warn("Failed to find copy of %s with valid signature") - - defer.returnValue(signed_pdus) - - @defer.inlineCallbacks - def _check_sigs_and_hash(self, pdu): - """Throws a SynapseError if the PDU does not have the correct - signatures. - - Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. - """ - # Check signatures are correct. - redacted_event = prune_event(pdu) - redacted_pdu_json = redacted_event.get_pdu_json() - - try: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) - except SynapseError: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise - - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) - ) - defer.returnValue(redacted_event) - - defer.returnValue(pdu) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 5fbd8b19de..97dca3f84c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -16,6 +16,7 @@ from twisted.internet import defer +from .federation_base import FederationBase from .units import Transaction, Edu from synapse.util.logutils import log_function @@ -35,7 +36,7 @@ import logging logger = logging.getLogger(__name__) -class FederationServer(object): +class FederationServer(FederationBase): def set_handler(self, handler): """Sets the handler that the replication layer will use to communicate receipt of new PDUs from other home servers. The required methods are @@ -251,17 +252,20 @@ class FederationServer(object): Deferred: Results in `dict` with the same format as `content` """ auth_chain = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) + self.event_from_pdu_json(e) for e in content["auth_chain"] ] - missing = [ - (yield self._check_sigs_and_hash(self.event_from_pdu_json(e))) - for e in content.get("missing", []) - ] + signed_auth = yield self._check_sigs_and_hash_and_fetch( + origin, auth_chain, outlier=True + ) ret = yield self.handler.on_query_auth( - origin, event_id, auth_chain, content.get("rejects", []), missing + origin, + event_id, + signed_auth, + content.get("rejects", []), + content.get("missing", []), ) time_now = self._clock.time_msec() @@ -426,37 +430,3 @@ class FederationServer(object): event.internal_metadata.outlier = outlier return event - - @defer.inlineCallbacks - def _check_sigs_and_hash(self, pdu): - """Throws a SynapseError if the PDU does not have the correct - signatures. - - Returns: - FrozenEvent: Either the given event or it redacted if it failed the - content hash check. - """ - # Check signatures are correct. - redacted_event = prune_event(pdu) - redacted_pdu_json = redacted_event.get_pdu_json() - - try: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) - except SynapseError: - logger.warn( - "Signature check failed for %s redacted to %s", - encode_canonical_json(pdu.get_pdu_json()), - encode_canonical_json(redacted_pdu_json), - ) - raise - - if not check_event_content_hash(pdu): - logger.warn( - "Event content has been tampered, redacting %s, %s", - pdu.event_id, encode_canonical_json(pdu.get_dict()) - ) - defer.returnValue(redacted_event) - - defer.returnValue(pdu) -- cgit 1.5.1 From 7b810e136ef2040fe55a778d4a4a790cdbd0f84c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 15:00:42 +0000 Subject: Add new FederationBase --- synapse/federation/federation_base.py | 126 ++++++++++++++++++++++++++++++++++ 1 file changed, 126 insertions(+) create mode 100644 synapse/federation/federation_base.py (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py new file mode 100644 index 0000000000..d26a2396ac --- /dev/null +++ b/synapse/federation/federation_base.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from twisted.internet import defer + +from synapse.events.utils import prune_event, old_prune_event + +from syutil.jsonutil import encode_canonical_json + +from synapse.crypto.event_signing import check_event_content_hash + +from synapse.api.errors import SynapseError + +import logging + + +logger = logging.getLogger(__name__) + + +class FederationBase(object): + @defer.inlineCallbacks + def _check_sigs_and_hash_and_fetch(self, origin, pdus, outlier=False): + """Takes a list of PDUs and checks the signatures and hashs of each + one. If a PDU fails its signature check then we check if we have it in + the database and if not then request if from the originating server of + that PDU. + + If a PDU fails its content hash check then it is redacted. + + The given list of PDUs are not modified, instead the function returns + a new list. + + Args: + pdu (list) + outlier (bool) + + Returns: + Deferred : A list of PDUs that have valid signatures and hashes. + """ + signed_pdus = [] + for pdu in pdus: + try: + new_pdu = yield self._check_sigs_and_hash(pdu) + signed_pdus.append(new_pdu) + except SynapseError: + # FIXME: We should handle signature failures more gracefully. + + # Check local db. + new_pdu = yield self.store.get_event( + pdu.event_id, + allow_rejected=True + ) + if new_pdu: + signed_pdus.append(new_pdu) + continue + + # Check pdu.origin + if pdu.origin != origin: + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue + + logger.warn("Failed to find copy of %s with valid signature") + + defer.returnValue(signed_pdus) + + @defer.inlineCallbacks + def _check_sigs_and_hash(self, pdu): + """Throws a SynapseError if the PDU does not have the correct + signatures. + + Returns: + FrozenEvent: Either the given event or it redacted if it failed the + content hash check. + """ + # Check signatures are correct. + redacted_event = prune_event(pdu) + redacted_pdu_json = redacted_event.get_pdu_json() + + old_redacted = old_prune_event(pdu) + old_redacted_pdu_json = old_redacted.get_pdu_json() + + try: + try: + yield self.keyring.verify_json_for_server( + pdu.origin, old_redacted_pdu_json + ) + except SynapseError: + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) + except SynapseError: + logger.warn( + "Signature check failed for %s redacted to %s", + encode_canonical_json(pdu.get_pdu_json()), + encode_canonical_json(redacted_pdu_json), + ) + raise + + if not check_event_content_hash(pdu): + logger.warn( + "Event content has been tampered, redacting %s, %s", + pdu.event_id, encode_canonical_json(pdu.get_dict()) + ) + defer.returnValue(redacted_event) + + defer.returnValue(pdu) \ No newline at end of file -- cgit 1.5.1 From 8dae5c81085458a3a0b3dc92278e8c317d5de204 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 15:01:12 +0000 Subject: Remove unused imports --- synapse/federation/federation_server.py | 5 ----- 1 file changed, 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 97dca3f84c..4742ca9390 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -22,11 +22,6 @@ from .units import Transaction, Edu from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext from synapse.events import FrozenEvent -from synapse.events.utils import prune_event - -from syutil.jsonutil import encode_canonical_json - -from synapse.crypto.event_signing import check_event_content_hash from synapse.api.errors import FederationError, SynapseError -- cgit 1.5.1 From 9bace3a36751deed141225ccabd5bebecebc25f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 15:32:17 +0000 Subject: Actually, the old prune_event function was non-deterministic, so no point keeping it around :( --- synapse/events/utils.py | 79 ----------------------------------- synapse/federation/federation_base.py | 16 ++----- 2 files changed, 4 insertions(+), 91 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 65a9f70982..1aa952150e 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -94,85 +94,6 @@ def prune_event(event): ) -def old_prune_event(event): - """This is an old and buggy version of the prune event function. The - difference between this and the new version is that when including dicts - in the content they were included as frozen_dicts rather than dicts. This - caused the JSON encoder to encode as a list of the keys rather than the - dict. - """ - event_type = event.type - - allowed_keys = [ - "event_id", - "sender", - "room_id", - "hashes", - "signatures", - "content", - "type", - "state_key", - "depth", - "prev_events", - "prev_state", - "auth_events", - "origin", - "origin_server_ts", - "membership", - ] - - event_dict = event.get_dict() - - new_content = {} - - def add_fields(*fields): - for field in fields: - if field in event.content: - # This is the line that is buggy: event.content may return - # a frozen_dict which the json encoders encode as lists rather - # than dicts. - new_content[field] = event.content[field] - - if event_type == EventTypes.Member: - add_fields("membership") - elif event_type == EventTypes.Create: - add_fields("creator") - elif event_type == EventTypes.JoinRules: - add_fields("join_rule") - elif event_type == EventTypes.PowerLevels: - add_fields( - "users", - "users_default", - "events", - "events_default", - "events_default", - "state_default", - "ban", - "kick", - "redact", - ) - elif event_type == EventTypes.Aliases: - add_fields("aliases") - - allowed_fields = { - k: v - for k, v in event_dict.items() - if k in allowed_keys - } - - allowed_fields["content"] = new_content - - allowed_fields["unsigned"] = {} - - if "age_ts" in event.unsigned: - allowed_fields["unsigned"]["age_ts"] = event.unsigned["age_ts"] - - return type(event)( - allowed_fields, - internal_metadata_dict=event.internal_metadata.get_dict() - ) - - def format_event_raw(d): return d diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index d26a2396ac..27c5918e0f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -16,7 +16,7 @@ from twisted.internet import defer -from synapse.events.utils import prune_event, old_prune_event +from synapse.events.utils import prune_event from syutil.jsonutil import encode_canonical_json @@ -96,18 +96,10 @@ class FederationBase(object): redacted_event = prune_event(pdu) redacted_pdu_json = redacted_event.get_pdu_json() - old_redacted = old_prune_event(pdu) - old_redacted_pdu_json = old_redacted.get_pdu_json() - try: - try: - yield self.keyring.verify_json_for_server( - pdu.origin, old_redacted_pdu_json - ) - except SynapseError: - yield self.keyring.verify_json_for_server( - pdu.origin, redacted_pdu_json - ) + yield self.keyring.verify_json_for_server( + pdu.origin, redacted_pdu_json + ) except SynapseError: logger.warn( "Signature check failed for %s redacted to %s", -- cgit 1.5.1 From 3c39f42a0526186f85e69988504fff6adbf41d91 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 3 Feb 2015 16:14:19 +0000 Subject: New line --- synapse/federation/federation_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 27c5918e0f..a990aec4fd 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -115,4 +115,4 @@ class FederationBase(object): ) defer.returnValue(redacted_event) - defer.returnValue(pdu) \ No newline at end of file + defer.returnValue(pdu) -- cgit 1.5.1 From ff78eded015b7596e883623bf826aa579662e766 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 13:55:10 +0000 Subject: Retry make_join --- synapse/federation/federation_client.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5fac629709..d6b8c43916 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -251,16 +251,21 @@ class FederationClient(FederationBase): defer.returnValue(signed_auth) @defer.inlineCallbacks - def make_join(self, destination, room_id, user_id): - ret = yield self.transport_layer.make_join( - destination, room_id, user_id - ) + def make_join(self, destinations, room_id, user_id): + for destination in destinations: + try: + ret = yield self.transport_layer.make_join( + destination, room_id, user_id + ) - pdu_dict = ret["event"] + pdu_dict = ret["event"] - logger.debug("Got response to make_join: %s", pdu_dict) + logger.debug("Got response to make_join: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + defer.returnValue(self.event_from_pdu_json(pdu_dict)) + break + except Exception as e: + logger.warn("Failed to make_join via %s", destination) @defer.inlineCallbacks def send_join(self, destination, pdu): -- cgit 1.5.1 From ae46f10fc5dba0f81518e3144ab8d9ed7a7d03bc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 4 Feb 2015 16:28:12 +0000 Subject: Apply sanity to the transport client interface. Convert 'make_join' and 'send_join' to accept iterables of destinations --- synapse/api/errors.py | 8 +++- synapse/federation/federation_client.py | 82 ++++++++++++++++++++------------- synapse/federation/transaction_queue.py | 23 +++++++-- synapse/federation/transport/client.py | 42 +++++++---------- synapse/handlers/federation.py | 4 +- synapse/http/matrixfederationclient.py | 42 ++++++++++++++--- 6 files changed, 130 insertions(+), 71 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/api/errors.py b/synapse/api/errors.py index ad478aa6b7..5041828f18 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -39,7 +39,7 @@ class Codes(object): TOO_LARGE = "M_TOO_LARGE" -class CodeMessageException(Exception): +class CodeMessageException(RuntimeError): """An exception with integer code and message string attributes.""" def __init__(self, code, msg): @@ -227,3 +227,9 @@ class FederationError(RuntimeError): "affected": self.affected, "source": self.source if self.source else self.affected, } + + +class HttpResponseException(CodeMessageException): + def __init__(self, code, msg, response): + self.response = response + super(HttpResponseException, self).__init__(code, msg) diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index d6b8c43916..eb36ec040b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu +from synapse.api.errors import CodeMessageException from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -180,7 +181,8 @@ class FederationClient(FederationBase): pdu = yield self._check_sigs_and_hash(pdu) break - + except CodeMessageException: + raise except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -264,45 +266,63 @@ class FederationClient(FederationBase): defer.returnValue(self.event_from_pdu_json(pdu_dict)) break - except Exception as e: - logger.warn("Failed to make_join via %s", destination) + except CodeMessageException: + raise + except RuntimeError as e: + logger.warn( + "Failed to make_join via %s: %s", + destination, e.message + ) + + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks - def send_join(self, destination, pdu): - time_now = self._clock.time_msec() - _, content = yield self.transport_layer.send_join( - destination=destination, - room_id=pdu.room_id, - event_id=pdu.event_id, - content=pdu.get_pdu_json(time_now), - ) + def send_join(self, destinations, pdu): + for destination in destinations: + try: + time_now = self._clock.time_msec() + _, content = yield self.transport_layer.send_join( + destination=destination, + room_id=pdu.room_id, + event_id=pdu.event_id, + content=pdu.get_pdu_json(time_now), + ) - logger.debug("Got content: %s", content) + logger.debug("Got content: %s", content) - state = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("state", []) - ] + state = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("state", []) + ] - auth_chain = [ - self.event_from_pdu_json(p, outlier=True) - for p in content.get("auth_chain", []) - ] + auth_chain = [ + self.event_from_pdu_json(p, outlier=True) + for p in content.get("auth_chain", []) + ] - signed_state = yield self._check_sigs_and_hash_and_fetch( - destination, state, outlier=True - ) + signed_state = yield self._check_sigs_and_hash_and_fetch( + destination, state, outlier=True + ) - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True - ) + signed_auth = yield self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True + ) - auth_chain.sort(key=lambda e: e.depth) + auth_chain.sort(key=lambda e: e.depth) + + defer.returnValue({ + "state": signed_state, + "auth_chain": signed_auth, + }) + except CodeMessageException: + raise + except RuntimeError as e: + logger.warn( + "Failed to send_join via %s: %s", + destination, e.message + ) - defer.returnValue({ - "state": signed_state, - "auth_chain": signed_auth, - }) + raise RuntimeError("Failed to send to any server.") @defer.inlineCallbacks def send_invite(self, destination, room_id, event_id, pdu): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 9d4f2c09a2..f38aeba7cc 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -19,6 +19,7 @@ from twisted.internet import defer from .persistence import TransactionActions from .units import Transaction +from synapse.api.errors import HttpResponseException from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext @@ -238,9 +239,14 @@ class TransactionQueue(object): del p["age_ts"] return data - code, response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response logger.info("TX [%s] got %d response", destination, code) @@ -274,8 +280,7 @@ class TransactionQueue(object): pass logger.debug("TX [%s] Yielded to callbacks", destination) - - except Exception as e: + except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. logger.warn( @@ -283,6 +288,14 @@ class TransactionQueue(object): destination, e, ) + except Exception as e: + # We capture this here as there as nothing actually listens + # for this finishing functions deferred. + logger.exception( + "TX [%s] Problem in _attempt_transaction: %s", + destination, + e, + ) self.set_retrying(destination, retry_interval) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 4cb1dea2de..8b137e7128 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.util.logutils import log_function import logging -import json logger = logging.getLogger(__name__) @@ -129,7 +128,7 @@ class TransportLayerClient(object): # generated by the json_data_callback. json_data = transaction.get_dict() - code, response = yield self.client.put_json( + response = yield self.client.put_json( transaction.destination, path=PREFIX + "/send/%s/" % transaction.transaction_id, data=json_data, @@ -137,95 +136,86 @@ class TransportLayerClient(object): ) logger.debug( - "send_data dest=%s, txid=%s, got response: %d", - transaction.destination, transaction.transaction_id, code + "send_data dest=%s, txid=%s, got response: 200", + transaction.destination, transaction.transaction_id, ) - defer.returnValue((code, response)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail): path = PREFIX + "/query/%s" % query_type - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, args=args, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True): path = PREFIX + "/make_join/%s/%s" % (room_id, user_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, retry_on_dns_fail=retry_on_dns_fail, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_join(self, destination, room_id, event_id, content): path = PREFIX + "/send_join/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_join", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def send_invite(self, destination, room_id, event_id, content): path = PREFIX + "/invite/%s/%s" % (room_id, event_id) - code, content = yield self.client.put_json( + response = yield self.client.put_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(response) @defer.inlineCallbacks @log_function def get_event_auth(self, destination, room_id, event_id): path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id) - response = yield self.client.get_json( + content = yield self.client.get_json( destination=destination, path=path, ) - defer.returnValue(response) + defer.returnValue(content) @defer.inlineCallbacks @log_function def send_query_auth(self, destination, room_id, event_id, content): path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id) - code, content = yield self.client.post_json( + content = yield self.client.post_json( destination=destination, path=path, data=content, ) - if not 200 <= code < 300: - raise RuntimeError("Got %d from send_invite", code) - - defer.returnValue(json.loads(content)) + defer.returnValue(content) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0876589e31..a968a87360 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -288,7 +288,7 @@ class FederationHandler(BaseHandler): logger.debug("Joining %s to %s", joinee, room_id) pdu = yield self.replication_layer.make_join( - target_host, + [target_host], room_id, joinee ) @@ -331,7 +331,7 @@ class FederationHandler(BaseHandler): new_event = builder.build() ret = yield self.replication_layer.send_join( - target_host, + [target_host], new_event ) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c7bf1b47b8..8559d06b7f 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -27,7 +27,9 @@ from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json -from synapse.api.errors import CodeMessageException, SynapseError, Codes +from synapse.api.errors import ( + SynapseError, Codes, HttpResponseException, +) from syutil.crypto.jsonsign import sign_json @@ -163,13 +165,12 @@ class MatrixFederationHttpClient(object): ) if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? pass else: # :'( # Update transactions table? - raise CodeMessageException( - response.code, response.phrase + raise HttpResponseException( + response.code, response.phrase, response ) defer.returnValue(response) @@ -238,11 +239,20 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + logger.debug("Getting resp body") body = yield readBody(response) logger.debug("Got resp body") - defer.returnValue((response.code, body)) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def post_json(self, destination, path, data={}): @@ -275,11 +285,20 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + logger.debug("Getting resp body") body = yield readBody(response) logger.debug("Got resp body") - defer.returnValue((response.code, body)) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def get_json(self, destination, path, args={}, retry_on_dns_fail=True): @@ -321,7 +340,18 @@ class MatrixFederationHttpClient(object): retry_on_dns_fail=retry_on_dns_fail ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + + logger.debug("Getting resp body") body = yield readBody(response) + logger.debug("Got resp body") defer.returnValue(json.loads(body)) -- cgit 1.5.1 From e1515c3e91f2117adc3976b5e606728560ce9e96 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Feb 2015 13:43:28 +0000 Subject: Pass through list of room hosts from room alias query to federation so that it can retry against different room hosts --- synapse/federation/federation_client.py | 5 ++++- synapse/handlers/federation.py | 20 +++++++++++++------- synapse/handlers/room.py | 12 +++++------- 3 files changed, 22 insertions(+), 15 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index eb36ec040b..9923b3fc0d 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -264,7 +264,9 @@ class FederationClient(FederationBase): logger.debug("Got response to make_join: %s", pdu_dict) - defer.returnValue(self.event_from_pdu_json(pdu_dict)) + defer.returnValue( + (destination, self.event_from_pdu_json(pdu_dict)) + ) break except CodeMessageException: raise @@ -313,6 +315,7 @@ class FederationClient(FederationBase): defer.returnValue({ "state": signed_state, "auth_chain": signed_auth, + "origin": destination, }) except CodeMessageException: raise diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 04a4689483..aba266c2bc 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -273,7 +273,7 @@ class FederationHandler(BaseHandler): @log_function @defer.inlineCallbacks - def do_invite_join(self, target_host, room_id, joinee, content, snapshot): + def do_invite_join(self, target_hosts, room_id, joinee, content, snapshot): """ Attempts to join the `joinee` to the room `room_id` via the server `target_host`. @@ -287,8 +287,8 @@ class FederationHandler(BaseHandler): """ logger.debug("Joining %s to %s", joinee, room_id) - pdu = yield self.replication_layer.make_join( - [target_host], + origin, pdu = yield self.replication_layer.make_join( + target_hosts, room_id, joinee ) @@ -330,11 +330,17 @@ class FederationHandler(BaseHandler): new_event = builder.build() + # Try the host we successfully got a response to /make_join/ + # request first. + target_hosts.remove(origin) + target_hosts.insert(0, origin) + ret = yield self.replication_layer.send_join( - [target_host], + target_hosts, new_event ) + origin = ret["origin"] state = ret["state"] auth_chain = ret["auth_chain"] auth_chain.sort(key=lambda e: e.depth) @@ -371,7 +377,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -391,7 +397,7 @@ class FederationHandler(BaseHandler): if e.event_id in auth_ids } yield self._handle_new_event( - target_host, e, auth_events=auth + origin, e, auth_events=auth ) except: logger.exception( @@ -406,7 +412,7 @@ class FederationHandler(BaseHandler): } yield self._handle_new_event( - target_host, + origin, new_event, state=state, current_state=state, diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 23821d321f..0369b907a5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -389,8 +389,6 @@ class RoomMemberHandler(BaseHandler): if not hosts: raise SynapseError(404, "No known servers") - host = hosts[0] - # If event doesn't include a display name, add one. yield self.distributor.fire( "collect_presencelike_data", joinee, content @@ -407,12 +405,12 @@ class RoomMemberHandler(BaseHandler): }) event, context = yield self._create_new_client_event(builder) - yield self._do_join(event, context, room_host=host, do_auth=True) + yield self._do_join(event, context, room_hosts=hosts, do_auth=True) defer.returnValue({"room_id": room_id}) @defer.inlineCallbacks - def _do_join(self, event, context, room_host=None, do_auth=True): + def _do_join(self, event, context, room_hosts=None, do_auth=True): joinee = UserID.from_string(event.state_key) # room_id = RoomID.from_string(event.room_id, self.hs) room_id = event.room_id @@ -441,7 +439,7 @@ class RoomMemberHandler(BaseHandler): if is_host_in_room: should_do_dance = False - elif room_host: # TODO: Shouldn't this be remote_room_host? + elif room_hosts: # TODO: Shouldn't this be remote_room_host? should_do_dance = True else: # TODO(markjh): get prev_state from snapshot @@ -453,7 +451,7 @@ class RoomMemberHandler(BaseHandler): inviter = UserID.from_string(prev_state.user_id) should_do_dance = not self.hs.is_mine(inviter) - room_host = inviter.domain + room_hosts = [inviter.domain] else: # return the same error as join_room_alias does raise SynapseError(404, "No known servers") @@ -461,7 +459,7 @@ class RoomMemberHandler(BaseHandler): if should_do_dance: handler = self.hs.get_handlers().federation_handler yield handler.do_invite_join( - room_host, + room_hosts, room_id, event.user_id, event.get_dict()["content"], # FIXME To get a non-frozen dict -- cgit 1.5.1 From e9c85a4d5ab332021f93634182ad8ed93bd0091c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Feb 2015 13:50:15 +0000 Subject: Connection errors in twisted aren't RuntimeErrors --- synapse/federation/federation_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 9923b3fc0d..70c9a6f46b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -270,7 +270,7 @@ class FederationClient(FederationBase): break except CodeMessageException: raise - except RuntimeError as e: + except Exception as e: logger.warn( "Failed to make_join via %s: %s", destination, e.message @@ -319,7 +319,7 @@ class FederationClient(FederationBase): }) except CodeMessageException: raise - except RuntimeError as e: + except Exception as e: logger.warn( "Failed to send_join via %s: %s", destination, e.message -- cgit 1.5.1 From 697ab75a34b14da6d872d153f36017a6dc6d5b99 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Feb 2015 15:46:24 +0000 Subject: Sign auth_chains when returned by /state/ requests --- synapse/federation/federation_server.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 4742ca9390..b23f72c7fa 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -25,6 +25,8 @@ from synapse.events import FrozenEvent from synapse.api.errors import FederationError, SynapseError +from synapse.crypto.event_signing import compute_event_signature + import logging @@ -156,6 +158,15 @@ class FederationServer(FederationBase): auth_chain = yield self.store.get_auth_chain( [pdu.event_id for pdu in pdus] ) + + for event in auth_chain: + event.signatures.update( + compute_event_signature( + event, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) else: raise NotImplementedError("Specify an event") -- cgit 1.5.1 From c8e1da930dd4ffbc4ca409208b2d715182cd100f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Feb 2015 17:30:46 +0000 Subject: Log all the exits from _attempt_new_transaction --- synapse/federation/transaction_queue.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f38aeba7cc..731019ad9f 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -157,15 +157,19 @@ class TransactionQueue(object): else: logger.info("TX [%s] is ready for retry", destination) - logger.info("TX [%s] _attempt_new_transaction", destination) - if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. # we need application-layer timeouts of some flavour of these # requests + logger.info( + "TX [%s] Transaction already in progress", + destination + ) return + logger.info("TX [%s] _attempt_new_transaction", destination) + # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) @@ -176,6 +180,7 @@ class TransactionQueue(object): destination, len(pending_pdus)) if not pending_pdus and not pending_edus and not pending_failures: + logger.info("TX [%s] Nothing to send", destination) return logger.debug( -- cgit 1.5.1 From 4ebbaf0d4382813ba896f3e8101de12e112cbed5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 14:23:10 +0000 Subject: Blunty replace json with simplejson --- synapse/crypto/keyclient.py | 2 +- synapse/federation/persistence.py | 2 +- synapse/federation/transport/server.py | 2 +- synapse/http/client.py | 2 +- synapse/http/matrixfederationclient.py | 2 +- synapse/push/__init__.py | 2 +- synapse/push/pusherpool.py | 2 +- synapse/rest/client/v1/directory.py | 2 +- synapse/rest/client/v1/login.py | 2 +- synapse/rest/client/v1/presence.py | 2 +- synapse/rest/client/v1/profile.py | 2 +- synapse/rest/client/v1/push_rule.py | 2 +- synapse/rest/client/v1/pusher.py | 2 +- synapse/rest/client/v1/register.py | 2 +- synapse/rest/client/v1/room.py | 2 +- synapse/rest/client/v2_alpha/filter.py | 2 +- synapse/rest/media/v0/content_repository.py | 2 +- synapse/storage/__init__.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/filtering.py | 2 +- synapse/storage/push_rule.py | 2 +- 21 files changed, 21 insertions(+), 21 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index cd12349f67..74008347c3 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -19,7 +19,7 @@ from twisted.internet.protocol import Factory from twisted.internet import defer, reactor from synapse.http.endpoint import matrix_federation_endpoint from synapse.util.logcontext import PreserveLoggingContext -import json +import simplejson as json import logging diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 85c82a4623..8a1afc0ca5 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -23,7 +23,7 @@ from twisted.internet import defer from synapse.util.logutils import log_function -import json +import simplejson as json import logging diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 9c9f8d525b..2ffb37aa18 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -20,7 +20,7 @@ from synapse.api.errors import Codes, SynapseError from synapse.util.logutils import log_function import logging -import json +import simplejson as json import re diff --git a/synapse/http/client.py b/synapse/http/client.py index 198f575cfa..d500e19c81 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -23,7 +23,7 @@ from twisted.web.http_headers import Headers from StringIO import StringIO -import json +import simplejson as json import logging import urllib diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 056d446e42..406203acf2 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -33,7 +33,7 @@ from synapse.api.errors import ( from syutil.crypto.jsonsign import sign_json -import json +import simplejson as json import logging import urllib import urlparse diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 418a348a58..0659a1cb9b 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -22,7 +22,7 @@ import synapse.util.async import baserules import logging -import json +import simplejson as json import re logger = logging.getLogger(__name__) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 5a525befd7..7483d257bf 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -20,7 +20,7 @@ from httppusher import HttpPusher from synapse.push import PusherConfigException import logging -import json +import simplejson as json logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 8ed7e2d669..420aa89f38 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -20,7 +20,7 @@ from synapse.api.errors import AuthError, SynapseError, Codes from synapse.types import RoomAlias from .base import ClientV1RestServlet, client_path_pattern -import json +import simplejson as json import logging diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py index 7116ac98e8..b2257b749d 100644 --- a/synapse/rest/client/v1/login.py +++ b/synapse/rest/client/v1/login.py @@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError from synapse.types import UserID from base import ClientV1RestServlet, client_path_pattern -import json +import simplejson as json class LoginRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index 7feb4aadb1..78d4f2b128 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -21,7 +21,7 @@ from synapse.api.errors import SynapseError from synapse.types import UserID from .base import ClientV1RestServlet, client_path_pattern -import json +import simplejson as json import logging logger = logging.getLogger(__name__) diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py index 15d6f3fc6c..1e77eb49cf 100644 --- a/synapse/rest/client/v1/profile.py +++ b/synapse/rest/client/v1/profile.py @@ -19,7 +19,7 @@ from twisted.internet import defer from .base import ClientV1RestServlet, client_path_pattern from synapse.types import UserID -import json +import simplejson as json class ProfileDisplaynameRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index c4e7dfcf0e..b012f31084 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -27,7 +27,7 @@ from synapse.push.rulekinds import ( PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP ) -import json +import simplejson as json class PushRuleRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py index 80e9939b79..6045e86f34 100644 --- a/synapse/rest/client/v1/pusher.py +++ b/synapse/rest/client/v1/pusher.py @@ -19,7 +19,7 @@ from synapse.api.errors import SynapseError, Codes from synapse.push import PusherConfigException from .base import ClientV1RestServlet, client_path_pattern -import json +import simplejson as json class PusherRestServlet(ClientV1RestServlet): diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1/register.py index c0423c2d45..d3399c446b 100644 --- a/synapse/rest/client/v1/register.py +++ b/synapse/rest/client/v1/register.py @@ -25,7 +25,7 @@ from synapse.util.async import run_on_reactor from hashlib import sha1 import hmac -import json +import simplejson as json import logging import urllib diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 410f19ccf6..0346afb1b4 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -23,7 +23,7 @@ from synapse.api.constants import EventTypes, Membership from synapse.types import UserID, RoomID, RoomAlias from synapse.events.utils import serialize_event -import json +import simplejson as json import logging import urllib diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py index 6ddc495d23..703250cea8 100644 --- a/synapse/rest/client/v2_alpha/filter.py +++ b/synapse/rest/client/v2_alpha/filter.py @@ -21,7 +21,7 @@ from synapse.types import UserID from ._base import client_v2_pattern -import json +import simplejson as json import logging diff --git a/synapse/rest/media/v0/content_repository.py b/synapse/rest/media/v0/content_repository.py index 22e26e3cd5..e77a20fb2e 100644 --- a/synapse/rest/media/v0/content_repository.py +++ b/synapse/rest/media/v0/content_repository.py @@ -25,7 +25,7 @@ from twisted.web import server, resource from twisted.internet import defer import base64 -import json +import simplejson as json import logging import os import re diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a63c59a8a2..924ea89035 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -44,7 +44,7 @@ from syutil.jsonutil import encode_canonical_json from synapse.crypto.event_signing import compute_event_reference_hash -import json +import simplejson as json import logging import os diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 3e1ab0a159..b74e74ac91 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -23,7 +23,7 @@ from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from twisted.internet import defer import collections -import json +import simplejson as json import sys import time diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py index e86eeced45..457a11fd02 100644 --- a/synapse/storage/filtering.py +++ b/synapse/storage/filtering.py @@ -17,7 +17,7 @@ from twisted.internet import defer from ._base import SQLBaseStore -import json +import simplejson as json class FilteringStore(SQLBaseStore): diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index 620de71398..ae46b39cc1 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -20,7 +20,7 @@ from twisted.internet import defer import logging import copy -import json +import simplejson as json logger = logging.getLogger(__name__) -- cgit 1.5.1 From ddb816cf60ca1b0c0f9bfab5df233a010ac309a3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 15:44:28 +0000 Subject: Don't unfreeze when using FreezeEvent.get_dict, as we are using a JSONEncoder that understands FrozenDict --- synapse/events/__init__.py | 4 ---- synapse/federation/persistence.py | 7 ++++--- synapse/handlers/federation.py | 15 ++++++++++++++- synapse/handlers/room.py | 4 +++- synapse/storage/__init__.py | 11 +++++++++-- synapse/util/frozenutils.py | 8 ++++++-- 6 files changed, 36 insertions(+), 13 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 8f0c6e959f..f4ec8cd18c 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -140,10 +140,6 @@ class FrozenEvent(EventBase): return e - def get_dict(self): - # We need to unfreeze what we return - return unfreeze(super(FrozenEvent, self).get_dict()) - def __str__(self): return self.__repr__() diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 8a1afc0ca5..76a9dcd777 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -23,7 +23,8 @@ from twisted.internet import defer from synapse.util.logutils import log_function -import simplejson as json +from syutil.jsonutil import encode_canonical_json + import logging @@ -70,7 +71,7 @@ class TransactionActions(object): transaction.transaction_id, transaction.origin, code, - json.dumps(response) + encode_canonical_json(response) ) @defer.inlineCallbacks @@ -100,5 +101,5 @@ class TransactionActions(object): transaction.transaction_id, transaction.destination, response_code, - json.dumps(response_dict) + encode_canonical_json(response_dict) ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0f9c82fd06..e36f0945ef 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -23,6 +23,7 @@ from synapse.api.errors import ( from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.util.logutils import log_function from synapse.util.async import run_on_reactor +from synapse.util.frozenutils import unfreeze from synapse.crypto.event_signing import ( compute_event_signature, add_hashes_and_signatures, ) @@ -311,9 +312,12 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] builder = self.event_builder_factory.new( - event.get_pdu_json() + unfreeze(event.get_pdu_json()) ) + logger.info("Builder: %s", builder.get_pdu_json()) + logger.info("Content: %s", content) + handled_events = set() try: @@ -324,14 +328,21 @@ class FederationHandler(BaseHandler): if not hasattr(event, "signatures"): builder.signatures = {} + logger.info("Content befhahs: %s", builder.content) + add_hashes_and_signatures( builder, self.hs.hostname, self.hs.config.signing_key[0], ) + logger.info("Content aftet hah: %s", builder.content) + logger.info("Content pdu json: %s", builder.get_pdu_json()) + new_event = builder.build() + logger.info("Content after build: %s", new_event.content) + # Try the host we successfully got a response to /make_join/ # request first. try: @@ -340,6 +351,7 @@ class FederationHandler(BaseHandler): except ValueError: pass + logger.info(new_event.content) ret = yield self.replication_layer.send_join( target_hosts, new_event @@ -485,6 +497,7 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False + logger.info(event.content) context = yield self._handle_new_event(origin, event) logger.debug( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0369b907a5..c685991a9f 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -403,7 +403,9 @@ class RoomMemberHandler(BaseHandler): "membership": Membership.JOIN, "content": content, }) + logger.info("Before build: %s", builder.get_pdu_json()) event, context = yield self._create_new_client_event(builder) + logger.info("After build: %s", event.get_dict()) yield self._do_join(event, context, room_hosts=hosts, do_auth=True) @@ -462,7 +464,7 @@ class RoomMemberHandler(BaseHandler): room_hosts, room_id, event.user_id, - event.get_dict()["content"], # FIXME To get a non-frozen dict + event.content, # FIXME To get a non-frozen dict context ) else: diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 924ea89035..94603ef826 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -298,12 +298,16 @@ class DataStore(RoomMemberStore, RoomStore, or_replace=True, ) + content = encode_canonical_json( + event.content + ).decode("UTF-8") + vals = { "topological_ordering": event.depth, "event_id": event.event_id, "type": event.type, "room_id": event.room_id, - "content": json.dumps(event.get_dict()["content"]), + "content": content, "processed": True, "outlier": outlier, "depth": event.depth, @@ -323,7 +327,10 @@ class DataStore(RoomMemberStore, RoomStore, "prev_events", ] } - vals["unrecognized_keys"] = json.dumps(unrec) + + vals["unrecognized_keys"] = encode_canonical_json( + unrec + ).decode("UTF-8") try: self._simple_insert_txn( diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index a13a2015e4..9e10d37aec 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -21,6 +21,9 @@ def freeze(o): if t is dict: return frozendict({k: freeze(v) for k, v in o.items()}) + if t is frozendict: + return o + if t is str or t is unicode: return o @@ -33,10 +36,11 @@ def freeze(o): def unfreeze(o): - if isinstance(o, frozendict) or isinstance(o, dict): + t = type(o) + if t is dict or t is frozendict: return dict({k: unfreeze(v) for k, v in o.items()}) - if isinstance(o, basestring): + if t is str or t is unicode: return o try: -- cgit 1.5.1 From ef276e8770e19c66d14462aa325b9cb241121bb6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 16:48:05 +0000 Subject: Fix so timing out connections to actually work. --- synapse/federation/replication.py | 2 ++ synapse/util/__init__.py | 52 ++++++++++++++++++++++++++++++++------- 2 files changed, 45 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index e442c6c5d5..54a0c7ad8e 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer): self._order = 0 + self.hs = hs + def __str__(self): return "" % self.server_name diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 29f1344c5b..e77eba90ad 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -18,6 +18,9 @@ from synapse.util.logcontext import LoggingContext from twisted.internet import defer, reactor, task import time +import logging + +logger = logging.getLogger(__name__) class Clock(object): @@ -55,20 +58,51 @@ class Clock(object): timer.cancel() def time_bound_deferred(self, given_deferred, time_out): + if given_deferred.called: + return given_deferred + ret_deferred = defer.Deferred() - def timed_out(): - if not given_deferred.called: - given_deferred.cancel() + def timed_out_fn(): + try: ret_deferred.errback(RuntimeError("Timed out")) + except: + pass + + try: + given_deferred.cancel() + except: + pass + + timer = None + + def cancel(res): + try: + self.cancel_call_later(timer) + except: + pass + return res + + ret_deferred.addBoth(cancel) + + def sucess(res): + try: + ret_deferred.callback(res) + except: + pass + + return res + + def err(res): + try: + ret_deferred.errback(res) + except: + pass - timer = self.call_later(time_out, timed_out) + return res - def succeed(result): - self.cancel_call_later(timer) - ret_deferred.callback(result) + given_deferred.addCallbacks(callback=sucess, errback=err) - given_deferred.addCallback(succeed) - given_deferred.addErrback(ret_deferred.errback) + timer = self.call_later(time_out, timed_out_fn) return ret_deferred -- cgit 1.5.1 From 963256638d5b3c3edee14bfbd7c00944b45d04c0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Feb 2015 18:17:11 +0000 Subject: Correctly handle all the places that can throw exceptions --- scripts/federation_client.py | 7 +++++-- synapse/federation/federation_base.py | 24 ++++++++++++++---------- synapse/federation/federation_server.py | 9 ++++++--- synapse/handlers/federation.py | 12 ++++++++++++ 4 files changed, 37 insertions(+), 15 deletions(-) (limited to 'synapse/federation') diff --git a/scripts/federation_client.py b/scripts/federation_client.py index 3139c61761..ea62dceb36 100644 --- a/scripts/federation_client.py +++ b/scripts/federation_client.py @@ -97,8 +97,11 @@ def lookup(destination, path): if ":" in destination: return "https://%s%s" % (destination, path) else: - srv = srvlookup.lookup("matrix", "tcp", destination)[0] - return "https://%s:%d%s" % (srv.host, srv.port, path) + try: + srv = srvlookup.lookup("matrix", "tcp", destination)[0] + return "https://%s:%d%s" % (srv.host, srv.port, path) + except: + return "https://%s:%d%s" % (destination, 8448, path) def get_json(origin_name, origin_key, destination, path): request_json = { diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index a990aec4fd..30f3f5c8a4 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -61,7 +61,8 @@ class FederationBase(object): # Check local db. new_pdu = yield self.store.get_event( pdu.event_id, - allow_rejected=True + allow_rejected=True, + allow_none=True, ) if new_pdu: signed_pdus.append(new_pdu) @@ -69,15 +70,18 @@ class FederationBase(object): # Check pdu.origin if pdu.origin != origin: - new_pdu = yield self.get_pdu( - destinations=[pdu.origin], - event_id=pdu.event_id, - outlier=outlier, - ) - - if new_pdu: - signed_pdus.append(new_pdu) - continue + try: + new_pdu = yield self.get_pdu( + destinations=[pdu.origin], + event_id=pdu.event_id, + outlier=outlier, + ) + + if new_pdu: + signed_pdus.append(new_pdu) + continue + except: + pass logger.warn("Failed to find copy of %s with valid signature") diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index b23f72c7fa..9f5c98694c 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -411,9 +411,12 @@ class FederationServer(FederationBase): "_handle_new_pdu getting state for %s", pdu.room_id ) - state, auth_chain = yield self.get_state_for_room( - origin, pdu.room_id, pdu.event_id, - ) + try: + state, auth_chain = yield self.get_state_for_room( + origin, pdu.room_id, pdu.event_id, + ) + except: + logger.warn("Failed to get state for event: %s", pdu.event_id) ret = yield self.handler.on_receive_pdu( origin, diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 77c81fe2da..5a7593cc3b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -788,6 +788,18 @@ class FederationHandler(BaseHandler): defer.returnValue(ret) + @defer.inlineCallbacks + def trigger_query_auth(self, destination, event_id): + local_auth_chain = yield self.store.get_auth_chain([event_id]) + + result = yield self.replication_layer.query_auth( + destination, + event.room_id, + event.event_id, + local_auth_chain, + ) + + @defer.inlineCallbacks @log_function def do_auth(self, origin, event, context, auth_events): -- cgit 1.5.1 From 58d848adc0ee9ab8e44a780a19be1e46e0d5cec3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Feb 2015 18:35:36 +0000 Subject: Parrellize fetching of events --- synapse/federation/federation_base.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 30f3f5c8a4..966f4d2c5c 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -50,8 +50,11 @@ class FederationBase(object): Returns: Deferred : A list of PDUs that have valid signatures and hashes. """ + signed_pdus = [] - for pdu in pdus: + + @defer.inlineCallbacks + def do(pdu): try: new_pdu = yield self._check_sigs_and_hash(pdu) signed_pdus.append(new_pdu) @@ -66,7 +69,7 @@ class FederationBase(object): ) if new_pdu: signed_pdus.append(new_pdu) - continue + return # Check pdu.origin if pdu.origin != origin: @@ -79,12 +82,17 @@ class FederationBase(object): if new_pdu: signed_pdus.append(new_pdu) - continue + return except: pass logger.warn("Failed to find copy of %s with valid signature") + yield defer.gatherResults( + [do(pdu) for pdu in pdus], + consumeErrors=True + ) + defer.returnValue(signed_pdus) @defer.inlineCallbacks -- cgit 1.5.1 From 789251afa7d97a08318f92ffe96f5f7ce16d6157 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 12 Feb 2015 19:29:43 +0000 Subject: Fix logging --- synapse/federation/federation_base.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 966f4d2c5c..21a763214b 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -86,7 +86,10 @@ class FederationBase(object): except: pass - logger.warn("Failed to find copy of %s with valid signature") + logger.warn( + "Failed to find copy of %s with valid signature", + pdu.event_id, + ) yield defer.gatherResults( [do(pdu) for pdu in pdus], -- cgit 1.5.1 From 61385846510ceed1bff571dde1233a70e02b4748 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Feb 2015 14:08:02 +0000 Subject: Don't return anything from _handle_new_pdu, since we ignore the return value anyway --- synapse/federation/federation_server.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9f5c98694c..7ca8d49c54 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -331,7 +331,6 @@ class FederationServer(FederationBase): ) if already_seen: logger.debug("Already seen pdu %s", pdu.event_id) - defer.returnValue({}) return # Check signature. @@ -418,7 +417,7 @@ class FederationServer(FederationBase): except: logger.warn("Failed to get state for event: %s", pdu.event_id) - ret = yield self.handler.on_receive_pdu( + yield self.handler.on_receive_pdu( origin, pdu, backfilled=False, @@ -426,8 +425,6 @@ class FederationServer(FederationBase): auth_chain=auth_chain, ) - defer.returnValue(ret) - def __str__(self): return "" % self.server_name -- cgit 1.5.1 From 91fc5eef1deb2f756d16f3d5ea20ede8d967f6df Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Feb 2015 14:27:40 +0000 Subject: Mark old events as outliers. This is to fix the issue where if a remote server sends an event that references a really "old" event, then the local server will pull that in and send to all clients. We decide if an event is old if its depth is less than the minimum depth of the room. --- synapse/federation/federation_server.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7ca8d49c54..4391a60c02 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -366,7 +366,13 @@ class FederationServer(FederationBase): pdu.room_id, min_depth ) - if min_depth and pdu.depth > min_depth and max_recursion > 0: + if min_depth and pdu.depth < min_depth: + # This is so that we don't notify the user about this + # message, to work around the fact that some events will + # reference really really old events we really don't want to + # send to the clients. + pdu.internal_metadata.outlier = True + elif min_depth and pdu.depth > min_depth and max_recursion > 0: for event_id, hashes in pdu.prev_events: if event_id not in have_seen: logger.debug( -- cgit 1.5.1 From baa5b9a97582d4b3c825be1225aba7863230c047 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 16 Feb 2015 18:02:39 +0000 Subject: Cache results of get_pdu. --- synapse/federation/federation_client.py | 42 ++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 70c9a6f46b..83b4947b99 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,7 +19,8 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu -from synapse.api.errors import CodeMessageException +from synapse.api.errors import CodeMessageException, SynapseError +from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -30,6 +31,20 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): + def __init__(self): + self._fail_fetch_pdu_cache = None + + def start_pdu_fail_cache(self): + self._fail_fetch_pdu_cache = ExpiringCache( + cache_name="get_pdu_cache", + clock=self._clock, + max_len=1000, + expiry_ms=120*1000, + reset_expiry_on_get=False, + ) + + self._fail_fetch_pdu_cache.start() + @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -160,6 +175,11 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. + if self._fail_fetch_pdu_cache: + e = self._fail_fetch_pdu_cache.get(event_id) + if e: + defer.returnValue(e) + pdu = None for destination in destinations: try: @@ -181,8 +201,21 @@ class FederationClient(FederationBase): pdu = yield self._check_sigs_and_hash(pdu) break - except CodeMessageException: - raise + except SynapseError: + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -190,6 +223,9 @@ class FederationClient(FederationBase): ) continue + if self._fail_fetch_pdu_cache is not None: + self._fail_fetch_pdu_cache[event_id] = pdu + defer.returnValue(pdu) @defer.inlineCallbacks -- cgit 1.5.1 From 72a4de2ce627528a13bda480403344ffde6275d3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 10:03:23 +0000 Subject: Use consumeErrors=True on all DeferredLists. This is so that the DeferredLists actually consume the error instead of propogating down the non-existent errback chain. This should reduce the number of unhandled errors we are seeing. --- synapse/federation/federation_server.py | 2 +- synapse/federation/transaction_queue.py | 2 +- synapse/handlers/presence.py | 8 ++++---- synapse/notifier.py | 6 ++++-- synapse/storage/roommember.py | 2 +- 5 files changed, 11 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9f5c98694c..e94d0411b4 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -124,7 +124,7 @@ class FederationServer(FederationBase): edu.content ) - results = yield defer.DeferredList(dl) + results = yield defer.DeferredList(dl, consumeErrors=True) ret = [] for r in results: diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 731019ad9f..bb20f2ebab 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -98,7 +98,7 @@ class TransactionQueue(object): deferreds.append(deferred) - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) # NO inlineCallbacks def enqueue_edu(self, edu): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 59287010ed..8ef248ecf2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -492,7 +492,7 @@ class PresenceHandler(BaseHandler): user, domain, remoteusers )) - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) def _start_polling_local(self, user, target_user): target_localpart = target_user.localpart @@ -548,7 +548,7 @@ class PresenceHandler(BaseHandler): self._stop_polling_remote(user, domain, remoteusers) ) - return defer.DeferredList(deferreds) + return defer.DeferredList(deferreds, consumeErrors=True) def _stop_polling_local(self, user, target_user): for localpart in self._local_pushmap.keys(): @@ -729,7 +729,7 @@ class PresenceHandler(BaseHandler): del self._remote_sendmap[user] with PreserveLoggingContext(): - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) @defer.inlineCallbacks def push_update_to_local_and_remote(self, observed_user, statuscache, @@ -768,7 +768,7 @@ class PresenceHandler(BaseHandler): ) ) - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) defer.returnValue((localusers, remote_domains)) diff --git a/synapse/notifier.py b/synapse/notifier.py index e3b6ead620..f5a394596d 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -135,7 +135,8 @@ class Notifier(object): with PreserveLoggingContext(): yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] + [notify(l).addErrback(eb) for l in listeners], + consumeErrors=True, ) @defer.inlineCallbacks @@ -203,7 +204,8 @@ class Notifier(object): with PreserveLoggingContext(): yield defer.DeferredList( - [notify(l).addErrback(eb) for l in listeners] + [notify(l).addErrback(eb) for l in listeners], + consumeErrors=True, ) @defer.inlineCallbacks diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 779f9ce544..9bf608bc90 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -288,7 +288,7 @@ class RoomMemberStore(SQLBaseStore): deferreds = [self.get_rooms_for_user(u) for u in user_id_list] - results = yield defer.DeferredList(deferreds) + results = yield defer.DeferredList(deferreds, consumeErrors=True) # A list of sets of strings giving room IDs for each user room_id_lists = [set([r.room_id for r in result[1]]) for result in results] -- cgit 1.5.1 From 02bfa889de1990eb8dc47770834bef777252dc82 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 13:08:27 +0000 Subject: Handle recieving failures in transactions --- synapse/federation/federation_server.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e94d0411b4..34f5b17446 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -114,7 +114,15 @@ class FederationServer(FederationBase): with PreserveLoggingContext(): dl = [] for pdu in pdu_list: - dl.append(self._handle_new_pdu(transaction.origin, pdu)) + d = self._handle_new_pdu(transaction.origin, pdu) + + def handle_failure(failure): + failure.trap(FederationError) + self.enqueue_failure(failure.value, transaction.origin) + + d.addErrback(handle_failure) + + dl.append(d) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -124,6 +132,9 @@ class FederationServer(FederationBase): edu.content ) + for failure in getattr(transaction, "failures", []): + logger.info("Got failure %r", failure) + results = yield defer.DeferredList(dl, consumeErrors=True) ret = [] -- cgit 1.5.1 From c82e26ad4ba80742adc68a7e8c52441d760e4746 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 13:24:13 +0000 Subject: Actually respond with JSON to incoming transaction --- synapse/federation/federation_server.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 34f5b17446..c48a41b5d5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -147,6 +147,8 @@ class FederationServer(FederationBase): logger.debug("Returning: %s", str(ret)) + response = ret + yield self.transaction_actions.set_response( transaction, 200, response -- cgit 1.5.1 From 659ead082ff11842fea803e44d75667e0ca38d71 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 13:58:52 +0000 Subject: Format the response of transaction request in a nicer way --- synapse/federation/federation_server.py | 19 +++++++++++++++---- synapse/federation/transaction_queue.py | 22 ++++++++++++++++++++-- 2 files changed, 35 insertions(+), 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index c48a41b5d5..d1ec0b9eac 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -118,7 +118,7 @@ class FederationServer(FederationBase): def handle_failure(failure): failure.trap(FederationError) - self.enqueue_failure(failure.value, transaction.origin) + self.send_failure(failure.value, transaction.origin) d.addErrback(handle_failure) @@ -132,7 +132,7 @@ class FederationServer(FederationBase): edu.content ) - for failure in getattr(transaction, "failures", []): + for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) results = yield defer.DeferredList(dl, consumeErrors=True) @@ -143,11 +143,15 @@ class FederationServer(FederationBase): ret.append({}) else: logger.exception(r[1]) - ret.append({"error": str(r[1])}) + ret.append({"error": str(r[1].value)}) logger.debug("Returning: %s", str(ret)) - response = ret + response = { + "pdus": dict(zip( + (p.event_id for p in pdu_list), ret + )), + } yield self.transaction_actions.set_response( transaction, @@ -358,6 +362,13 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) + raise FederationError( + "ERROR", + 403, + "Forbidden", + affected=pdu.event_id, + ) + state = None auth_chain = [] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb20f2ebab..6faaa066fb 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -91,7 +91,7 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send pdu", failure) + logger.warn("Failed to send pdu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -116,7 +116,7 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send edu", failure) + logger.warn("Failed to send edu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -133,6 +133,15 @@ class TransactionQueue(object): (failure, deferred) ) + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send failure", failure.value) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(eb) + yield deferred @defer.inlineCallbacks @@ -249,6 +258,15 @@ class TransactionQueue(object): transaction, json_data_cb ) code = 200 + + if response: + for e_id, r in getattr(response, "pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, + ) + except HttpResponseException as e: code = e.code response = e.response -- cgit 1.5.1 From 676e8ee78ae3f51cbc0113c8d810d4bc1e81cdab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 15:22:45 +0000 Subject: Remove debug raise --- synapse/federation/federation_server.py | 7 ------- 1 file changed, 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 679fb141e7..22b9663831 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -361,13 +361,6 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) - raise FederationError( - "ERROR", - 403, - "Forbidden", - affected=pdu.event_id, - ) - state = None auth_chain = [] -- cgit 1.5.1 From 2b8f1a956c6a1d767a3b60e84e7d0afe5857fb0d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 17:20:56 +0000 Subject: Add per server retry limiting. Factor out the pre destination retry logic from TransactionQueue so it can be reused in both get_pdu and crypto.keyring --- synapse/crypto/keyring.py | 22 ++--- synapse/federation/federation_client.py | 36 ++++--- synapse/federation/transaction_queue.py | 161 +++++++++++++------------------- synapse/util/retryutils.py | 108 +++++++++++++++++++++ 4 files changed, 205 insertions(+), 122 deletions(-) create mode 100644 synapse/util/retryutils.py (limited to 'synapse/federation') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 3250cff595..ea00c830c0 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -22,6 +22,8 @@ from syutil.crypto.signing_key import ( from syutil.base64util import decode_base64, encode_base64 from synapse.api.errors import SynapseError, Codes +from synapse.util.retryutils import get_retry_limiter + from OpenSSL import crypto import logging @@ -88,19 +90,13 @@ class Keyring(object): # Try to fetch the key from the remote server. - retry_last_ts, retry_interval = (0, 0) - retry_timings = yield self.store.get_destination_retry_timings( - server_name + limiter = yield get_retry_limiter( + server_name, + self.clock, + self.store, ) - if retry_timings: - retry_last_ts, retry_interval = ( - retry_timings.retry_last_ts, retry_timings.retry_interval - ) - if retry_last_ts + retry_interval > int(self.clock.time_msec()): - logger.info("%s not ready for retry", server_name) - raise ValueError("No verification key found for given key ids") - try: + with limiter: (response, tls_certificate) = yield fetch_server_key( server_name, self.hs.tls_context_factory ) @@ -165,7 +161,3 @@ class Keyring(object): return raise ValueError("No verification key found for given key ids") - - except: - self.set_retrying(server_name, retry_interval) - raise diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 70c9a6f46b..c5b0274c2f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -23,6 +23,8 @@ from synapse.api.errors import CodeMessageException from synapse.util.logutils import log_function from synapse.events import FrozenEvent +from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination + import logging @@ -163,24 +165,34 @@ class FederationClient(FederationBase): pdu = None for destination in destinations: try: - transaction_data = yield self.transport_layer.get_event( - destination, event_id + limiter = yield get_retry_limiter( + destination, + self._clock, + self.store, ) - logger.debug("transaction_data %r", transaction_data) + with limiter: + transaction_data = yield self.transport_layer.get_event( + destination, event_id + ) - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] + logger.debug("transaction_data %r", transaction_data) - if pdu_list: - pdu = pdu_list[0] + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] - # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(pdu) + if pdu_list: + pdu = pdu_list[0] - break + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + break + except NotRetryingDestination as e: + logger.info(e.message) + continue except CodeMessageException: raise except Exception as e: diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb20f2ebab..7d02afe163 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -22,6 +22,9 @@ from .units import Transaction from synapse.api.errors import HttpResponseException from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.retryutils import ( + get_retry_limiter, NotRetryingDestination, +) import logging @@ -138,25 +141,6 @@ class TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): - - (retry_last_ts, retry_interval) = (0, 0) - retry_timings = yield self.store.get_destination_retry_timings( - destination - ) - if retry_timings: - (retry_last_ts, retry_interval) = ( - retry_timings.retry_last_ts, retry_timings.retry_interval - ) - if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - return - else: - logger.info("TX [%s] is ready for retry", destination) - if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -204,77 +188,79 @@ class TransactionQueue(object): ] try: - self.pending_transactions[destination] = 1 - - logger.debug("TX [%s] Persisting transaction...", destination) - - transaction = Transaction.create_new( - origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, + limiter = yield get_retry_limiter( + destination, + self._clock, + self.store, ) - self._next_txn_id += 1 + with limiter: + self.pending_transactions[destination] = 1 - yield self.transaction_actions.prepare_to_send(transaction) + logger.debug("TX [%s] Persisting transaction...", destination) - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] Sending transaction [%s]", - destination, - transaction.transaction_id, - ) + transaction = Transaction.create_new( + origin_server_ts=int(self._clock.time_msec()), + transaction_id=str(self._next_txn_id), + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) + + self._next_txn_id += 1 - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self._clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb + yield self.transaction_actions.prepare_to_send(transaction) + + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, ) - code = 200 - except HttpResponseException as e: - code = e.code - response = e.response - logger.info("TX [%s] got %d response", destination, code) + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self._clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) + logger.info("TX [%s] got %d response", destination, code) - yield self.transaction_actions.delivered( - transaction, code, response - ) + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) + + yield self.transaction_actions.delivered( + transaction, code, response + ) + + logger.debug("TX [%s] Marked as delivered", destination) - logger.debug("TX [%s] Marked as delivered", destination) logger.debug("TX [%s] Yielding to callbacks...", destination) for deferred in deferreds: if code == 200: - if retry_last_ts: - # this host is alive! reset retry schedule - yield self.store.set_destination_retry_timings( - destination, 0, 0 - ) deferred.callback(None) else: - self.set_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -285,6 +271,12 @@ class TransactionQueue(object): pass logger.debug("TX [%s] Yielded to callbacks", destination) + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. @@ -302,8 +294,6 @@ class TransactionQueue(object): e, ) - self.set_retrying(destination, retry_interval) - for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -314,22 +304,3 @@ class TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) - - @defer.inlineCallbacks - def set_retrying(self, destination, retry_interval): - # track that this destination is having problems and we should - # give it a chance to recover before trying it again - - if retry_interval: - retry_interval *= 2 - # plateau at hourly retries for now - if retry_interval >= 60 * 60 * 1000: - retry_interval = 60 * 60 * 1000 - else: - retry_interval = 2000 # try again at first after 2 seconds - - yield self.store.set_destination_retry_timings( - destination, - int(self._clock.time_msec()), - retry_interval - ) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py new file mode 100644 index 0000000000..bba524ebd8 --- /dev/null +++ b/synapse/util/retryutils.py @@ -0,0 +1,108 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +import logging + + +logger = logging.getLogger(__name__) + + +class NotRetryingDestination(Exception): + def __init__(self, retry_last_ts, retry_interval, destination): + msg = "Not retrying server %s." % (destination,) + super(NotRetryingDestination, self).__init__(msg) + + self.retry_last_ts = retry_last_ts + self.retry_interval = retry_interval + self.destination = destination + + +@defer.inlineCallbacks +def get_retry_limiter(destination, clock, store, **kwargs): + retry_last_ts, retry_interval = (0, 0) + + retry_timings = yield store.get_destination_retry_timings( + destination + ) + + if retry_timings: + retry_last_ts, retry_interval = ( + retry_timings.retry_last_ts, retry_timings.retry_interval + ) + + now = int(clock.time_msec()) + + if retry_last_ts + retry_interval > now: + raise NotRetryingDestination( + retry_last_ts=retry_last_ts, + retry_interval=retry_interval, + destination=destination, + ) + + defer.returnValue( + RetryDestinationLimiter( + destination, + clock, + store, + retry_interval, + **kwargs + ) + ) + + +class RetryDestinationLimiter(object): + def __init__(self, destination, clock, store, retry_interval, + min_retry_interval=20000, max_retry_interval=60 * 60 * 1000, + multiplier_retry_interval=2): + self.clock = clock + self.store = store + self.destination = destination + + self.retry_interval = retry_interval + self.min_retry_interval = min_retry_interval + self.max_retry_interval = max_retry_interval + self.multiplier_retry_interval = multiplier_retry_interval + + def __enter__(self): + pass + + def __exit__(self, exc_type, exc_val, exc_tb): + def err(self, failure): + logger.exception( + "Failed to store set_destination_retry_timings", + failure.value + ) + + if exc_type is None and exc_val is None and exc_tb is None: + # We connected successfully. + retry_last_ts = 0 + self.retry_interval = 0 + else: + # We couldn't connect. + if self.retry_interval: + self.retry_interval *= self.multiplier_retry_interval + + if self.retry_interval >= self.max_retry_interval: + self.retry_interval = self.max_retry_interval + else: + self.retry_interval = self.min_retry_interval + + retry_last_ts = int(self._clock.time_msec()), + + self.store.set_destination_retry_timings( + self.destination, retry_last_ts, self.retry_interval + ).addErrback(err) -- cgit 1.5.1 From 9371019133bf16cec163d58fe69aa701c8ca5305 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 17 Feb 2015 18:13:34 +0000 Subject: Try to only back off if we think we failed to connect to the remote --- synapse/crypto/keyring.py | 108 ++++++++++++++++---------------- synapse/federation/transaction_queue.py | 66 +++++++++---------- synapse/util/retryutils.py | 10 ++- 3 files changed, 95 insertions(+), 89 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index ea00c830c0..828aced44a 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -101,63 +101,63 @@ class Keyring(object): server_name, self.hs.tls_context_factory ) - # Check the response. + # Check the response. - x509_certificate_bytes = crypto.dump_certificate( - crypto.FILETYPE_ASN1, tls_certificate - ) + x509_certificate_bytes = crypto.dump_certificate( + crypto.FILETYPE_ASN1, tls_certificate + ) - if ("signatures" not in response - or server_name not in response["signatures"]): - raise ValueError("Key response not signed by remote server") - - if "tls_certificate" not in response: - raise ValueError("Key response missing TLS certificate") - - tls_certificate_b64 = response["tls_certificate"] - - if encode_base64(x509_certificate_bytes) != tls_certificate_b64: - raise ValueError("TLS certificate doesn't match") - - verify_keys = {} - for key_id, key_base64 in response["verify_keys"].items(): - if is_signing_algorithm_supported(key_id): - key_bytes = decode_base64(key_base64) - verify_key = decode_verify_key_bytes(key_id, key_bytes) - verify_keys[key_id] = verify_key - - for key_id in response["signatures"][server_name]: - if key_id not in response["verify_keys"]: - raise ValueError( - "Key response must include verification keys for all" - " signatures" - ) - if key_id in verify_keys: - verify_signed_json( - response, - server_name, - verify_keys[key_id] - ) - - # Cache the result in the datastore. - - time_now_ms = self.clock.time_msec() - - yield self.store.store_server_certificate( - server_name, - server_name, - time_now_ms, - tls_certificate, - ) + if ("signatures" not in response + or server_name not in response["signatures"]): + raise ValueError("Key response not signed by remote server") + + if "tls_certificate" not in response: + raise ValueError("Key response missing TLS certificate") - for key_id, key in verify_keys.items(): - yield self.store.store_server_verify_key( - server_name, server_name, time_now_ms, key + tls_certificate_b64 = response["tls_certificate"] + + if encode_base64(x509_certificate_bytes) != tls_certificate_b64: + raise ValueError("TLS certificate doesn't match") + + verify_keys = {} + for key_id, key_base64 in response["verify_keys"].items(): + if is_signing_algorithm_supported(key_id): + key_bytes = decode_base64(key_base64) + verify_key = decode_verify_key_bytes(key_id, key_bytes) + verify_keys[key_id] = verify_key + + for key_id in response["signatures"][server_name]: + if key_id not in response["verify_keys"]: + raise ValueError( + "Key response must include verification keys for all" + " signatures" + ) + if key_id in verify_keys: + verify_signed_json( + response, + server_name, + verify_keys[key_id] ) - for key_id in key_ids: - if key_id in verify_keys: - defer.returnValue(verify_keys[key_id]) - return + # Cache the result in the datastore. + + time_now_ms = self.clock.time_msec() + + yield self.store.store_server_certificate( + server_name, + server_name, + time_now_ms, + tls_certificate, + ) + + for key_id, key in verify_keys.items(): + yield self.store.store_server_verify_key( + server_name, server_name, time_now_ms, key + ) + + for key_id in key_ids: + if key_id in verify_keys: + defer.returnValue(verify_keys[key_id]) + return - raise ValueError("No verification key found for given key ids") + raise ValueError("No verification key found for given key ids") diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 7d02afe163..dc0f30cb64 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -167,15 +167,6 @@ class TransactionQueue(object): logger.info("TX [%s] Nothing to send", destination) return - logger.debug( - "TX [%s] Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, - len(pending_pdus), - len(pending_edus), - len(pending_failures) - ) - # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -194,32 +185,41 @@ class TransactionQueue(object): self.store, ) - with limiter: - self.pending_transactions[destination] = 1 + logger.debug( + "TX [%s] Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) + + self.pending_transactions[destination] = 1 - logger.debug("TX [%s] Persisting transaction...", destination) + logger.debug("TX [%s] Persisting transaction...", destination) - transaction = Transaction.create_new( - origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) + transaction = Transaction.create_new( + origin_server_ts=int(self._clock.time_msec()), + transaction_id=str(self._next_txn_id), + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) - self._next_txn_id += 1 + self._next_txn_id += 1 - yield self.transaction_actions.prepare_to_send(transaction) + yield self.transaction_actions.prepare_to_send(transaction) - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] Sending transaction [%s]", - destination, - transaction.transaction_id, - ) + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, + ) + with limiter: # Actually send the transaction # FIXME (erikj): This is a bit of a hack to make the Pdu age @@ -249,11 +249,11 @@ class TransactionQueue(object): logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( - transaction, code, response - ) + yield self.transaction_actions.delivered( + transaction, code, response + ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Marked as delivered", destination) logger.debug("TX [%s] Yielding to callbacks...", destination) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 888b7ef2e9..d190100c8c 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -15,6 +15,8 @@ from twisted.internet import defer +from synapse.api.errors import CodeMessageException + import logging @@ -67,7 +69,7 @@ def get_retry_limiter(destination, clock, store, **kwargs): class RetryDestinationLimiter(object): def __init__(self, destination, clock, store, retry_interval, min_retry_interval=20000, max_retry_interval=60 * 60 * 1000, - multiplier_retry_interval=2): + multiplier_retry_interval=2,): self.clock = clock self.store = store self.destination = destination @@ -87,7 +89,11 @@ class RetryDestinationLimiter(object): failure.value ) - if exc_type is None and exc_val is None and exc_tb is None: + valid_err_code = False + if exc_type is CodeMessageException: + valid_err_code = 0 <= exc_val.code < 500 + + if exc_type is None or valid_err_code: # We connected successfully. if not self.retry_interval: return -- cgit 1.5.1 From ec847059f3e9b9b5de62aa2f7ad2366c4e883fac Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 10:14:10 +0000 Subject: Rename _fail_fetch_pdu_cache to _get_pdu_cache --- synapse/app/homeserver.py | 2 +- synapse/federation/federation_client.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7565d94449..7be82d0576 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -275,7 +275,7 @@ def setup(): hs.get_pusherpool().start() hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() - hs.get_replication_layer().start_pdu_fail_cache() + hs.get_replication_layer().start_get_pdu_cache() if config.daemonize: print config.pid_file diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 83b4947b99..6042e366bd 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,10 +32,10 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): def __init__(self): - self._fail_fetch_pdu_cache = None + self._get_pdu_cache = None - def start_pdu_fail_cache(self): - self._fail_fetch_pdu_cache = ExpiringCache( + def start_get_pdu_cache(self): + self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", clock=self._clock, max_len=1000, @@ -43,7 +43,7 @@ class FederationClient(FederationBase): reset_expiry_on_get=False, ) - self._fail_fetch_pdu_cache.start() + self._get_pdu_cache.start() @log_function def send_pdu(self, pdu, destinations): @@ -175,8 +175,8 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. - if self._fail_fetch_pdu_cache: - e = self._fail_fetch_pdu_cache.get(event_id) + if self._get_pdu_cache: + e = self._get_pdu_cache.get(event_id) if e: defer.returnValue(e) @@ -223,8 +223,8 @@ class FederationClient(FederationBase): ) continue - if self._fail_fetch_pdu_cache is not None: - self._fail_fetch_pdu_cache[event_id] = pdu + if self._get_pdu_cache is not None: + self._get_pdu_cache[event_id] = pdu defer.returnValue(pdu) -- cgit 1.5.1 From e482541e1d77cb179dafa44037ee1e04003501ab Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 10:44:22 +0000 Subject: Fix pyflakes --- synapse/federation/transaction_queue.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index a77201ad49..ae04774c76 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -266,7 +266,6 @@ class TransactionQueue(object): logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( transaction, code, response ) -- cgit 1.5.1 From 47d3ff4cf8ebceacc680c9e5b414150c6edbf85b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 11:30:37 +0000 Subject: Don't send failure to self --- synapse/federation/transaction_queue.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ae04774c76..f0041d4d12 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -128,6 +128,9 @@ class TransactionQueue(object): @defer.inlineCallbacks def enqueue_failure(self, failure, destination): + if destination == self.server_name: + return + deferred = defer.Deferred() self.pending_failures_by_dest.setdefault( -- cgit 1.5.1 From b68e4a729ff91372a3a7269acc08c1f77584b0f4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 11:32:39 +0000 Subject: Discard destination 'localhost' --- synapse/federation/transaction_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index f0041d4d12..06944eefb9 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -107,7 +107,7 @@ class TransactionQueue(object): def enqueue_edu(self, edu): destination = edu.destination - if destination == self.server_name: + if destination == self.server_name or destination == "localhost": return deferred = defer.Deferred() @@ -128,7 +128,7 @@ class TransactionQueue(object): @defer.inlineCallbacks def enqueue_failure(self, failure, destination): - if destination == self.server_name: + if destination == self.server_name or destination == "localhost": return deferred = defer.Deferred() -- cgit 1.5.1 From 2462aacd77963431507bb97769acee3ed9d65ceb Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Wed, 18 Feb 2015 11:51:00 +0000 Subject: Restrict the destinations that synapse can talk to --- synapse/federation/transaction_queue.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ae04774c76..4b5460c797 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -66,6 +66,26 @@ class TransactionQueue(object): # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) + def can_send_to(self, destination): + """Can we send messages to the given server? + + We can't send messages to ourselves. If we are running on localhost + then we can only federation with other servers running on localhost. + Otherwise we only federate with servers on a public domain. + + Args: + destination(str): The server we are possibly trying to send to. + Returns: + bool: True if we can send to the server. + """ + + if destination == self.server_name: + return False + if self.server_name.startswith("localhost"): + return destination.startswith("localhost") + else: + return not destination.startswith("localhost") + @defer.inlineCallbacks @log_function def enqueue_pdu(self, pdu, destinations, order): @@ -74,8 +94,9 @@ class TransactionQueue(object): # table and we'll get back to it later. destinations = set(destinations) - destinations.discard(self.server_name) - destinations.discard("localhost") + destinations = set( + dest for dest in destinations if self.can_send_to(dest) + ) logger.debug("Sending to: %s", str(destinations)) @@ -107,7 +128,7 @@ class TransactionQueue(object): def enqueue_edu(self, edu): destination = edu.destination - if destination == self.server_name: + if not self.can_send_to(destination): return deferred = defer.Deferred() @@ -130,6 +151,9 @@ class TransactionQueue(object): def enqueue_failure(self, failure, destination): deferred = defer.Deferred() + if not self.can_send_to(destination): + return + self.pending_failures_by_dest.setdefault( destination, [] ).append( -- cgit 1.5.1 From 446ef589920a9c3c4738873023494bed79d1f6c3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 18 Feb 2015 12:03:12 +0000 Subject: Add errback to all deferreds in transaction_queue --- synapse/federation/transaction_queue.py | 37 ++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 14 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 06944eefb9..1e5505a4bf 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -90,14 +90,17 @@ class TransactionQueue(object): (pdu, deferred, order) ) - def eb(failure): + def chain(failure): if not deferred.called: deferred.errback(failure) - else: - logger.warn("Failed to send pdu", failure.value) + + def log_failure(failure): + logger.warn("Failed to send pdu", failure.value) + + deferred.addErrback(log_failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) + self._attempt_new_transaction(destination).addErrback(chain) deferreds.append(deferred) @@ -115,14 +118,17 @@ class TransactionQueue(object): (edu, deferred) ) - def eb(failure): + def chain(failure): if not deferred.called: deferred.errback(failure) - else: - logger.warn("Failed to send edu", failure.value) + + def log_failure(failure): + logger.warn("Failed to send pdu", failure.value) + + deferred.addErrback(log_failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) + self._attempt_new_transaction(destination).addErrback(chain) return deferred @@ -139,14 +145,17 @@ class TransactionQueue(object): (failure, deferred) ) - def eb(failure): + def chain(f): if not deferred.called: - deferred.errback(failure) - else: - logger.warn("Failed to send failure", failure.value) + deferred.errback(f) + + def log_failure(f): + logger.warn("Failed to send pdu", f.value) + + deferred.addErrback(log_failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) + self._attempt_new_transaction(destination).addErrback(chain) yield deferred @@ -308,7 +317,7 @@ class TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception( + logger.warn( "TX [%s] Problem in _attempt_transaction: %s", destination, e, -- cgit 1.5.1 From 0ac2a79faa918280767c18e4db7ec29d7d3a3afb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 19 Feb 2015 17:24:14 +0000 Subject: Initial stab at implementing a batched get_missing_pdus request --- synapse/federation/federation_server.py | 72 +++++++++++++++++++++++++++++++++ synapse/handlers/federation.py | 9 +++-- synapse/storage/event_federation.py | 63 ++++++++++++++++++++++++++--- 3 files changed, 135 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 22b9663831..34bc397e8a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -305,6 +305,78 @@ class FederationServer(FederationBase): (200, send_content) ) + @defer.inlineCallbacks + def get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + limit = max(limit, 50) + min_depth = max(min_depth, 0) + + missing_events = yield self.store.get_missing_events( + room_id=room_id, + earliest_events=earliest_events, + latest_events=latest_events, + limit=limit, + min_depth=min_depth, + ) + + known_ids = {e.event_id for e in missing_events} | {earliest_events} + + back_edges = { + e for e in missing_events + if {i for i, h in e.prev_events.items()} <= known_ids + } + + decoded_auth_events = set() + state = {} + auth_events = set() + auth_and_state = {} + for event in back_edges: + state_pdus = yield self.handler.get_state_for_pdu( + origin, room_id, event.event_id, + do_auth=False, + ) + + state[event.event_id] = [s.event_id for s in state_pdus] + + auth_and_state.update({ + s.event_id: s for s in state_pdus + }) + + state_ids = {pdu.event_id for pdu in state_pdus} + prev_ids = {i for i, h in event.prev_events.items()} + partial_auth_chain = yield self.store.get_auth_chain( + state_ids | prev_ids, have_ids=decoded_auth_events.keys() + ) + + for p in partial_auth_chain: + p.signatures.update( + compute_event_signature( + p, + self.hs.hostname, + self.hs.config.signing_key[0] + ) + ) + + auth_events.update( + a.event_id for a in partial_auth_chain + ) + + auth_and_state.update({ + a.event_id: a for a in partial_auth_chain + }) + + time_now = self._clock.time_msec() + + defer.returnValue({ + "events": [ev.get_pdu_json(time_now) for ev in missing_events], + "state_for_events": state, + "auth_events": auth_events, + "event_map": { + k: ev.get_pdu_json(time_now) + for k, ev in auth_and_state.items() + }, + }) + @log_function def _get_persisted_pdu(self, origin, event_id, do_auth=True): """ Get a PDU from the database with given origin and id. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 0eb2ff95ca..26bdc6d1a7 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -581,12 +581,13 @@ class FederationHandler(BaseHandler): defer.returnValue(event) @defer.inlineCallbacks - def get_state_for_pdu(self, origin, room_id, event_id): + def get_state_for_pdu(self, origin, room_id, event_id, do_auth=True): yield run_on_reactor() - in_room = yield self.auth.check_host_in_room(room_id, origin) - if not in_room: - raise AuthError(403, "Host not in room.") + if do_auth: + in_room = yield self.auth.check_host_in_room(room_id, origin) + if not in_room: + raise AuthError(403, "Host not in room.") state_groups = yield self.store.get_state_groups( [event_id] diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 3fbc090224..22bf7ad832 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -32,15 +32,15 @@ class EventFederationStore(SQLBaseStore): and backfilling from another server respectively. """ - def get_auth_chain(self, event_ids): + def get_auth_chain(self, event_ids, have_ids=set()): return self.runInteraction( "get_auth_chain", self._get_auth_chain_txn, - event_ids + event_ids, have_ids ) - def _get_auth_chain_txn(self, txn, event_ids): - results = self._get_auth_chain_ids_txn(txn, event_ids) + def _get_auth_chain_txn(self, txn, event_ids, have_ids): + results = self._get_auth_chain_ids_txn(txn, event_ids, have_ids) return self._get_events_txn(txn, results) @@ -51,8 +51,9 @@ class EventFederationStore(SQLBaseStore): event_ids ) - def _get_auth_chain_ids_txn(self, txn, event_ids): + def _get_auth_chain_ids_txn(self, txn, event_ids, have_ids): results = set() + have_ids = set(have_ids) base_sql = ( "SELECT auth_id FROM event_auth WHERE event_id = ?" @@ -64,6 +65,10 @@ class EventFederationStore(SQLBaseStore): for f in front: txn.execute(base_sql, (f,)) new_front.update([r[0] for r in txn.fetchall()]) + + new_front -= results + new_front -= have_ids + front = new_front results.update(front) @@ -378,3 +383,51 @@ class EventFederationStore(SQLBaseStore): event_results += new_front return self._get_events_txn(txn, event_results) + + def get_missing_events(self, room_id, earliest_events, latest_events, + limit, min_depth): + return self.runInteraction( + "get_missing_events", + self._get_missing_events, + room_id, earliest_events, latest_events, limit, min_depth + ) + + def _get_missing_events(self, txn, room_id, earliest_events, latest_events, + limit, min_depth): + + earliest_events = set(earliest_events) + front = set(latest_events) - earliest_events + + event_results = set() + + query = ( + "SELECT prev_event_id FROM event_edges " + "WHERE room_id = ? AND event_id = ? AND is_state = 0 " + "LIMIT ?" + ) + + while front and len(event_results) < limit: + new_front = set() + for event_id in front: + txn.execute( + query, + (room_id, event_id, limit - len(event_results)) + ) + + for e_id, in txn.fetchall(): + new_front.add(e_id) + + new_front -= earliest_events + new_front -= event_results + + front = new_front + event_results |= new_front + + events = self._get_events_txn(txn, event_results) + + events = sorted( + [ev for ev in events if ev.depth >= min_depth], + key=lambda e: e.depth, + ) + + return events[:limit] -- cgit 1.5.1 From db215b7e0007a207b8775d78c6693153e16f2731 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 23 Feb 2015 13:58:02 +0000 Subject: Implement and use new batched get missing pdu --- synapse/federation/federation_client.py | 19 ++++ synapse/federation/federation_server.py | 150 +++++++++++--------------------- synapse/federation/transaction_queue.py | 2 +- synapse/federation/transport/client.py | 19 ++++ synapse/federation/transport/server.py | 31 +++++++ synapse/handlers/federation.py | 23 +++++ 6 files changed, 144 insertions(+), 100 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index cd3c962d50..ca89a0787c 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -439,6 +439,25 @@ class FederationClient(FederationBase): defer.returnValue(ret) + @defer.inlineCallbacks + def get_missing_events(self, destination, room_id, earliest_events, + latest_events, limit, min_depth): + content = yield self.transport_layer.get_missing_events( + destination, room_id, earliest_events, latest_events, limit, + min_depth, + ) + + events = [ + self.event_from_pdu_json(e) + for e in content.get("events", []) + ] + + signed_events = yield self._check_sigs_and_hash_and_fetch( + destination, events, outlier=True + ) + + defer.returnValue(signed_events) + def event_from_pdu_json(self, pdu_json, outlier=False): event = FrozenEvent( pdu_json diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 34bc397e8a..f74e16abd5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -142,7 +142,15 @@ class FederationServer(FederationBase): if r[0]: ret.append({}) else: - logger.exception(r[1]) + failure = r[1] + logger.error( + "Failed to handle PDU", + exc_info=( + failure.type, + failure.value, + failure.getTracebackObject() + ) + ) ret.append({"error": str(r[1].value)}) logger.debug("Returning: %s", str(ret)) @@ -306,75 +314,17 @@ class FederationServer(FederationBase): ) @defer.inlineCallbacks - def get_missing_events(self, origin, room_id, earliest_events, - latest_events, limit, min_depth): - limit = max(limit, 50) - min_depth = max(min_depth, 0) - - missing_events = yield self.store.get_missing_events( - room_id=room_id, - earliest_events=earliest_events, - latest_events=latest_events, - limit=limit, - min_depth=min_depth, + @log_function + def on_get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + missing_events = yield self.handler.on_get_missing_events( + origin, room_id, earliest_events, latest_events, limit, min_depth ) - known_ids = {e.event_id for e in missing_events} | {earliest_events} - - back_edges = { - e for e in missing_events - if {i for i, h in e.prev_events.items()} <= known_ids - } - - decoded_auth_events = set() - state = {} - auth_events = set() - auth_and_state = {} - for event in back_edges: - state_pdus = yield self.handler.get_state_for_pdu( - origin, room_id, event.event_id, - do_auth=False, - ) - - state[event.event_id] = [s.event_id for s in state_pdus] - - auth_and_state.update({ - s.event_id: s for s in state_pdus - }) - - state_ids = {pdu.event_id for pdu in state_pdus} - prev_ids = {i for i, h in event.prev_events.items()} - partial_auth_chain = yield self.store.get_auth_chain( - state_ids | prev_ids, have_ids=decoded_auth_events.keys() - ) - - for p in partial_auth_chain: - p.signatures.update( - compute_event_signature( - p, - self.hs.hostname, - self.hs.config.signing_key[0] - ) - ) - - auth_events.update( - a.event_id for a in partial_auth_chain - ) - - auth_and_state.update({ - a.event_id: a for a in partial_auth_chain - }) - time_now = self._clock.time_msec() defer.returnValue({ "events": [ev.get_pdu_json(time_now) for ev in missing_events], - "state_for_events": state, - "auth_events": auth_events, - "event_map": { - k: ev.get_pdu_json(time_now) - for k, ev in auth_and_state.items() - }, }) @log_function @@ -403,7 +353,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks @log_function - def _handle_new_pdu(self, origin, pdu, max_recursion=10): + def _handle_new_pdu(self, origin, pdu, get_missing=True): # We reprocess pdus when we have seen them only as outliers existing = yield self._get_persisted_pdu( origin, pdu.event_id, do_auth=False @@ -455,48 +405,50 @@ class FederationServer(FederationBase): pdu.room_id, min_depth ) + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if min_depth and pdu.depth < min_depth: # This is so that we don't notify the user about this # message, to work around the fact that some events will # reference really really old events we really don't want to # send to the clients. pdu.internal_metadata.outlier = True - elif min_depth and pdu.depth > min_depth and max_recursion > 0: - for event_id, hashes in pdu.prev_events: - if event_id not in have_seen: - logger.debug( - "_handle_new_pdu requesting pdu %s", - event_id + elif min_depth and pdu.depth > min_depth: + if get_missing and prevs - seen: + latest_tuples = yield self.store.get_latest_events_in_room( + pdu.room_id + ) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(e_id for e_id, _, _ in latest_tuples) + latest |= seen + + missing_events = yield self.get_missing_events( + origin, + pdu.room_id, + earliest_events=list(latest), + latest_events=[pdu.event_id], + limit=10, + min_depth=min_depth, + ) + + for e in missing_events: + yield self._handle_new_pdu( + origin, + e, + get_missing=False ) - try: - new_pdu = yield self.federation_client.get_pdu( - [origin, pdu.origin], - event_id=event_id, - ) - - if new_pdu: - yield self._handle_new_pdu( - origin, - new_pdu, - max_recursion=max_recursion-1 - ) - - logger.debug("Processed pdu %s", event_id) - else: - logger.warn("Failed to get PDU %s", event_id) - fetch_state = True - except: - # TODO(erikj): Do some more intelligent retries. - logger.exception("Failed to get PDU") - fetch_state = True - else: - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) - if prevs - seen: - fetch_state = True - else: - fetch_state = True + have_seen = yield self.store.have_events( + [ev for ev, _ in pdu.prev_events] + ) + + prevs = {e_id for e_id, _ in pdu.prev_events} + seen = set(have_seen.keys()) + if prevs - seen: + fetch_state = True if fetch_state: # We need to get the state at this event, since we haven't diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 7d30c924d1..8f1acbe590 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -287,7 +287,7 @@ class TransactionQueue(object): code = 200 if response: - for e_id, r in getattr(response, "pdus", {}).items(): + for e_id, r in response.get("pdus", {}).items(): if "error" in r: logger.warn( "Transaction returned error for %s: %s", diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 8b137e7128..80d03012b7 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -219,3 +219,22 @@ class TransportLayerClient(object): ) defer.returnValue(content) + + @defer.inlineCallbacks + @log_function + def get_missing_events(self, destination, room_id, earliest_events, + latest_events, limit, min_depth): + path = PREFIX + "/get_missing_events/%s" % (room_id,) + + content = yield self.client.post_json( + destination=destination, + path=path, + data={ + "limit": int(limit), + "min_depth": int(min_depth), + "earliest_events": earliest_events, + "latest_events": latest_events, + } + ) + + defer.returnValue(content) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2ffb37aa18..ad75c8ddb7 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -234,6 +234,7 @@ class TransportLayerServer(object): ) ) ) + self.server.register_path( "POST", re.compile("^" + PREFIX + "/query_auth/([^/]*)/([^/]*)$"), @@ -245,6 +246,17 @@ class TransportLayerServer(object): ) ) + self.server.register_path( + "POST", + re.compile("^" + PREFIX + "/get_missing_events/([^/]*)/?$"), + self._with_authentication( + lambda origin, content, query, room_id: + self._get_missing_events( + origin, content, room_id, + ) + ) + ) + @defer.inlineCallbacks @log_function def _on_send_request(self, origin, content, query, transaction_id): @@ -344,3 +356,22 @@ class TransportLayerServer(object): ) defer.returnValue((200, new_content)) + + @defer.inlineCallbacks + @log_function + def _get_missing_events(self, origin, content, room_id): + limit = int(content.get("limit", 10)) + min_depth = int(content.get("min_depth", 0)) + earliest_events = content.get("earliest_events", []) + latest_events = content.get("latest_events", []) + + content = yield self.request_handler.on_get_missing_events( + origin, + room_id=room_id, + earliest_events=earliest_events, + latest_events=latest_events, + min_depth=min_depth, + limit=limit, + ) + + defer.returnValue((200, content)) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 26bdc6d1a7..628e62f8b1 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -789,6 +789,29 @@ class FederationHandler(BaseHandler): defer.returnValue(ret) + @defer.inlineCallbacks + def on_get_missing_events(self, origin, room_id, earliest_events, + latest_events, limit, min_depth): + in_room = yield self.auth.check_host_in_room( + room_id, + origin + ) + if not in_room: + raise AuthError(403, "Host not in room.") + + limit = min(limit, 20) + min_depth = max(min_depth, 0) + + missing_events = yield self.store.get_missing_events( + room_id=room_id, + earliest_events=earliest_events, + latest_events=latest_events, + limit=limit, + min_depth=min_depth, + ) + + defer.returnValue(missing_events) + @defer.inlineCallbacks @log_function def do_auth(self, origin, event, context, auth_events): -- cgit 1.5.1 From 59362454ddbcb3adf64d81da00dcd85cdb59a2e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Feb 2015 15:47:35 +0000 Subject: Must update pending_transactions map before yield'ing --- synapse/federation/transaction_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 8f1acbe590..741a4e7a1a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -224,6 +224,8 @@ class TransactionQueue(object): ] try: + self.pending_transactions[destination] = 1 + limiter = yield get_retry_limiter( destination, self._clock, @@ -239,8 +241,6 @@ class TransactionQueue(object): len(pending_failures) ) - self.pending_transactions[destination] = 1 - logger.debug("TX [%s] Persisting transaction...", destination) transaction = Transaction.create_new( -- cgit 1.5.1 From 93d90765c4b09bc870fc91c6ddcf21fd4389659d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 26 Feb 2015 16:15:26 +0000 Subject: Initial implementation of federation server rate limiting --- synapse/federation/transport/__init__.py | 12 ++- synapse/federation/transport/server.py | 175 ++++++++++++++++++++++++++++++- 2 files changed, 182 insertions(+), 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index 6800ac46c5..7028ca6947 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -21,7 +21,7 @@ support HTTPS), however individual pairings of servers may decide to communicate over a different (albeit still reliable) protocol. """ -from .server import TransportLayerServer +from .server import TransportLayerServer, FederationRateLimiter from .client import TransportLayerClient @@ -55,8 +55,18 @@ class TransportLayer(TransportLayerServer, TransportLayerClient): send requests """ self.keyring = homeserver.get_keyring() + self.clock = homeserver.get_clock() self.server_name = server_name self.server = server self.client = client self.request_handler = None self.received_handler = None + + self.ratelimiter = FederationRateLimiter( + self.clock, + window_size=10000, + sleep_limit=10, + sleep_msec=500, + reject_limit=50, + concurrent_requests=3, + ) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 2ffb37aa18..a9e625f127 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -16,9 +16,11 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import Codes, SynapseError, LimitExceededError +from synapse.util.async import sleep from synapse.util.logutils import log_function +import collections import logging import simplejson as json import re @@ -27,6 +29,163 @@ import re logger = logging.getLogger(__name__) +class FederationRateLimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.ratelimiters = {} + + def ratelimit(self, host): + return self.ratelimiters.setdefault( + host, + PerHostRatelimiter( + clock=self.clock, + window_size=self.window_size, + sleep_limit=self.sleep_limit, + sleep_msec=self.sleep_msec, + reject_limit=self.reject_limit, + concurrent_requests=self.concurrent_requests, + ) + ).ratelimit() + + +class PerHostRatelimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.sleeping_requests = set() + self.ready_request_queue = collections.OrderedDict() + self.current_processing = set() + self.request_times = [] + + def is_empty(self): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + return not ( + self.ready_request_queue + or self.sleeping_requests + or self.current_processing + or self.request_times + ) + + def ratelimit(self): + request_id = object() + + def on_enter(): + return self._on_enter(request_id) + + def on_exit(exc_type, exc_val, exc_tb): + return self._on_exit(request_id) + + return ContextManagerFunction(on_enter, on_exit) + + def _on_enter(self, request_id): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) + if queue_size > self.reject_limit: + raise LimitExceededError( + retry_after_ms=int( + self.window_size / self.sleep_limit + ), + ) + + self.request_times.append(time_now) + + def queue_request(): + if len(self.current_processing) > self.concurrent_requests: + logger.debug("Ratelimit [%s]: Queue req", id(request_id)) + queue_defer = defer.Deferred() + self.ready_request_queue[request_id] = queue_defer + return queue_defer + else: + return defer.succeed(None) + + logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + + if len(self.request_times) > self.sleep_limit: + logger.debug("Ratelimit [%s]: sleeping req", id(request_id)) + ret_defer = sleep(self.sleep_msec/1000.0) + + self.sleeping_requests.add(request_id) + + def on_wait_finished(_): + logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + self.sleeping_requests.discard(request_id) + queue_defer = queue_request() + return queue_defer + + ret_defer.addBoth(on_wait_finished) + else: + ret_defer = queue_request() + + def on_start(r): + logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + self.current_processing.add(request_id) + return r + + def on_err(r): + self.current_processing.discard(request_id) + return r + + def on_both(r): + # Ensure that we've properly cleaned up. + self.sleeping_requests.discard(request_id) + self.ready_request_queue.pop(request_id, None) + return r + + ret_defer.addCallbacks(on_start, on_err) + ret_defer.addBoth(on_both) + return ret_defer + + def _on_exit(self, request_id): + logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + self.current_processing.discard(request_id) + try: + request_id, deferred = self.ready_request_queue.popitem() + self.current_processing.add(request_id) + deferred.callback(None) + except KeyError: + pass + + +class ContextManagerFunction(object): + def __init__(self, on_enter, on_exit): + self.on_enter = on_enter + self.on_exit = on_exit + + def __enter__(self): + if self.on_enter: + return self.on_enter() + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.on_exit: + return self.on_exit(exc_type, exc_val, exc_tb) + + class TransportLayerServer(object): """Handles incoming federation HTTP requests""" @@ -98,15 +257,23 @@ class TransportLayerServer(object): def new_handler(request, *args, **kwargs): try: (origin, content) = yield self._authenticate_request(request) - response = yield handler( - origin, content, request.args, *args, **kwargs - ) + with self.ratelimiter.ratelimit(origin) as d: + yield d + response = yield handler( + origin, content, request.args, *args, **kwargs + ) except: logger.exception("_authenticate_request failed") raise defer.returnValue(response) return new_handler + def rate_limit_origin(self, handler): + def new_handler(origin, *args, **kwargs): + response = yield handler(origin, *args, **kwargs) + defer.returnValue(response) + return new_handler() + @log_function def register_received_handler(self, handler): """ Register a handler that will be fired when we receive data. -- cgit 1.5.1 From 9dc9118e552bfddaa7579b4ded8b1a0da7eff0e6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Feb 2015 15:16:47 +0000 Subject: Document FederationRateLimiter --- synapse/federation/transport/server.py | 59 +++++++++++++++++++++++++++++----- 1 file changed, 51 insertions(+), 8 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index a9e625f127..390e54b9fb 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -32,6 +32,21 @@ logger = logging.getLogger(__name__) class FederationRateLimiter(object): def __init__(self, clock, window_size, sleep_limit, sleep_msec, reject_limit, concurrent_requests): + """ + Args: + clock (Clock) + window_size (int): The window size in milliseconds. + sleep_limit (int): The number of requests received in the last + `window_size` milliseconds before we artificially start + delaying processing of requests. + sleep_msec (int): The number of milliseconds to delay processing + of incoming requests by. + reject_limit (int): The maximum number of requests that are can be + queued for processing before we start rejecting requests with + a 429 Too Many Requests response. + concurrent_requests (int): The number of concurrent requests to + process. + """ self.clock = clock self.window_size = window_size @@ -43,9 +58,23 @@ class FederationRateLimiter(object): self.ratelimiters = {} def ratelimit(self, host): + """Used to ratelimit an incoming request from given host + + Example usage: + + with rate_limiter.ratelimit(origin) as wait_deferred: + yield wait_deferred + # Handle request ... + + Args: + host (str): Origin of incoming request. + + Returns: + _PerHostRatelimiter + """ return self.ratelimiters.setdefault( host, - PerHostRatelimiter( + _PerHostRatelimiter( clock=self.clock, window_size=self.window_size, sleep_limit=self.sleep_limit, @@ -56,7 +85,7 @@ class FederationRateLimiter(object): ).ratelimit() -class PerHostRatelimiter(object): +class _PerHostRatelimiter(object): def __init__(self, clock, window_size, sleep_limit, sleep_msec, reject_limit, concurrent_requests): self.clock = clock @@ -123,17 +152,25 @@ class PerHostRatelimiter(object): else: return defer.succeed(None) - logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) - logger.debug("Ratelimit [%s]: len(self.request_times)=%d", id(request_id), len(self.request_times)) + logger.debug( + "Ratelimit [%s]: len(self.request_times)=%d", + id(request_id), len(self.request_times), + ) if len(self.request_times) > self.sleep_limit: - logger.debug("Ratelimit [%s]: sleeping req", id(request_id)) + logger.debug( + "Ratelimit [%s]: sleeping req", + id(request_id), + ) ret_defer = sleep(self.sleep_msec/1000.0) self.sleeping_requests.add(request_id) def on_wait_finished(_): - logger.debug("Ratelimit [%s]: Finished sleeping", id(request_id)) + logger.debug( + "Ratelimit [%s]: Finished sleeping", + id(request_id), + ) self.sleeping_requests.discard(request_id) queue_defer = queue_request() return queue_defer @@ -143,7 +180,10 @@ class PerHostRatelimiter(object): ret_defer = queue_request() def on_start(r): - logger.debug("Ratelimit [%s]: Processing req", id(request_id)) + logger.debug( + "Ratelimit [%s]: Processing req", + id(request_id), + ) self.current_processing.add(request_id) return r @@ -162,7 +202,10 @@ class PerHostRatelimiter(object): return ret_defer def _on_exit(self, request_id): - logger.debug("Ratelimit [%s]: Processed req", id(request_id)) + logger.debug( + "Ratelimit [%s]: Processed req", + id(request_id), + ) self.current_processing.discard(request_id) try: request_id, deferred = self.ready_request_queue.popitem() -- cgit 1.5.1 From 0554d0708225afe13d141bd00e3aaca2509f3f78 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 27 Feb 2015 15:41:52 +0000 Subject: Move federation rate limiting out of transport layer --- synapse/federation/transport/__init__.py | 4 +- synapse/federation/transport/server.py | 204 +--------------------------- synapse/util/ratelimitutils.py | 226 +++++++++++++++++++++++++++++++ 3 files changed, 230 insertions(+), 204 deletions(-) create mode 100644 synapse/util/ratelimitutils.py (limited to 'synapse/federation') diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index 7028ca6947..f0283b5105 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -21,9 +21,11 @@ support HTTPS), however individual pairings of servers may decide to communicate over a different (albeit still reliable) protocol. """ -from .server import TransportLayerServer, FederationRateLimiter +from .server import TransportLayerServer from .client import TransportLayerClient +from synapse.util.ratelimitutils import FederationRateLimiter + class TransportLayer(TransportLayerServer, TransportLayerClient): """This is a basic implementation of the transport layer that translates diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 390e54b9fb..fce9c0195e 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -16,11 +16,9 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX -from synapse.api.errors import Codes, SynapseError, LimitExceededError -from synapse.util.async import sleep +from synapse.api.errors import Codes, SynapseError from synapse.util.logutils import log_function -import collections import logging import simplejson as json import re @@ -29,206 +27,6 @@ import re logger = logging.getLogger(__name__) -class FederationRateLimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): - """ - Args: - clock (Clock) - window_size (int): The window size in milliseconds. - sleep_limit (int): The number of requests received in the last - `window_size` milliseconds before we artificially start - delaying processing of requests. - sleep_msec (int): The number of milliseconds to delay processing - of incoming requests by. - reject_limit (int): The maximum number of requests that are can be - queued for processing before we start rejecting requests with - a 429 Too Many Requests response. - concurrent_requests (int): The number of concurrent requests to - process. - """ - self.clock = clock - - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests - - self.ratelimiters = {} - - def ratelimit(self, host): - """Used to ratelimit an incoming request from given host - - Example usage: - - with rate_limiter.ratelimit(origin) as wait_deferred: - yield wait_deferred - # Handle request ... - - Args: - host (str): Origin of incoming request. - - Returns: - _PerHostRatelimiter - """ - return self.ratelimiters.setdefault( - host, - _PerHostRatelimiter( - clock=self.clock, - window_size=self.window_size, - sleep_limit=self.sleep_limit, - sleep_msec=self.sleep_msec, - reject_limit=self.reject_limit, - concurrent_requests=self.concurrent_requests, - ) - ).ratelimit() - - -class _PerHostRatelimiter(object): - def __init__(self, clock, window_size, sleep_limit, sleep_msec, - reject_limit, concurrent_requests): - self.clock = clock - - self.window_size = window_size - self.sleep_limit = sleep_limit - self.sleep_msec = sleep_msec - self.reject_limit = reject_limit - self.concurrent_requests = concurrent_requests - - self.sleeping_requests = set() - self.ready_request_queue = collections.OrderedDict() - self.current_processing = set() - self.request_times = [] - - def is_empty(self): - time_now = self.clock.time_msec() - self.request_times[:] = [ - r for r in self.request_times - if time_now - r < self.window_size - ] - - return not ( - self.ready_request_queue - or self.sleeping_requests - or self.current_processing - or self.request_times - ) - - def ratelimit(self): - request_id = object() - - def on_enter(): - return self._on_enter(request_id) - - def on_exit(exc_type, exc_val, exc_tb): - return self._on_exit(request_id) - - return ContextManagerFunction(on_enter, on_exit) - - def _on_enter(self, request_id): - time_now = self.clock.time_msec() - self.request_times[:] = [ - r for r in self.request_times - if time_now - r < self.window_size - ] - - queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) - if queue_size > self.reject_limit: - raise LimitExceededError( - retry_after_ms=int( - self.window_size / self.sleep_limit - ), - ) - - self.request_times.append(time_now) - - def queue_request(): - if len(self.current_processing) > self.concurrent_requests: - logger.debug("Ratelimit [%s]: Queue req", id(request_id)) - queue_defer = defer.Deferred() - self.ready_request_queue[request_id] = queue_defer - return queue_defer - else: - return defer.succeed(None) - - logger.debug( - "Ratelimit [%s]: len(self.request_times)=%d", - id(request_id), len(self.request_times), - ) - - if len(self.request_times) > self.sleep_limit: - logger.debug( - "Ratelimit [%s]: sleeping req", - id(request_id), - ) - ret_defer = sleep(self.sleep_msec/1000.0) - - self.sleeping_requests.add(request_id) - - def on_wait_finished(_): - logger.debug( - "Ratelimit [%s]: Finished sleeping", - id(request_id), - ) - self.sleeping_requests.discard(request_id) - queue_defer = queue_request() - return queue_defer - - ret_defer.addBoth(on_wait_finished) - else: - ret_defer = queue_request() - - def on_start(r): - logger.debug( - "Ratelimit [%s]: Processing req", - id(request_id), - ) - self.current_processing.add(request_id) - return r - - def on_err(r): - self.current_processing.discard(request_id) - return r - - def on_both(r): - # Ensure that we've properly cleaned up. - self.sleeping_requests.discard(request_id) - self.ready_request_queue.pop(request_id, None) - return r - - ret_defer.addCallbacks(on_start, on_err) - ret_defer.addBoth(on_both) - return ret_defer - - def _on_exit(self, request_id): - logger.debug( - "Ratelimit [%s]: Processed req", - id(request_id), - ) - self.current_processing.discard(request_id) - try: - request_id, deferred = self.ready_request_queue.popitem() - self.current_processing.add(request_id) - deferred.callback(None) - except KeyError: - pass - - -class ContextManagerFunction(object): - def __init__(self, on_enter, on_exit): - self.on_enter = on_enter - self.on_exit = on_exit - - def __enter__(self): - if self.on_enter: - return self.on_enter() - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.on_exit: - return self.on_exit(exc_type, exc_val, exc_tb) - - class TransportLayerServer(object): """Handles incoming federation HTTP requests""" diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py new file mode 100644 index 0000000000..259d5f6f88 --- /dev/null +++ b/synapse/util/ratelimitutils.py @@ -0,0 +1,226 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import LimitExceededError + +from synapse.util.async import sleep + +import collections +import logging + + +logger = logging.getLogger(__name__) + + +class FederationRateLimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + """ + Args: + clock (Clock) + window_size (int): The window size in milliseconds. + sleep_limit (int): The number of requests received in the last + `window_size` milliseconds before we artificially start + delaying processing of requests. + sleep_msec (int): The number of milliseconds to delay processing + of incoming requests by. + reject_limit (int): The maximum number of requests that are can be + queued for processing before we start rejecting requests with + a 429 Too Many Requests response. + concurrent_requests (int): The number of concurrent requests to + process. + """ + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.ratelimiters = {} + + def ratelimit(self, host): + """Used to ratelimit an incoming request from given host + + Example usage: + + with rate_limiter.ratelimit(origin) as wait_deferred: + yield wait_deferred + # Handle request ... + + Args: + host (str): Origin of incoming request. + + Returns: + _PerHostRatelimiter + """ + return self.ratelimiters.setdefault( + host, + _PerHostRatelimiter( + clock=self.clock, + window_size=self.window_size, + sleep_limit=self.sleep_limit, + sleep_msec=self.sleep_msec, + reject_limit=self.reject_limit, + concurrent_requests=self.concurrent_requests, + ) + ).ratelimit() + + +class _PerHostRatelimiter(object): + def __init__(self, clock, window_size, sleep_limit, sleep_msec, + reject_limit, concurrent_requests): + self.clock = clock + + self.window_size = window_size + self.sleep_limit = sleep_limit + self.sleep_msec = sleep_msec + self.reject_limit = reject_limit + self.concurrent_requests = concurrent_requests + + self.sleeping_requests = set() + self.ready_request_queue = collections.OrderedDict() + self.current_processing = set() + self.request_times = [] + + def is_empty(self): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + return not ( + self.ready_request_queue + or self.sleeping_requests + or self.current_processing + or self.request_times + ) + + def ratelimit(self): + request_id = object() + + def on_enter(): + return self._on_enter(request_id) + + def on_exit(exc_type, exc_val, exc_tb): + return self._on_exit(request_id) + + return ContextManagerFunction(on_enter, on_exit) + + def _on_enter(self, request_id): + time_now = self.clock.time_msec() + self.request_times[:] = [ + r for r in self.request_times + if time_now - r < self.window_size + ] + + queue_size = len(self.ready_request_queue) + len(self.sleeping_requests) + if queue_size > self.reject_limit: + raise LimitExceededError( + retry_after_ms=int( + self.window_size / self.sleep_limit + ), + ) + + self.request_times.append(time_now) + + def queue_request(): + if len(self.current_processing) > self.concurrent_requests: + logger.debug("Ratelimit [%s]: Queue req", id(request_id)) + queue_defer = defer.Deferred() + self.ready_request_queue[request_id] = queue_defer + return queue_defer + else: + return defer.succeed(None) + + logger.debug( + "Ratelimit [%s]: len(self.request_times)=%d", + id(request_id), len(self.request_times), + ) + + if len(self.request_times) > self.sleep_limit: + logger.debug( + "Ratelimit [%s]: sleeping req", + id(request_id), + ) + ret_defer = sleep(self.sleep_msec/1000.0) + + self.sleeping_requests.add(request_id) + + def on_wait_finished(_): + logger.debug( + "Ratelimit [%s]: Finished sleeping", + id(request_id), + ) + self.sleeping_requests.discard(request_id) + queue_defer = queue_request() + return queue_defer + + ret_defer.addBoth(on_wait_finished) + else: + ret_defer = queue_request() + + def on_start(r): + logger.debug( + "Ratelimit [%s]: Processing req", + id(request_id), + ) + self.current_processing.add(request_id) + return r + + def on_err(r): + self.current_processing.discard(request_id) + return r + + def on_both(r): + # Ensure that we've properly cleaned up. + self.sleeping_requests.discard(request_id) + self.ready_request_queue.pop(request_id, None) + return r + + ret_defer.addCallbacks(on_start, on_err) + ret_defer.addBoth(on_both) + return ret_defer + + def _on_exit(self, request_id): + logger.debug( + "Ratelimit [%s]: Processed req", + id(request_id), + ) + self.current_processing.discard(request_id) + try: + request_id, deferred = self.ready_request_queue.popitem() + self.current_processing.add(request_id) + deferred.callback(None) + except KeyError: + pass + + +class ContextManagerFunction(object): + def __init__(self, on_enter, on_exit): + self.on_enter = on_enter + self.on_exit = on_exit + + def __enter__(self): + if self.on_enter: + return self.on_enter() + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.on_exit: + return self.on_exit(exc_type, exc_val, exc_tb) -- cgit 1.5.1 From 9d9b230501915c326136567349b0995623c48a21 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 11:33:45 +0000 Subject: Make the federation server ratelimiting configurable. --- synapse/config/ratelimiting.py | 36 ++++++++++++++++++++++++++++++++ synapse/federation/transport/__init__.py | 10 ++++----- 2 files changed, 41 insertions(+), 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 17c7e64ce7..862c07ef8c 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -22,6 +22,12 @@ class RatelimitConfig(Config): self.rc_messages_per_second = args.rc_messages_per_second self.rc_message_burst_count = args.rc_message_burst_count + self.federation_rc_window_size = args.federation_rc_window_size + self.federation_rc_sleep_limit = args.federation_rc_sleep_limit + self.federation_rc_sleep_delay = args.federation_rc_sleep_delay + self.federation_rc_reject_limit = args.federation_rc_reject_limit + self.federation_rc_concurrent = args.federation_rc_concurrent + @classmethod def add_arguments(cls, parser): super(RatelimitConfig, cls).add_arguments(parser) @@ -34,3 +40,33 @@ class RatelimitConfig(Config): "--rc-message-burst-count", type=float, default=10, help="number of message a client can send before being throttled" ) + + rc_group.add_argument( + "--federation-rc-window-size", type=int, default=10000, + help="The federation window size in milliseconds", + ) + + rc_group.add_argument( + "--federation-rc-sleep-limit", type=int, default=10, + help="The number of federation requests from a single server" + " in a window before the server will delay processing the" + " request.", + ) + + rc_group.add_argument( + "--federation-rc-sleep-delay", type=int, default=500, + help="The duration in milliseconds to delay processing events from" + " remote servers by if they go over the sleep limit.", + ) + + rc_group.add_argument( + "--federation-rc-reject-limit", type=int, default=50, + help="The maximum number of concurrent federation requests allowed" + " from a single server", + ) + + rc_group.add_argument( + "--federation-rc-concurrent", type=int, default=3, + help="The number of federation requests to concurrently process" + " from a single server", + ) diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index f0283b5105..2a671b9aec 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -66,9 +66,9 @@ class TransportLayer(TransportLayerServer, TransportLayerClient): self.ratelimiter = FederationRateLimiter( self.clock, - window_size=10000, - sleep_limit=10, - sleep_msec=500, - reject_limit=50, - concurrent_requests=3, + window_size=homeserver.config.federation_rc_window_size, + sleep_limit=homeserver.config.federation_rc_sleep_limit, + sleep_msec=homeserver.config.federation_rc_sleep_delay, + reject_limit=homeserver.config.federation_rc_reject_limit, + concurrent_requests=homeserver.config.federation_rc_concurrent, ) -- cgit 1.5.1 From 23d9bd1d745a037202bb9a134cdb848eb65a01e9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 11:39:40 +0000 Subject: Process transactions serially. Since the events received in a transaction are ordered, later events might depend on earlier events and so we shouldn't blindly process them in parellel. --- synapse/federation/federation_server.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 22b9663831..7ee37fb34d 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -112,17 +112,23 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) with PreserveLoggingContext(): - dl = [] + results = [] + for pdu in pdu_list: d = self._handle_new_pdu(transaction.origin, pdu) def handle_failure(failure): failure.trap(FederationError) self.send_failure(failure.value, transaction.origin) + return failure d.addErrback(handle_failure) - dl.append(d) + try: + yield d + results.append({}) + except Exception as e: + results.append({"error": str(e)}) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -135,21 +141,11 @@ class FederationServer(FederationBase): for failure in getattr(transaction, "pdu_failures", []): logger.info("Got failure %r", failure) - results = yield defer.DeferredList(dl, consumeErrors=True) - - ret = [] - for r in results: - if r[0]: - ret.append({}) - else: - logger.exception(r[1]) - ret.append({"error": str(r[1].value)}) - - logger.debug("Returning: %s", str(ret)) + logger.debug("Returning: %s", str(results)) response = { "pdus": dict(zip( - (p.event_id for p in pdu_list), ret + (p.event_id for p in pdu_list), results )), } -- cgit 1.5.1 From 29481690c5b296a1c8aee3068d32ef083ef227f3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 2 Mar 2015 11:50:43 +0000 Subject: If we're yielding don't add errback --- synapse/federation/federation_server.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 7ee37fb34d..bc9bac809a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -117,16 +117,12 @@ class FederationServer(FederationBase): for pdu in pdu_list: d = self._handle_new_pdu(transaction.origin, pdu) - def handle_failure(failure): - failure.trap(FederationError) - self.send_failure(failure.value, transaction.origin) - return failure - - d.addErrback(handle_failure) - try: yield d results.append({}) + except FederationError as e: + self.send_failure(e, transaction.origin) + results.append({"error": str(e)}) except Exception as e: results.append({"error": str(e)}) -- cgit 1.5.1 From ae702d161ab6d518caa91759ec6bdec01b11954f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:08:02 +0000 Subject: Handle if get_missing_pdu returns 400 or not all events. --- synapse/federation/federation_client.py | 109 ++++++++++++++++++++++++++++---- synapse/federation/federation_server.py | 6 +- 2 files changed, 100 insertions(+), 15 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ca89a0787c..b87c8a3bbb 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,14 +19,18 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu -from synapse.api.errors import CodeMessageException, SynapseError +from synapse.api.errors import ( + CodeMessageException, HttpResponseException, SynapseError, +) from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination +import itertools import logging +import random logger = logging.getLogger(__name__) @@ -440,21 +444,100 @@ class FederationClient(FederationBase): defer.returnValue(ret) @defer.inlineCallbacks - def get_missing_events(self, destination, room_id, earliest_events, + def get_missing_events(self, destination, room_id, earliest_events_ids, latest_events, limit, min_depth): - content = yield self.transport_layer.get_missing_events( - destination, room_id, earliest_events, latest_events, limit, - min_depth, - ) + try: + content = yield self.transport_layer.get_missing_events( + destination=destination, + room_id=room_id, + earliest_events=earliest_events_ids, + latest_events=[e.event_id for e in latest_events], + limit=limit, + min_depth=min_depth, + ) + + events = [ + self.event_from_pdu_json(e) + for e in content.get("events", []) + ] + + signed_events = yield self._check_sigs_and_hash_and_fetch( + destination, events, outlier=True + ) + + have_gotten_all_from_destination = True + except HttpResponseException as e: + if not e.code == 400: + raise - events = [ - self.event_from_pdu_json(e) - for e in content.get("events", []) - ] + signed_events = [] + have_gotten_all_from_destination = False - signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=True - ) + if len(signed_events) >= limit: + defer.returnValue(signed_events) + + servers = yield self.store.get_joined_hosts_for_room(room_id) + + servers = set(servers) + servers.discard(self.server_name) + + failed_to_fetch = set() + + while len(signed_events) < limit: + # Are we missing any? + + seen_events = set(earliest_events_ids) + seen_events.update(e.event_id for e in signed_events) + + missing_events = {} + for e in itertools.chain(latest_events, signed_events): + missing_events.update({ + e_id: e.depth for e_id, _ in e.prev_events + if e_id not in seen_events and e_id not in failed_to_fetch + }) + + if not missing_events: + break + + have_seen = yield self.store.have_events(missing_events) + + for k in have_seen: + missing_events.pop(k, None) + + if not missing_events: + break + + # Okay, we haven't gotten everything yet. Lets get them. + ordered_missing = sorted(missing_events.items(), key=lambda x: x[0]) + + if have_gotten_all_from_destination: + servers.discard(destination) + + def random_server_list(): + srvs = list(servers) + random.shuffle(srvs) + return srvs + + deferreds = [ + self.get_pdu( + destinations=random_server_list(), + event_id=e_id, + ) + for e_id, depth in ordered_missing[:limit - len(signed_events)] + ] + + got_a_new_event = False + + res = yield defer.DeferredList(deferreds, consumeErrors=True) + for (result, val), (e_id, _) in zip(res, ordered_missing): + if result: + signed_events.append(val) + got_a_new_event = True + else: + failed_to_fetch.add(e_id) + + if not got_a_new_event: + break defer.returnValue(signed_events) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 4264d857be..dd4ca74ba6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -413,12 +413,14 @@ class FederationServer(FederationBase): missing_events = yield self.get_missing_events( origin, pdu.room_id, - earliest_events=list(latest), - latest_events=[pdu.event_id], + earliest_events_ids=list(latest), + latest_events=[pdu], limit=10, min_depth=min_depth, ) + missing_events.sort(key=lambda x: x.depth) + for e in missing_events: yield self._handle_new_pdu( origin, -- cgit 1.5.1 From 6dfd8c73fcdd727cd6589513e2b8059f779623ae Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:31:13 +0000 Subject: Docs. --- synapse/federation/federation_client.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b87c8a3bbb..11e2753fed 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -446,6 +446,20 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def get_missing_events(self, destination, room_id, earliest_events_ids, latest_events, limit, min_depth): + """Tries to fetch events we are missing. This is called when we receive + an event without having received all of its ancestors. + + Args: + destination (str) + room_id (str) + earliest_events_ids (list): List of event ids. Effectively the + events we expected to receive, but haven't. `get_missing_events` + should only return events that didn't happen before these. + latest_events (list): List of events we have received that we don't + have all previous events for. + limit (int): Maximum number of events to return. + min_depth (int): Minimum depth of events tor return. + """ try: content = yield self.transport_layer.get_missing_events( destination=destination, @@ -470,6 +484,8 @@ class FederationClient(FederationBase): if not e.code == 400: raise + # We are probably hitting an old server that doesn't support + # get_missing_events signed_events = [] have_gotten_all_from_destination = False -- cgit 1.5.1 From 39aa968a764816632a05ac0e3cf9c865b7a3a68d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:31:32 +0000 Subject: Respect min_depth argument --- synapse/federation/federation_client.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 11e2753fed..75b6a7b46a 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -507,10 +507,12 @@ class FederationClient(FederationBase): missing_events = {} for e in itertools.chain(latest_events, signed_events): - missing_events.update({ - e_id: e.depth for e_id, _ in e.prev_events - if e_id not in seen_events and e_id not in failed_to_fetch - }) + if e.depth > min_depth: + missing_events.update({ + e_id: e.depth for e_id, _ in e.prev_events + if e_id not in seen_events + and e_id not in failed_to_fetch + }) if not missing_events: break -- cgit 1.5.1 From 96fee64421a534787e9316a61ab407b43c782dc7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:31:47 +0000 Subject: Remove unecessary check --- synapse/federation/federation_client.py | 6 ------ 1 file changed, 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 75b6a7b46a..f131941f45 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -544,19 +544,13 @@ class FederationClient(FederationBase): for e_id, depth in ordered_missing[:limit - len(signed_events)] ] - got_a_new_event = False - res = yield defer.DeferredList(deferreds, consumeErrors=True) for (result, val), (e_id, _) in zip(res, ordered_missing): if result: signed_events.append(val) - got_a_new_event = True else: failed_to_fetch.add(e_id) - if not got_a_new_event: - break - defer.returnValue(signed_events) def event_from_pdu_json(self, pdu_json, outlier=False): -- cgit 1.5.1 From 9708f49abfb5fa48c1190364093ab4ce5c4e6f23 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 5 Mar 2015 16:35:16 +0000 Subject: Docs --- synapse/federation/federation_server.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index dd4ca74ba6..9c7dcdba96 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -419,6 +419,8 @@ class FederationServer(FederationBase): min_depth=min_depth, ) + # We want to sort these by depth so we process them and + # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) for e in missing_events: -- cgit 1.5.1