summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-11-04 14:17:55 +0000
committerErik Johnston <erik@matrix.org>2014-11-04 14:17:55 +0000
commit440cbd5235e7e23dfe97d8e3d394cc0d35b35fd6 (patch)
treec569286ed8a097628ad2ee6c5d00e8a8ee5b0e23
parentRemove unused interface (diff)
downloadsynapse-440cbd5235e7e23dfe97d8e3d394cc0d35b35fd6.tar.xz
Add support for sending failures
-rw-r--r--synapse/federation/replication.py30
-rw-r--r--synapse/federation/units.py1
-rw-r--r--synapse/types.py34
3 files changed, 63 insertions, 2 deletions
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 99dd390a64..680e7322a6 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -144,6 +144,11 @@ class ReplicationLayer(object):
         return defer.succeed(None)
 
     @log_function
+    def send_failure(self, failure, destination):
+        self._transaction_queue.enqueue_failure(failure, destination)
+        return defer.succeed(None)
+
+    @log_function
     def make_query(self, destination, query_type, args,
                    retry_on_dns_fail=True):
         """Sends a federation Query to a remote homeserver of the given type
@@ -558,6 +563,9 @@ class _TransactionQueue(object):
         # destination -> list of tuple(edu, deferred)
         self.pending_edus_by_dest = {}
 
+        # destination -> list of tuple(failure, deferred)
+        self.pending_failures_by_dest = {}
+
         # HACK to get unique tx id
         self._next_txn_id = int(self._clock.time_msec())
 
@@ -611,6 +619,18 @@ class _TransactionQueue(object):
         return deferred
 
     @defer.inlineCallbacks
+    def enqueue_failure(self, failure, destination):
+        deferred = defer.Deferred()
+
+        self.pending_failures_by_dest.setdefault(
+            destination, []
+        ).append(
+            (failure, deferred)
+        )
+
+        yield deferred
+
+    @defer.inlineCallbacks
     @log_function
     def _attempt_new_transaction(self, destination):
         if destination in self.pending_transactions:
@@ -619,8 +639,9 @@ class _TransactionQueue(object):
         #  list of (pending_pdu, deferred, order)
         pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
         pending_edus = self.pending_edus_by_dest.pop(destination, [])
+        pending_failures = self.pending_failures_by_dest(destination, [])
 
-        if not pending_pdus and not pending_edus:
+        if not pending_pdus and not pending_edus and not pending_failures:
             return
 
         logger.debug("TX [%s] Attempting new transaction", destination)
@@ -630,7 +651,11 @@ class _TransactionQueue(object):
 
         pdus = [x[0] for x in pending_pdus]
         edus = [x[0] for x in pending_edus]
-        deferreds = [x[1] for x in pending_pdus + pending_edus]
+        failures = [x[0].get_dict() for x in pending_failures]
+        deferreds = [
+            x[1]
+            for x in pending_pdus + pending_edus + pending_failures
+        ]
 
         try:
             self.pending_transactions[destination] = 1
@@ -644,6 +669,7 @@ class _TransactionQueue(object):
                 destination=destination,
                 pdus=pdus,
                 edus=edus,
+                pdu_failures=failures,
             )
 
             self._next_txn_id += 1
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 9b25556707..2070ffe1e2 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -157,6 +157,7 @@ class Transaction(JsonEncodedObject):
         "edus",
         "transaction_id",
         "destination",
+        "pdu_failures",
     ]
 
     internal_keys = [
diff --git a/synapse/types.py b/synapse/types.py
index 649ff2f7d7..8fac20fd2e 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -128,3 +128,37 @@ class StreamToken(
         d = self._asdict()
         d[key] = new_value
         return StreamToken(**d)
+
+
+class FederationError(RuntimeError):
+    """  This class is used to inform remote home servers about erroneous
+    PDUs they sent us.
+
+    FATAL: The remote server could not interpret the source event.
+        (e.g., it was missing a required field)
+    ERROR: The remote server interpreted the event, but it failed some other
+        check (e.g. auth)
+    WARN: The remote server accepted the event, but believes some part of it
+        is wrong (e.g., it referred to an invalid event)
+    """
+
+    def __init__(self, level, code, reason, affected, source=None):
+        if level not in ["FATAL", "ERROR", "WARN"]:
+            raise ValueError("Level is not valid: %s" % (level,))
+        self.level = level
+        self.code = code
+        self.reason = reason
+        self.affected = affected
+        self.source = source
+
+        msg = "%s %s: %s" % (level, code, reason,)
+        super(FederationError, self).__init__(msg)
+
+    def get_dict(self):
+        return {
+            "level": self.level,
+            "code": self.code,
+            "reason": self.reason,
+            "affected": self.affected,
+            "source": self.source if self.source else self.affected,
+        }