diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/federation/federation_client.py | 36 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 161 |
2 files changed, 90 insertions, 107 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 70c9a6f46b..c5b0274c2f 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -23,6 +23,8 @@ from synapse.api.errors import CodeMessageException from synapse.util.logutils import log_function from synapse.events import FrozenEvent +from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination + import logging @@ -163,24 +165,34 @@ class FederationClient(FederationBase): pdu = None for destination in destinations: try: - transaction_data = yield self.transport_layer.get_event( - destination, event_id + limiter = yield get_retry_limiter( + destination, + self._clock, + self.store, ) - logger.debug("transaction_data %r", transaction_data) + with limiter: + transaction_data = yield self.transport_layer.get_event( + destination, event_id + ) - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] + logger.debug("transaction_data %r", transaction_data) - if pdu_list: - pdu = pdu_list[0] + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] - # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(pdu) + if pdu_list: + pdu = pdu_list[0] - break + # Check signatures are correct. + pdu = yield self._check_sigs_and_hash(pdu) + + break + except NotRetryingDestination as e: + logger.info(e.message) + continue except CodeMessageException: raise except Exception as e: diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index bb20f2ebab..7d02afe163 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -22,6 +22,9 @@ from .units import Transaction from synapse.api.errors import HttpResponseException from synapse.util.logutils import log_function from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.retryutils import ( + get_retry_limiter, NotRetryingDestination, +) import logging @@ -138,25 +141,6 @@ class TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): - - (retry_last_ts, retry_interval) = (0, 0) - retry_timings = yield self.store.get_destination_retry_timings( - destination - ) - if retry_timings: - (retry_last_ts, retry_interval) = ( - retry_timings.retry_last_ts, retry_timings.retry_interval - ) - if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info( - "TX [%s] not ready for retry yet - " - "dropping transaction for now", - destination, - ) - return - else: - logger.info("TX [%s] is ready for retry", destination) - if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending # request at which point pending_pdus_by_dest just keeps growing. @@ -204,77 +188,79 @@ class TransactionQueue(object): ] try: - self.pending_transactions[destination] = 1 - - logger.debug("TX [%s] Persisting transaction...", destination) - - transaction = Transaction.create_new( - origin_server_ts=int(self._clock.time_msec()), - transaction_id=str(self._next_txn_id), - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, + limiter = yield get_retry_limiter( + destination, + self._clock, + self.store, ) - self._next_txn_id += 1 + with limiter: + self.pending_transactions[destination] = 1 - yield self.transaction_actions.prepare_to_send(transaction) + logger.debug("TX [%s] Persisting transaction...", destination) - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] Sending transaction [%s]", - destination, - transaction.transaction_id, - ) + transaction = Transaction.create_new( + origin_server_ts=int(self._clock.time_msec()), + transaction_id=str(self._next_txn_id), + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) + + self._next_txn_id += 1 - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self._clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb + yield self.transaction_actions.prepare_to_send(transaction) + + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] Sending transaction [%s]", + destination, + transaction.transaction_id, ) - code = 200 - except HttpResponseException as e: - code = e.code - response = e.response - logger.info("TX [%s] got %d response", destination, code) + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self._clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) + logger.info("TX [%s] got %d response", destination, code) - yield self.transaction_actions.delivered( - transaction, code, response - ) + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) + + yield self.transaction_actions.delivered( + transaction, code, response + ) + + logger.debug("TX [%s] Marked as delivered", destination) - logger.debug("TX [%s] Marked as delivered", destination) logger.debug("TX [%s] Yielding to callbacks...", destination) for deferred in deferreds: if code == 200: - if retry_last_ts: - # this host is alive! reset retry schedule - yield self.store.set_destination_retry_timings( - destination, 0, 0 - ) deferred.callback(None) else: - self.set_retrying(destination, retry_interval) deferred.errback(RuntimeError("Got status %d" % code)) # Ensures we don't continue until all callbacks on that @@ -285,6 +271,12 @@ class TransactionQueue(object): pass logger.debug("TX [%s] Yielded to callbacks", destination) + except NotRetryingDestination: + logger.info( + "TX [%s] not ready for retry yet - " + "dropping transaction for now", + destination, + ) except RuntimeError as e: # We capture this here as there as nothing actually listens # for this finishing functions deferred. @@ -302,8 +294,6 @@ class TransactionQueue(object): e, ) - self.set_retrying(destination, retry_interval) - for deferred in deferreds: if not deferred.called: deferred.errback(e) @@ -314,22 +304,3 @@ class TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) - - @defer.inlineCallbacks - def set_retrying(self, destination, retry_interval): - # track that this destination is having problems and we should - # give it a chance to recover before trying it again - - if retry_interval: - retry_interval *= 2 - # plateau at hourly retries for now - if retry_interval >= 60 * 60 * 1000: - retry_interval = 60 * 60 * 1000 - else: - retry_interval = 2000 # try again at first after 2 seconds - - yield self.store.set_destination_retry_timings( - destination, - int(self._clock.time_msec()), - retry_interval - ) |