diff options
author | Matthew Hodgson <matthew@matrix.org> | 2014-12-07 02:26:07 +0000 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2014-12-07 02:26:07 +0000 |
commit | aed62a35832a3ec1c7425ecc99cab06a781263ba (patch) | |
tree | 6dd8bdeb4f79084a084f24f6747c1e1d00fcf973 /synapse/federation | |
parent | Pull in latest matrix-angular_sdk (diff) | |
download | synapse-aed62a35832a3ec1c7425ecc99cab06a781263ba.tar.xz |
track replication destination health, and perform exponential back-off when sending transactions. does *not* yet retry transactions, but drops them on the floor if waiting for a server to recover.
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/replication.py | 43 | ||||
-rw-r--r-- | synapse/federation/transport.py | 2 |
2 files changed, 37 insertions, 8 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 01f87fe423..f9c05b5ea3 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -723,6 +723,8 @@ class _TransactionQueue(object): deferreds = [] for destination in destinations: + # XXX: why don't we specify an errback for this deferred + # like we do for EDUs? --matthew deferred = defer.Deferred() self.pending_pdus_by_dest.setdefault(destination, []).append( (pdu, deferred, order) @@ -738,6 +740,9 @@ class _TransactionQueue(object): # NO inlineCallbacks def enqueue_edu(self, edu): destination = edu.destination + + if destination == self.server_name: + return deferred = defer.Deferred() self.pending_edus_by_dest.setdefault(destination, []).append( @@ -766,14 +771,23 @@ class _TransactionQueue(object): ) yield deferred - + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): + + (retry_last_ts, retry_interval) = self.store.get_destination_retry_timings(destination) + if retry_last_ts + retry_interval > int(self._clock.time_msec()): + logger.info("TX [%s] not ready for retry yet - dropping transaction for now") + return + if destination in self.pending_transactions: + # XXX: pending_transactions can get stuck on by a never-ending request + # at which point pending_pdus_by_dest just keeps growing. + # we need application-layer timeouts of some flavour of these requests return - # list of (pending_pdu, deferred, order) + # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) pending_failures = self.pending_failures_by_dest.pop(destination, []) @@ -781,7 +795,8 @@ class _TransactionQueue(object): if not pending_pdus and not pending_edus and not pending_failures: return - logger.debug("TX [%s] Attempting new transaction", destination) + logger.debug("TX [%s] Attempting new transaction (pdus: %d, edus: %d, failures: %d)", + destination, len(pending_pdus), len(pending_edus), len(pending_failures)) # Sort based on the order field pending_pdus.sort(key=lambda t: t[2]) @@ -814,7 +829,7 @@ class _TransactionQueue(object): yield self.transaction_actions.prepare_to_send(transaction) logger.debug("TX [%s] Persisted transaction", destination) - logger.debug("TX [%s] Sending transaction...", destination) + logger.info("TX [%s] Sending transaction [%s]", destination, transaction.transaction_id) # Actually send the transaction @@ -835,6 +850,8 @@ class _TransactionQueue(object): transaction, json_data_cb ) + logger.info("TX [%s] got %d response", destination, code) + logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) @@ -849,6 +866,7 @@ class _TransactionQueue(object): if code == 200: deferred.callback(None) else: + start_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -861,12 +879,12 @@ class _TransactionQueue(object): logger.debug("TX [%s] Yielded to callbacks", destination) except Exception as e: - logger.error("TX Problem in _attempt_transaction") - # We capture this here as there as nothing actually listens # for this finishing functions deferred. - logger.exception(e) + logger.exception("TX [%s] Problem in _attempt_transaction: %s", destination, e) + start_retrying(destination, retry_interval) + for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -877,3 +895,14 @@ class _TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) + +def start_retrying(destination, retry_interval): + # track that this destination is having problems and we should + # give it a chance to recover before trying it again + if retry_interval: + retry_interval *= 2 + else: + retry_interval = 2 # try again at first after 2 seconds + self.store.set_destination_retry_timings(destination, + int(self._clock.time_msec()), retry_interval) + \ No newline at end of file diff --git a/synapse/federation/transport.py b/synapse/federation/transport.py index 8d86152085..0f11c6d491 100644 --- a/synapse/federation/transport.py +++ b/synapse/federation/transport.py @@ -155,7 +155,7 @@ class TransportLayer(object): @defer.inlineCallbacks @log_function def send_transaction(self, transaction, json_data_callback=None): - """ Sends the given Transaction to it's destination + """ Sends the given Transaction to its destination Args: transaction (Transaction) |