diff options
author | Erik Johnston <erik@matrix.org> | 2014-11-04 14:17:55 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-11-04 14:17:55 +0000 |
commit | 440cbd5235e7e23dfe97d8e3d394cc0d35b35fd6 (patch) | |
tree | c569286ed8a097628ad2ee6c5d00e8a8ee5b0e23 /synapse/federation | |
parent | Remove unused interface (diff) | |
download | synapse-440cbd5235e7e23dfe97d8e3d394cc0d35b35fd6.tar.xz |
Add support for sending failures
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/replication.py | 30 | ||||
-rw-r--r-- | synapse/federation/units.py | 1 |
2 files changed, 29 insertions, 2 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 99dd390a64..680e7322a6 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -144,6 +144,11 @@ class ReplicationLayer(object): return defer.succeed(None) @log_function + def send_failure(self, failure, destination): + self._transaction_queue.enqueue_failure(failure, destination) + return defer.succeed(None) + + @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail=True): """Sends a federation Query to a remote homeserver of the given type @@ -558,6 +563,9 @@ class _TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = {} + # destination -> list of tuple(failure, deferred) + self.pending_failures_by_dest = {} + # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) @@ -611,6 +619,18 @@ class _TransactionQueue(object): return deferred @defer.inlineCallbacks + def enqueue_failure(self, failure, destination): + deferred = defer.Deferred() + + self.pending_failures_by_dest.setdefault( + destination, [] + ).append( + (failure, deferred) + ) + + yield deferred + + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): if destination in self.pending_transactions: @@ -619,8 +639,9 @@ class _TransactionQueue(object): # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_failures = self.pending_failures_by_dest(destination, []) - if not pending_pdus and not pending_edus: + if not pending_pdus and not pending_edus and not pending_failures: return logger.debug("TX [%s] Attempting new transaction", destination) @@ -630,7 +651,11 @@ class _TransactionQueue(object): pdus = [x[0] for x in pending_pdus] edus = [x[0] for x in pending_edus] - deferreds = [x[1] for x in pending_pdus + pending_edus] + failures = [x[0].get_dict() for x in pending_failures] + deferreds = [ + x[1] + for x in pending_pdus + pending_edus + pending_failures + ] try: self.pending_transactions[destination] = 1 @@ -644,6 +669,7 @@ class _TransactionQueue(object): destination=destination, pdus=pdus, edus=edus, + pdu_failures=failures, ) self._next_txn_id += 1 diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 9b25556707..2070ffe1e2 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -157,6 +157,7 @@ class Transaction(JsonEncodedObject): "edus", "transaction_id", "destination", + "pdu_failures", ] internal_keys = [ |