diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 40 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 217 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 7 |
3 files changed, 120 insertions, 144 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 5dcd4eecce..deee0f4904 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -29,7 +29,7 @@ from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred from synapse.events import FrozenEvent, builder import synapse.metrics -from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination +from synapse.util.retryutils import NotRetryingDestination import copy import itertools @@ -88,7 +88,7 @@ class FederationClient(FederationBase): @log_function def make_query(self, destination, query_type, args, - retry_on_dns_fail=False): + retry_on_dns_fail=False, ignore_backoff=False): """Sends a federation Query to a remote homeserver of the given type and arguments. @@ -98,6 +98,8 @@ class FederationClient(FederationBase): handler name used in register_query_handler(). args (dict): Mapping of strings to strings containing the details of the query request. + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. Returns: a Deferred which will eventually yield a JSON object from the @@ -106,7 +108,8 @@ class FederationClient(FederationBase): sent_queries_counter.inc(query_type) return self.transport_layer.make_query( - destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail + destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail, + ignore_backoff=ignore_backoff, ) @log_function @@ -234,31 +237,24 @@ class FederationClient(FederationBase): continue try: - limiter = yield get_retry_limiter( - destination, - self._clock, - self.store, + transaction_data = yield self.transport_layer.get_event( + destination, event_id, timeout=timeout, ) - with limiter: - transaction_data = yield self.transport_layer.get_event( - destination, event_id, timeout=timeout, - ) - - logger.debug("transaction_data %r", transaction_data) + logger.debug("transaction_data %r", transaction_data) - pdu_list = [ - self.event_from_pdu_json(p, outlier=outlier) - for p in transaction_data["pdus"] - ] + pdu_list = [ + self.event_from_pdu_json(p, outlier=outlier) + for p in transaction_data["pdus"] + ] - if pdu_list and pdu_list[0]: - pdu = pdu_list[0] + if pdu_list and pdu_list[0]: + pdu = pdu_list[0] - # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] + # Check signatures are correct. + signed_pdu = yield self._check_sigs_and_hashes([pdu])[0] - break + break pdu_attempts[destination] = now diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c802dd67a3..2c96475b2a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import datetime from twisted.internet import defer @@ -22,9 +22,7 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn -from synapse.util.retryutils import ( - get_retry_limiter, NotRetryingDestination, -) +from synapse.util.retryutils import NotRetryingDestination from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state @@ -311,14 +309,8 @@ class TransactionQueue(object): # XXX: what's this for? yield run_on_reactor() + pending_pdus = [] while True: - limiter = yield get_retry_limiter( - destination, - self.clock, - self.store, - backoff_on_404=True, # If we get a 404 the other side has gone - ) - device_message_edus, device_stream_id, dev_list_id = ( yield self._get_new_device_messages(destination) ) @@ -374,7 +366,6 @@ class TransactionQueue(object): success = yield self._send_new_transaction( destination, pending_pdus, pending_edus, pending_failures, - limiter=limiter, ) if success: # Remove the acknowledged device messages from the database @@ -392,12 +383,24 @@ class TransactionQueue(object): self.last_device_list_stream_id_by_dest[destination] = dev_list_id else: break - except NotRetryingDestination: + except NotRetryingDestination as e: logger.debug( - "TX [%s] not ready for retry yet - " + "TX [%s] not ready for retry yet (next retry at %s) - " "dropping transaction for now", destination, + datetime.datetime.fromtimestamp( + (e.retry_last_ts + e.retry_interval) / 1000.0 + ), + ) + except Exception as e: + logger.warn( + "TX [%s] Failed to send transaction: %s", + destination, + e, ) + for p in pending_pdus: + logger.info("Failed to send event %s to %s", p.event_id, + destination) finally: # We want to be *very* sure we delete this after we stop processing self.pending_transactions.pop(destination, None) @@ -437,7 +440,7 @@ class TransactionQueue(object): @measure_func("_send_new_transaction") @defer.inlineCallbacks def _send_new_transaction(self, destination, pending_pdus, pending_edus, - pending_failures, limiter): + pending_failures): # Sort based on the order field pending_pdus.sort(key=lambda t: t[1]) @@ -447,132 +450,104 @@ class TransactionQueue(object): success = True - try: - logger.debug("TX [%s] _attempt_new_transaction", destination) + logger.debug("TX [%s] _attempt_new_transaction", destination) - txn_id = str(self._next_txn_id) + txn_id = str(self._next_txn_id) - logger.debug( - "TX [%s] {%s} Attempting new transaction" - " (pdus: %d, edus: %d, failures: %d)", - destination, txn_id, - len(pdus), - len(edus), - len(failures) - ) + logger.debug( + "TX [%s] {%s} Attempting new transaction" + " (pdus: %d, edus: %d, failures: %d)", + destination, txn_id, + len(pdus), + len(edus), + len(failures) + ) - logger.debug("TX [%s] Persisting transaction...", destination) + logger.debug("TX [%s] Persisting transaction...", destination) - transaction = Transaction.create_new( - origin_server_ts=int(self.clock.time_msec()), - transaction_id=txn_id, - origin=self.server_name, - destination=destination, - pdus=pdus, - edus=edus, - pdu_failures=failures, - ) + transaction = Transaction.create_new( + origin_server_ts=int(self.clock.time_msec()), + transaction_id=txn_id, + origin=self.server_name, + destination=destination, + pdus=pdus, + edus=edus, + pdu_failures=failures, + ) - self._next_txn_id += 1 + self._next_txn_id += 1 - yield self.transaction_actions.prepare_to_send(transaction) + yield self.transaction_actions.prepare_to_send(transaction) - logger.debug("TX [%s] Persisted transaction", destination) - logger.info( - "TX [%s] {%s} Sending transaction [%s]," - " (PDUs: %d, EDUs: %d, failures: %d)", - destination, txn_id, - transaction.transaction_id, - len(pdus), - len(edus), - len(failures), - ) + logger.debug("TX [%s] Persisted transaction", destination) + logger.info( + "TX [%s] {%s} Sending transaction [%s]," + " (PDUs: %d, EDUs: %d, failures: %d)", + destination, txn_id, + transaction.transaction_id, + len(pdus), + len(edus), + len(failures), + ) - with limiter: - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self.clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self.transport_layer.send_transaction( - transaction, json_data_cb - ) - code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) - except HttpResponseException as e: - code = e.code - response = e.response - - if e.code in (401, 404, 429) or 500 <= e.code: - logger.info( - "TX [%s] {%s} got %d response", - destination, txn_id, code + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self.clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self.transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + + if response: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, ) - raise e + except HttpResponseException as e: + code = e.code + response = e.response + if e.code in (401, 404, 429) or 500 <= e.code: logger.info( "TX [%s] {%s} got %d response", destination, txn_id, code ) + raise e - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) - - yield self.transaction_actions.delivered( - transaction, code, response - ) + logger.info( + "TX [%s] {%s} got %d response", + destination, txn_id, code + ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] Sent transaction", destination) + logger.debug("TX [%s] Marking as delivered...", destination) - if code != 200: - for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination - ) - success = False - except RuntimeError as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) + yield self.transaction_actions.delivered( + transaction, code, response + ) - success = False + logger.debug("TX [%s] Marked as delivered", destination) + if code != 200: for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) - except Exception as e: - # We capture this here as there as nothing actually listens - # for this finishing functions deferred. - logger.warn( - "TX [%s] Problem in _attempt_transaction: %s", - destination, - e, - ) - + logger.info( + "Failed to send event %s to %s", p.event_id, destination + ) success = False - for p in pdus: - logger.info("Failed to send event %s to %s", p.event_id, destination) - defer.returnValue(success) diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index f49e8a2cc4..15a03378f5 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -163,6 +163,7 @@ class TransportLayerClient(object): data=json_data, json_data_callback=json_data_callback, long_retries=True, + backoff_on_404=True, # If we get a 404 the other side has gone ) logger.debug( @@ -174,7 +175,8 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def make_query(self, destination, query_type, args, retry_on_dns_fail): + def make_query(self, destination, query_type, args, retry_on_dns_fail, + ignore_backoff=False): path = PREFIX + "/query/%s" % query_type content = yield self.client.get_json( @@ -183,6 +185,7 @@ class TransportLayerClient(object): args=args, retry_on_dns_fail=retry_on_dns_fail, timeout=10000, + ignore_backoff=ignore_backoff, ) defer.returnValue(content) @@ -242,6 +245,7 @@ class TransportLayerClient(object): destination=destination, path=path, data=content, + ignore_backoff=True, ) defer.returnValue(response) @@ -269,6 +273,7 @@ class TransportLayerClient(object): destination=remote_server, path=path, args=args, + ignore_backoff=True, ) defer.returnValue(response) |