diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 99dd390a64..680e7322a6 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -144,6 +144,11 @@ class ReplicationLayer(object):
return defer.succeed(None)
@log_function
+ def send_failure(self, failure, destination):
+ self._transaction_queue.enqueue_failure(failure, destination)
+ return defer.succeed(None)
+
+ @log_function
def make_query(self, destination, query_type, args,
retry_on_dns_fail=True):
"""Sends a federation Query to a remote homeserver of the given type
@@ -558,6 +563,9 @@ class _TransactionQueue(object):
# destination -> list of tuple(edu, deferred)
self.pending_edus_by_dest = {}
+ # destination -> list of tuple(failure, deferred)
+ self.pending_failures_by_dest = {}
+
# HACK to get unique tx id
self._next_txn_id = int(self._clock.time_msec())
@@ -611,6 +619,18 @@ class _TransactionQueue(object):
return deferred
@defer.inlineCallbacks
+ def enqueue_failure(self, failure, destination):
+ deferred = defer.Deferred()
+
+ self.pending_failures_by_dest.setdefault(
+ destination, []
+ ).append(
+ (failure, deferred)
+ )
+
+ yield deferred
+
+ @defer.inlineCallbacks
@log_function
def _attempt_new_transaction(self, destination):
if destination in self.pending_transactions:
@@ -619,8 +639,9 @@ class _TransactionQueue(object):
# list of (pending_pdu, deferred, order)
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
+ pending_failures = self.pending_failures_by_dest(destination, [])
- if not pending_pdus and not pending_edus:
+ if not pending_pdus and not pending_edus and not pending_failures:
return
logger.debug("TX [%s] Attempting new transaction", destination)
@@ -630,7 +651,11 @@ class _TransactionQueue(object):
pdus = [x[0] for x in pending_pdus]
edus = [x[0] for x in pending_edus]
- deferreds = [x[1] for x in pending_pdus + pending_edus]
+ failures = [x[0].get_dict() for x in pending_failures]
+ deferreds = [
+ x[1]
+ for x in pending_pdus + pending_edus + pending_failures
+ ]
try:
self.pending_transactions[destination] = 1
@@ -644,6 +669,7 @@ class _TransactionQueue(object):
destination=destination,
pdus=pdus,
edus=edus,
+ pdu_failures=failures,
)
self._next_txn_id += 1
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 9b25556707..2070ffe1e2 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -157,6 +157,7 @@ class Transaction(JsonEncodedObject):
"edus",
"transaction_id",
"destination",
+ "pdu_failures",
]
internal_keys = [
diff --git a/synapse/types.py b/synapse/types.py
index 649ff2f7d7..8fac20fd2e 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -128,3 +128,37 @@ class StreamToken(
d = self._asdict()
d[key] = new_value
return StreamToken(**d)
+
+
+class FederationError(RuntimeError):
+ """ This class is used to inform remote home servers about erroneous
+ PDUs they sent us.
+
+ FATAL: The remote server could not interpret the source event.
+ (e.g., it was missing a required field)
+ ERROR: The remote server interpreted the event, but it failed some other
+ check (e.g. auth)
+ WARN: The remote server accepted the event, but believes some part of it
+ is wrong (e.g., it referred to an invalid event)
+ """
+
+ def __init__(self, level, code, reason, affected, source=None):
+ if level not in ["FATAL", "ERROR", "WARN"]:
+ raise ValueError("Level is not valid: %s" % (level,))
+ self.level = level
+ self.code = code
+ self.reason = reason
+ self.affected = affected
+ self.source = source
+
+ msg = "%s %s: %s" % (level, code, reason,)
+ super(FederationError, self).__init__(msg)
+
+ def get_dict(self):
+ return {
+ "level": self.level,
+ "code": self.code,
+ "reason": self.reason,
+ "affected": self.affected,
+ "source": self.source if self.source else self.affected,
+ }
|