diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 43 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 21 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 22 |
3 files changed, 79 insertions, 7 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index c5b0274c2f..cd3c962d50 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -19,7 +19,8 @@ from twisted.internet import defer from .federation_base import FederationBase from .units import Edu -from synapse.api.errors import CodeMessageException +from synapse.api.errors import CodeMessageException, SynapseError +from synapse.util.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.events import FrozenEvent @@ -32,6 +33,20 @@ logger = logging.getLogger(__name__) class FederationClient(FederationBase): + def __init__(self): + self._get_pdu_cache = None + + def start_get_pdu_cache(self): + self._get_pdu_cache = ExpiringCache( + cache_name="get_pdu_cache", + clock=self._clock, + max_len=1000, + expiry_ms=120*1000, + reset_expiry_on_get=False, + ) + + self._get_pdu_cache.start() + @log_function def send_pdu(self, pdu, destinations): """Informs the replication layer about a new PDU generated within the @@ -162,6 +177,11 @@ class FederationClient(FederationBase): # TODO: Rate limit the number of times we try and get the same event. + if self._get_pdu_cache: + e = self._get_pdu_cache.get(event_id) + if e: + defer.returnValue(e) + pdu = None for destination in destinations: try: @@ -190,11 +210,25 @@ class FederationClient(FederationBase): pdu = yield self._check_sigs_and_hash(pdu) break + + except SynapseError: + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue + except CodeMessageException as e: + if 400 <= e.code < 500: + raise + + logger.info( + "Failed to get PDU %s from %s because %s", + event_id, destination, e, + ) + continue except NotRetryingDestination as e: logger.info(e.message) continue - except CodeMessageException: - raise except Exception as e: logger.info( "Failed to get PDU %s from %s because %s", @@ -202,6 +236,9 @@ class FederationClient(FederationBase): ) continue + if self._get_pdu_cache is not None: + self._get_pdu_cache[event_id] = pdu + defer.returnValue(pdu) @defer.inlineCallbacks diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 078ad0626d..22b9663831 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -114,7 +114,15 @@ class FederationServer(FederationBase): with PreserveLoggingContext(): dl = [] for pdu in pdu_list: - dl.append(self._handle_new_pdu(transaction.origin, pdu)) + d = self._handle_new_pdu(transaction.origin, pdu) + + def handle_failure(failure): + failure.trap(FederationError) + self.send_failure(failure.value, transaction.origin) + + d.addErrback(handle_failure) + + dl.append(d) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -124,6 +132,9 @@ class FederationServer(FederationBase): edu.content ) + for failure in getattr(transaction, "pdu_failures", []): + logger.info("Got failure %r", failure) + results = yield defer.DeferredList(dl, consumeErrors=True) ret = [] @@ -132,10 +143,16 @@ class FederationServer(FederationBase): ret.append({}) else: logger.exception(r[1]) - ret.append({"error": str(r[1])}) + ret.append({"error": str(r[1].value)}) logger.debug("Returning: %s", str(ret)) + response = { + "pdus": dict(zip( + (p.event_id for p in pdu_list), ret + )), + } + yield self.transaction_actions.set_response( transaction, 200, response diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index dc0f30cb64..a77201ad49 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -94,7 +94,7 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send pdu", failure) + logger.warn("Failed to send pdu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -119,7 +119,7 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send edu", failure) + logger.warn("Failed to send edu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -136,6 +136,15 @@ class TransactionQueue(object): (failure, deferred) ) + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send failure", failure.value) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(eb) + yield deferred @defer.inlineCallbacks @@ -240,6 +249,14 @@ class TransactionQueue(object): transaction, json_data_cb ) code = 200 + + if response: + for e_id, r in getattr(response, "pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, + ) except HttpResponseException as e: code = e.code response = e.response @@ -249,6 +266,7 @@ class TransactionQueue(object): logger.debug("TX [%s] Sent transaction", destination) logger.debug("TX [%s] Marking as delivered...", destination) + yield self.transaction_actions.delivered( transaction, code, response ) |