diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_server.py | 30 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 24 |
2 files changed, 48 insertions, 6 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 4391a60c02..679fb141e7 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -114,7 +114,15 @@ class FederationServer(FederationBase): with PreserveLoggingContext(): dl = [] for pdu in pdu_list: - dl.append(self._handle_new_pdu(transaction.origin, pdu)) + d = self._handle_new_pdu(transaction.origin, pdu) + + def handle_failure(failure): + failure.trap(FederationError) + self.send_failure(failure.value, transaction.origin) + + d.addErrback(handle_failure) + + dl.append(d) if hasattr(transaction, "edus"): for edu in [Edu(**x) for x in transaction.edus]: @@ -124,7 +132,10 @@ class FederationServer(FederationBase): edu.content ) - results = yield defer.DeferredList(dl) + for failure in getattr(transaction, "pdu_failures", []): + logger.info("Got failure %r", failure) + + results = yield defer.DeferredList(dl, consumeErrors=True) ret = [] for r in results: @@ -132,10 +143,16 @@ class FederationServer(FederationBase): ret.append({}) else: logger.exception(r[1]) - ret.append({"error": str(r[1])}) + ret.append({"error": str(r[1].value)}) logger.debug("Returning: %s", str(ret)) + response = { + "pdus": dict(zip( + (p.event_id for p in pdu_list), ret + )), + } + yield self.transaction_actions.set_response( transaction, 200, response @@ -344,6 +361,13 @@ class FederationServer(FederationBase): affected=pdu.event_id, ) + raise FederationError( + "ERROR", + 403, + "Forbidden", + affected=pdu.event_id, + ) + state = None auth_chain = [] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 731019ad9f..6faaa066fb 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -91,14 +91,14 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send pdu", failure) + logger.warn("Failed to send pdu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) deferreds.append(deferred) - yield defer.DeferredList(deferreds) + yield defer.DeferredList(deferreds, consumeErrors=True) # NO inlineCallbacks def enqueue_edu(self, edu): @@ -116,7 +116,7 @@ class TransactionQueue(object): if not deferred.called: deferred.errback(failure) else: - logger.warn("Failed to send edu", failure) + logger.warn("Failed to send edu", failure.value) with PreserveLoggingContext(): self._attempt_new_transaction(destination).addErrback(eb) @@ -133,6 +133,15 @@ class TransactionQueue(object): (failure, deferred) ) + def eb(failure): + if not deferred.called: + deferred.errback(failure) + else: + logger.warn("Failed to send failure", failure.value) + + with PreserveLoggingContext(): + self._attempt_new_transaction(destination).addErrback(eb) + yield deferred @defer.inlineCallbacks @@ -249,6 +258,15 @@ class TransactionQueue(object): transaction, json_data_cb ) code = 200 + + if response: + for e_id, r in getattr(response, "pdus", {}).items(): + if "error" in r: + logger.warn( + "Transaction returned error for %s: %s", + e_id, r, + ) + except HttpResponseException as e: code = e.code response = e.response |