diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 4391a60c02..679fb141e7 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -114,7 +114,15 @@ class FederationServer(FederationBase):
with PreserveLoggingContext():
dl = []
for pdu in pdu_list:
- dl.append(self._handle_new_pdu(transaction.origin, pdu))
+ d = self._handle_new_pdu(transaction.origin, pdu)
+
+ def handle_failure(failure):
+ failure.trap(FederationError)
+ self.send_failure(failure.value, transaction.origin)
+
+ d.addErrback(handle_failure)
+
+ dl.append(d)
if hasattr(transaction, "edus"):
for edu in [Edu(**x) for x in transaction.edus]:
@@ -124,7 +132,10 @@ class FederationServer(FederationBase):
edu.content
)
- results = yield defer.DeferredList(dl)
+ for failure in getattr(transaction, "pdu_failures", []):
+ logger.info("Got failure %r", failure)
+
+ results = yield defer.DeferredList(dl, consumeErrors=True)
ret = []
for r in results:
@@ -132,10 +143,16 @@ class FederationServer(FederationBase):
ret.append({})
else:
logger.exception(r[1])
- ret.append({"error": str(r[1])})
+ ret.append({"error": str(r[1].value)})
logger.debug("Returning: %s", str(ret))
+ response = {
+ "pdus": dict(zip(
+ (p.event_id for p in pdu_list), ret
+ )),
+ }
+
yield self.transaction_actions.set_response(
transaction,
200, response
@@ -344,6 +361,13 @@ class FederationServer(FederationBase):
affected=pdu.event_id,
)
+ raise FederationError(
+ "ERROR",
+ 403,
+ "Forbidden",
+ affected=pdu.event_id,
+ )
+
state = None
auth_chain = []
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 731019ad9f..6faaa066fb 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -91,14 +91,14 @@ class TransactionQueue(object):
if not deferred.called:
deferred.errback(failure)
else:
- logger.warn("Failed to send pdu", failure)
+ logger.warn("Failed to send pdu", failure.value)
with PreserveLoggingContext():
self._attempt_new_transaction(destination).addErrback(eb)
deferreds.append(deferred)
- yield defer.DeferredList(deferreds)
+ yield defer.DeferredList(deferreds, consumeErrors=True)
# NO inlineCallbacks
def enqueue_edu(self, edu):
@@ -116,7 +116,7 @@ class TransactionQueue(object):
if not deferred.called:
deferred.errback(failure)
else:
- logger.warn("Failed to send edu", failure)
+ logger.warn("Failed to send edu", failure.value)
with PreserveLoggingContext():
self._attempt_new_transaction(destination).addErrback(eb)
@@ -133,6 +133,15 @@ class TransactionQueue(object):
(failure, deferred)
)
+ def eb(failure):
+ if not deferred.called:
+ deferred.errback(failure)
+ else:
+ logger.warn("Failed to send failure", failure.value)
+
+ with PreserveLoggingContext():
+ self._attempt_new_transaction(destination).addErrback(eb)
+
yield deferred
@defer.inlineCallbacks
@@ -249,6 +258,15 @@ class TransactionQueue(object):
transaction, json_data_cb
)
code = 200
+
+ if response:
+ for e_id, r in getattr(response, "pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "Transaction returned error for %s: %s",
+ e_id, r,
+ )
+
except HttpResponseException as e:
code = e.code
response = e.response
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 59287010ed..8ef248ecf2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -492,7 +492,7 @@ class PresenceHandler(BaseHandler):
user, domain, remoteusers
))
- yield defer.DeferredList(deferreds)
+ yield defer.DeferredList(deferreds, consumeErrors=True)
def _start_polling_local(self, user, target_user):
target_localpart = target_user.localpart
@@ -548,7 +548,7 @@ class PresenceHandler(BaseHandler):
self._stop_polling_remote(user, domain, remoteusers)
)
- return defer.DeferredList(deferreds)
+ return defer.DeferredList(deferreds, consumeErrors=True)
def _stop_polling_local(self, user, target_user):
for localpart in self._local_pushmap.keys():
@@ -729,7 +729,7 @@ class PresenceHandler(BaseHandler):
del self._remote_sendmap[user]
with PreserveLoggingContext():
- yield defer.DeferredList(deferreds)
+ yield defer.DeferredList(deferreds, consumeErrors=True)
@defer.inlineCallbacks
def push_update_to_local_and_remote(self, observed_user, statuscache,
@@ -768,7 +768,7 @@ class PresenceHandler(BaseHandler):
)
)
- yield defer.DeferredList(deferreds)
+ yield defer.DeferredList(deferreds, consumeErrors=True)
defer.returnValue((localusers, remote_domains))
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c69787005f..c2762f92c7 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -181,7 +181,7 @@ class TypingNotificationHandler(BaseHandler):
},
))
- yield defer.DeferredList(deferreds, consumeErrors=False)
+ yield defer.DeferredList(deferreds, consumeErrors=True)
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 1927948001..764b151d9a 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -16,6 +16,7 @@
from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError
+from twisted.python.failure import Failure
from twisted.web.client import readBody, _AgentBase, _URI
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone
@@ -146,14 +147,22 @@ class MatrixFederationHttpClient(object):
)
raise SynapseError(400, "Domain specified not found.")
+ if hasattr(e, "reasons"):
+ reasons = ", ".join(
+ f.value.message
+ for f in e.reasons
+ )
+ else:
+ reasons = e.message
+
logger.warn(
- "Sending request failed to %s: %s %s : %s",
+ "Sending request failed to %s: %s %s: %s - %s",
destination,
method,
url_bytes,
- e
+ type(e). __name__,
+ reasons,
)
- _print_ex(e)
if retries_left:
yield sleep(2 ** (5 - retries_left))
@@ -447,14 +456,6 @@ def _readBodyToFile(response, stream, max_size):
return d
-def _print_ex(e):
- if hasattr(e, "reasons") and e.reasons:
- for ex in e.reasons:
- _print_ex(ex)
- else:
- logger.warn(e)
-
-
class _JsonProducer(object):
""" Used by the twisted http client to create the HTTP body from json
"""
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c7f75ab801..2475f3ffbe 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -141,7 +141,8 @@ class Notifier(object):
with PreserveLoggingContext():
yield defer.DeferredList(
- [notify(l).addErrback(eb) for l in listeners]
+ [notify(l).addErrback(eb) for l in listeners],
+ consumeErrors=True,
)
@defer.inlineCallbacks
@@ -209,7 +210,8 @@ class Notifier(object):
with PreserveLoggingContext():
yield defer.DeferredList(
- [notify(l).addErrback(eb) for l in listeners]
+ [notify(l).addErrback(eb) for l in listeners],
+ consumeErrors=True,
)
@defer.inlineCallbacks
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 779f9ce544..9bf608bc90 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -288,7 +288,7 @@ class RoomMemberStore(SQLBaseStore):
deferreds = [self.get_rooms_for_user(u) for u in user_id_list]
- results = yield defer.DeferredList(deferreds)
+ results = yield defer.DeferredList(deferreds, consumeErrors=True)
# A list of sets of strings giving room IDs for each user
room_id_lists = [set([r.room_id for r in result[1]]) for result in results]
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index e77eba90ad..79109d0b19 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -99,8 +99,6 @@ class Clock(object):
except:
pass
- return res
-
given_deferred.addCallbacks(callback=sucess, errback=err)
timer = self.call_later(time_out, timed_out_fn)
|