diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 79 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 36 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 216 |
3 files changed, 212 insertions, 119 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 70c9a6f46b..cd3c962d50 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,10 +19,13 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu -from synapse.api.errors import CodeMessageException +from synapse.api.errors import CodeMessageException, SynapseError +from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent +from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination + import logging @@ -30,6 +33,20 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): + def __init__(self): + self._get_pdu_cache = None + + def start_get_pdu_cache(self): + self._get_pdu_cache = ExpiringCache( + cache_name="get_pdu_cache", + clock=self._clock, + max_len=1000, + expiry_ms=120*1000, + reset_expiry_on_get=False, + ) + + self._get_pdu_cache.start() + @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -160,29 +177,58 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. + if self._get_pdu_cache: + e = self._get_pdu_cache.get(event_id) + if e: + defer.returnValue(e) + pdu = None for destination in destinations: try: - transaction_data = yield self.transport_layer.get_event( - destination, event_id + limiter = yield get_retry_limiter( + destination, + self._clock, + self.store, ) - logger.debug("transaction_data %r", transaction_data) + with limiter: + transaction_data = yield self.transport_layer.get_event( + destination, event_id + ) - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] + logger.debug("transaction_data %r", transaction_data) - if pdu_list: - pdu = pdu_list[0] + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] - # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(pdu) + if pdu_list: + pdu = pdu_list[0] - break - except CodeMessageException: - raise + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + break + + except SynapseError: + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue + except NotRetryingDestination as e: + logger.info(e.message) + continue except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -190,6 +236,9 @@ class FederationClient(FederationBase): ) continue + if self._get_pdu_cache is not None: + self._get_pdu_cache[event_id] = pdu + defer.returnValue(pdu) @defer.inlineCallbacks diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9f5c98694c..22b9663831 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -114,7 +114,15 @@ class FederationServer(FederationBase): with PreserveLoggingContext(): dl = [] for pdu in pdu_list: - dl.append(self._handle_new_pdu(transaction.origin, pdu)) + d = self._handle_new_pdu(transaction.origin, pdu) + + def handle_failure(failure): + failure.trap(FederationError) + self.send_failure(failure.value, transaction.origin) + + d.addErrback(handle_failure) + + dl.append(d) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -124,7 +132,10 @@ class FederationServer(FederationBase): edu.content ) - results = yield defer.DeferredList(dl) + for failure in getattr(transaction, "pdu_failures", []): + logger.info("Got failure %r", failure) + + results = yield defer.DeferredList(dl, consumeErrors=True) ret = [] for r in results: @@ -132,10 +143,16 @@ class FederationServer(FederationBase): ret.append({}) else: logger.exception(r[1]) - ret.append({"error": str(r[1])}) + ret.append({"error": str(r[1].value)}) logger.debug("Returning: %s", str(ret)) + response = { + "pdus": dict(zip( + (p.event_id for p in pdu_list), ret + )), + } + yield self.transaction_actions.set_response( transaction, 200, response @@ -331,7 +348,6 @@ class FederationServer(FederationBase): ) if already_seen: logger.debug("Already seen pdu %s", pdu.event_id) - defer.returnValue({}) return # Check signature. @@ -367,7 +383,13 @@ class FederationServer(FederationBase): pdu.room_id, min_depth ) - if min_depth and pdu.depth > min_depth and max_recursion > 0: + if min_depth and pdu.depth < min_depth: + # This is so that we don't notify the user about this + # message, to work around the fact that some events will + # reference really really old events we really don't want to + # send to the clients. + pdu.internal_metadata.outlier = True + elif min_depth and pdu.depth > min_depth and max_recursion > 0: for event_id, hashes in pdu.prev_events: if event_id not in have_seen: logger.debug( @@ -418,7 +440,7 @@ class FederationServer(FederationBase): except: logger.warn("Failed to get state for event: %s", pdu.event_id) - ret = yield self.handler.on_receive_pdu( + yield self.handler.on_receive_pdu( origin, pdu, backfilled=False, @@ -426,8 +448,6 @@ class FederationServer(FederationBase): auth_chain=auth_chain, ) - defer.returnValue(ret) - def __str__(self): return "<ReplicationLayer(%s)>" % self.server_name diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 731019ad9f..7d30c924d1 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -22,6 +22,9 @@ from .units import Transaction from synapse.api.errors import HttpResponseException from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.retryutils import ( + get_retry_limiter, NotRetryingDestination, +) import logging @@ -63,6 +66,26 @@ class TransactionQueue(object): # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) + def can_send_to(self, destination): + """Can we send messages to the given server? + + We can't send messages to ourselves. If we are running on localhost + then we can only federation with other servers running on localhost. + Otherwise we only federate with servers on a public domain. + + Args: + destination(str): The server we are possibly trying to send to. + Returns: + bool: True if we can send to the server. + """ + + if destination == self.server_name: + return False + if self.server_name.startswith("localhost"): + return destination.startswith("localhost") + else: + return not destination.startswith("localhost") + @defer.inlineCallbacks @log_function def enqueue_pdu(self, pdu, destinations, order): @@ -71,8 +94,9 @@ class TransactionQueue(object): # table and we'll get back to it later. destinations = set(destinations) - destinations.discard(self.server_name) - destinations.discard("localhost") + destinations = set( + dest for dest in destinations if self.can_send_to(dest) + ) logger.debug("Sending to: %s", str(destinations)) @@ -87,24 +111,27 @@ class TransactionQueue(object): (pdu, deferred, order) ) - def eb(failure): + def chain(failure): if not deferred.called: deferred.errback(failure) - else: - logger.warn("Failed to send pdu", failure) + + def log_failure(failure): + logger.warn("Failed to send pdu", failure.value) + + deferred.addErrback(log_failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) + self._attempt_new_transaction(destination).addErrback(chain) deferreds.append(deferred) - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination - if destination == self.server_name: + if not self.can_send_to(destination): return deferred = defer.Deferred() @@ -112,51 +139,53 @@ class TransactionQueue(object): (edu, deferred) ) - def eb(failure): + def chain(failure): if not deferred.called: deferred.errback(failure) - else: - logger.warn("Failed to send edu", failure) + + def log_failure(failure): + logger.warn("Failed to send pdu", failure.value) + + deferred.addErrback(log_failure) with PreserveLoggingContext(): - self._attempt_new_transaction(destination).addErrback(eb) + self._attempt_new_transaction(destination).addErrback(chain) return deferred @defer.inlineCallbacks def enqueue_failure(self, failure, destination): + if destination == self.server_name or destination == "localhost": + return + deferred = defer.Deferred() + if not self.can_send_to(destination): + return + self.pending_failures_by_dest.setdefault( destination, [] ).append( (failure, deferred) ) + def chain(f): + if not deferred.called: + deferred.errback(f) + + def log_failure(f): + logger.warn("Failed to send pdu", f.value) + + deferred.addErrback(log_failure) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(chain) + yield deferred @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): - - (retry_last_ts, retry_interval) = (0, 0) - retry_timings = yield self.store.get_destination_retry_timings( - destination - ) - if retry_timings: - (retry_last_ts, retry_interval) = ( - retry_timings.retry_last_ts, retry_timings.retry_interval - ) - if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - return - else: - logger.info("TX [%s] is ready for retry", destination) - if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -183,15 +212,6 @@ class TransactionQueue(object): logger.info("TX [%s] Nothing to send", destination) return - logger.debug( - "TX [%s] Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, - len(pending_pdus), - len(pending_edus), - len(pending_failures) - ) - # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -204,6 +224,21 @@ class TransactionQueue(object): ] try: + limiter = yield get_retry_limiter( + destination, + self._clock, + self.store, + ) + + logger.debug( + "TX [%s] Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, + len(pending_pdus), + len(pending_edus), + len(pending_failures) + ) + self.pending_transactions[destination] = 1 logger.debug("TX [%s] Persisting transaction...", destination) @@ -229,52 +264,56 @@ class TransactionQueue(object): transaction.transaction_id, ) - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self._clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) - code = 200 - except HttpResponseException as e: - code = e.code - response = e.response - - logger.info("TX [%s] got %d response", destination, code) - - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) + with limiter: + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self._clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + + if response: + for e_id, r in getattr(response, "pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, + ) + except HttpResponseException as e: + code = e.code + response = e.response + + logger.info("TX [%s] got %d response", destination, code) + + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) yield self.transaction_actions.delivered( transaction, code, response ) logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Yielding to callbacks...", destination) for deferred in deferreds: if code == 200: - if retry_last_ts: - # this host is alive! reset retry schedule - yield self.store.set_destination_retry_timings( - destination, 0, 0 - ) deferred.callback(None) else: - self.set_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -285,6 +324,12 @@ class TransactionQueue(object): pass logger.debug("TX [%s] Yielded to callbacks", destination) + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. @@ -296,14 +341,12 @@ class TransactionQueue(object): except Exception as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception( + logger.warn( "TX [%s] Problem in _attempt_transaction: %s", destination, e, ) - self.set_retrying(destination, retry_interval) - for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -314,22 +357,3 @@ class TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) - - @defer.inlineCallbacks - def set_retrying(self, destination, retry_interval): - # track that this destination is having problems and we should - # give it a chance to recover before trying it again - - if retry_interval: - retry_interval *= 2 - # plateau at hourly retries for now - if retry_interval >= 60 * 60 * 1000: - retry_interval = 60 * 60 * 1000 - else: - retry_interval = 2000 # try again at first after 2 seconds - - yield self.store.set_destination_retry_timings( - destination, - int(self._clock.time_msec()), - retry_interval - ) |