diff options
author | Erik Johnston <erik@matrix.org> | 2014-11-04 14:17:55 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2014-11-04 14:17:55 +0000 |
commit | 440cbd5235e7e23dfe97d8e3d394cc0d35b35fd6 (patch) | |
tree | c569286ed8a097628ad2ee6c5d00e8a8ee5b0e23 | |
parent | Remove unused interface (diff) | |
download | synapse-440cbd5235e7e23dfe97d8e3d394cc0d35b35fd6.tar.xz |
Add support for sending failures
-rw-r--r-- | synapse/federation/replication.py | 30 | ||||
-rw-r--r-- | synapse/federation/units.py | 1 | ||||
-rw-r--r-- | synapse/types.py | 34 |
3 files changed, 63 insertions, 2 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 99dd390a64..680e7322a6 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -144,6 +144,11 @@ class ReplicationLayer(object): return defer.succeed(None) @log_function + def send_failure(self, failure, destination): + self._transaction_queue.enqueue_failure(failure, destination) + return defer.succeed(None) + + @log_function def make_query(self, destination, query_type, args, retry_on_dns_fail=True): """Sends a federation Query to a remote homeserver of the given type @@ -558,6 +563,9 @@ class _TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = {} + # destination -> list of tuple(failure, deferred) + self.pending_failures_by_dest = {} + # HACK to get unique tx id self._next_txn_id = int(self._clock.time_msec()) @@ -611,6 +619,18 @@ class _TransactionQueue(object): return deferred @defer.inlineCallbacks + def enqueue_failure(self, failure, destination): + deferred = defer.Deferred() + + self.pending_failures_by_dest.setdefault( + destination, [] + ).append( + (failure, deferred) + ) + + yield deferred + + @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): if destination in self.pending_transactions: @@ -619,8 +639,9 @@ class _TransactionQueue(object): # list of (pending_pdu, deferred, order) pending_pdus = self.pending_pdus_by_dest.pop(destination, []) pending_edus = self.pending_edus_by_dest.pop(destination, []) + pending_failures = self.pending_failures_by_dest(destination, []) - if not pending_pdus and not pending_edus: + if not pending_pdus and not pending_edus and not pending_failures: return logger.debug("TX [%s] Attempting new transaction", destination) @@ -630,7 +651,11 @@ class _TransactionQueue(object): pdus = [x[0] for x in pending_pdus] edus = [x[0] for x in pending_edus] - deferreds = [x[1] for x in pending_pdus + pending_edus] + failures = [x[0].get_dict() for x in pending_failures] + deferreds = [ + x[1] + for x in pending_pdus + pending_edus + pending_failures + ] try: self.pending_transactions[destination] = 1 @@ -644,6 +669,7 @@ class _TransactionQueue(object): destination=destination, pdus=pdus, edus=edus, + pdu_failures=failures, ) self._next_txn_id += 1 diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 9b25556707..2070ffe1e2 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -157,6 +157,7 @@ class Transaction(JsonEncodedObject): "edus", "transaction_id", "destination", + "pdu_failures", ] internal_keys = [ diff --git a/synapse/types.py b/synapse/types.py index 649ff2f7d7..8fac20fd2e 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -128,3 +128,37 @@ class StreamToken( d = self._asdict() d[key] = new_value return StreamToken(**d) + + +class FederationError(RuntimeError): + """ This class is used to inform remote home servers about erroneous + PDUs they sent us. + + FATAL: The remote server could not interpret the source event. + (e.g., it was missing a required field) + ERROR: The remote server interpreted the event, but it failed some other + check (e.g. auth) + WARN: The remote server accepted the event, but believes some part of it + is wrong (e.g., it referred to an invalid event) + """ + + def __init__(self, level, code, reason, affected, source=None): + if level not in ["FATAL", "ERROR", "WARN"]: + raise ValueError("Level is not valid: %s" % (level,)) + self.level = level + self.code = code + self.reason = reason + self.affected = affected + self.source = source + + msg = "%s %s: %s" % (level, code, reason,) + super(FederationError, self).__init__(msg) + + def get_dict(self): + return { + "level": self.level, + "code": self.code, + "reason": self.reason, + "affected": self.affected, + "source": self.source if self.source else self.affected, + } |