summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py43
-rw-r--r--synapse/federation/federation_server.py21
-rw-r--r--synapse/federation/transaction_queue.py22
3 files changed, 79 insertions, 7 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index c5b0274c2f..cd3c962d50 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,7 +19,8 @@ from twisted.internet import defer
 from .federation_base import FederationBase
 from .units import Edu
 
-from synapse.api.errors import CodeMessageException
+from synapse.api.errors import CodeMessageException, SynapseError
+from synapse.util.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
 
@@ -32,6 +33,20 @@ logger = logging.getLogger(__name__)
 
 
 class FederationClient(FederationBase):
+    def __init__(self):
+        self._get_pdu_cache = None
+
+    def start_get_pdu_cache(self):
+        self._get_pdu_cache = ExpiringCache(
+            cache_name="get_pdu_cache",
+            clock=self._clock,
+            max_len=1000,
+            expiry_ms=120*1000,
+            reset_expiry_on_get=False,
+        )
+
+        self._get_pdu_cache.start()
+
     @log_function
     def send_pdu(self, pdu, destinations):
         """Informs the replication layer about a new PDU generated within the
@@ -162,6 +177,11 @@ class FederationClient(FederationBase):
 
         # TODO: Rate limit the number of times we try and get the same event.
 
+        if self._get_pdu_cache:
+            e = self._get_pdu_cache.get(event_id)
+            if e:
+                defer.returnValue(e)
+
         pdu = None
         for destination in destinations:
             try:
@@ -190,11 +210,25 @@ class FederationClient(FederationBase):
                         pdu = yield self._check_sigs_and_hash(pdu)
 
                         break
+
+            except SynapseError:
+                logger.info(
+                    "Failed to get PDU %s from %s because %s",
+                    event_id, destination, e,
+                )
+                continue
+            except CodeMessageException as e:
+                if 400 <= e.code < 500:
+                    raise
+
+                logger.info(
+                    "Failed to get PDU %s from %s because %s",
+                    event_id, destination, e,
+                )
+                continue
             except NotRetryingDestination as e:
                 logger.info(e.message)
                 continue
-            except CodeMessageException:
-                raise
             except Exception as e:
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
@@ -202,6 +236,9 @@ class FederationClient(FederationBase):
                 )
                 continue
 
+        if self._get_pdu_cache is not None:
+            self._get_pdu_cache[event_id] = pdu
+
         defer.returnValue(pdu)
 
     @defer.inlineCallbacks
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 078ad0626d..22b9663831 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -114,7 +114,15 @@ class FederationServer(FederationBase):
         with PreserveLoggingContext():
             dl = []
             for pdu in pdu_list:
-                dl.append(self._handle_new_pdu(transaction.origin, pdu))
+                d = self._handle_new_pdu(transaction.origin, pdu)
+
+                def handle_failure(failure):
+                    failure.trap(FederationError)
+                    self.send_failure(failure.value, transaction.origin)
+
+                d.addErrback(handle_failure)
+
+                dl.append(d)
 
             if hasattr(transaction, "edus"):
                 for edu in [Edu(**x) for x in transaction.edus]:
@@ -124,6 +132,9 @@ class FederationServer(FederationBase):
                         edu.content
                     )
 
+            for failure in getattr(transaction, "pdu_failures", []):
+                logger.info("Got failure %r", failure)
+
             results = yield defer.DeferredList(dl, consumeErrors=True)
 
         ret = []
@@ -132,10 +143,16 @@ class FederationServer(FederationBase):
                 ret.append({})
             else:
                 logger.exception(r[1])
-                ret.append({"error": str(r[1])})
+                ret.append({"error": str(r[1].value)})
 
         logger.debug("Returning: %s", str(ret))
 
+        response = {
+            "pdus": dict(zip(
+                (p.event_id for p in pdu_list), ret
+            )),
+        }
+
         yield self.transaction_actions.set_response(
             transaction,
             200, response
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index dc0f30cb64..a77201ad49 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -94,7 +94,7 @@ class TransactionQueue(object):
                 if not deferred.called:
                     deferred.errback(failure)
                 else:
-                    logger.warn("Failed to send pdu", failure)
+                    logger.warn("Failed to send pdu", failure.value)
 
             with PreserveLoggingContext():
                 self._attempt_new_transaction(destination).addErrback(eb)
@@ -119,7 +119,7 @@ class TransactionQueue(object):
             if not deferred.called:
                 deferred.errback(failure)
             else:
-                logger.warn("Failed to send edu", failure)
+                logger.warn("Failed to send edu", failure.value)
 
         with PreserveLoggingContext():
             self._attempt_new_transaction(destination).addErrback(eb)
@@ -136,6 +136,15 @@ class TransactionQueue(object):
             (failure, deferred)
         )
 
+        def eb(failure):
+            if not deferred.called:
+                deferred.errback(failure)
+            else:
+                logger.warn("Failed to send failure", failure.value)
+
+        with PreserveLoggingContext():
+            self._attempt_new_transaction(destination).addErrback(eb)
+
         yield deferred
 
     @defer.inlineCallbacks
@@ -240,6 +249,14 @@ class TransactionQueue(object):
                         transaction, json_data_cb
                     )
                     code = 200
+
+                    if response:
+                        for e_id, r in getattr(response, "pdus", {}).items():
+                            if "error" in r:
+                                logger.warn(
+                                    "Transaction returned error for %s: %s",
+                                    e_id, r,
+                                )
                 except HttpResponseException as e:
                     code = e.code
                     response = e.response
@@ -249,6 +266,7 @@ class TransactionQueue(object):
                 logger.debug("TX [%s] Sent transaction", destination)
                 logger.debug("TX [%s] Marking as delivered...", destination)
 
+
             yield self.transaction_actions.delivered(
                 transaction, code, response
             )