diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9f5c98694c..22b9663831 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -114,7 +114,15 @@ class FederationServer(FederationBase):
with PreserveLoggingContext():
dl = []
for pdu in pdu_list:
- dl.append(self._handle_new_pdu(transaction.origin, pdu))
+ d = self._handle_new_pdu(transaction.origin, pdu)
+
+ def handle_failure(failure):
+ failure.trap(FederationError)
+ self.send_failure(failure.value, transaction.origin)
+
+ d.addErrback(handle_failure)
+
+ dl.append(d)
if hasattr(transaction, "edus"):
for edu in [Edu(**x) for x in transaction.edus]:
@@ -124,7 +132,10 @@ class FederationServer(FederationBase):
edu.content
)
- results = yield defer.DeferredList(dl)
+ for failure in getattr(transaction, "pdu_failures", []):
+ logger.info("Got failure %r", failure)
+
+ results = yield defer.DeferredList(dl, consumeErrors=True)
ret = []
for r in results:
@@ -132,10 +143,16 @@ class FederationServer(FederationBase):
ret.append({})
else:
logger.exception(r[1])
- ret.append({"error": str(r[1])})
+ ret.append({"error": str(r[1].value)})
logger.debug("Returning: %s", str(ret))
+ response = {
+ "pdus": dict(zip(
+ (p.event_id for p in pdu_list), ret
+ )),
+ }
+
yield self.transaction_actions.set_response(
transaction,
200, response
@@ -331,7 +348,6 @@ class FederationServer(FederationBase):
)
if already_seen:
logger.debug("Already seen pdu %s", pdu.event_id)
- defer.returnValue({})
return
# Check signature.
@@ -367,7 +383,13 @@ class FederationServer(FederationBase):
pdu.room_id, min_depth
)
- if min_depth and pdu.depth > min_depth and max_recursion > 0:
+ if min_depth and pdu.depth < min_depth:
+ # This is so that we don't notify the user about this
+ # message, to work around the fact that some events will
+ # reference really really old events we really don't want to
+ # send to the clients.
+ pdu.internal_metadata.outlier = True
+ elif min_depth and pdu.depth > min_depth and max_recursion > 0:
for event_id, hashes in pdu.prev_events:
if event_id not in have_seen:
logger.debug(
@@ -418,7 +440,7 @@ class FederationServer(FederationBase):
except:
logger.warn("Failed to get state for event: %s", pdu.event_id)
- ret = yield self.handler.on_receive_pdu(
+ yield self.handler.on_receive_pdu(
origin,
pdu,
backfilled=False,
@@ -426,8 +448,6 @@ class FederationServer(FederationBase):
auth_chain=auth_chain,
)
- defer.returnValue(ret)
-
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name
|