diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index d6b8c43916..eb36ec040b 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
from .federation_base import FederationBase
from .units import Edu
+from synapse.api.errors import CodeMessageException
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
@@ -180,7 +181,8 @@ class FederationClient(FederationBase):
pdu = yield self._check_sigs_and_hash(pdu)
break
-
+ except CodeMessageException:
+ raise
except Exception as e:
logger.info(
"Failed to get PDU %s from %s because %s",
@@ -264,45 +266,63 @@ class FederationClient(FederationBase):
defer.returnValue(self.event_from_pdu_json(pdu_dict))
break
- except Exception as e:
- logger.warn("Failed to make_join via %s", destination)
+ except CodeMessageException:
+ raise
+ except RuntimeError as e:
+ logger.warn(
+ "Failed to make_join via %s: %s",
+ destination, e.message
+ )
+
+ raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks
- def send_join(self, destination, pdu):
- time_now = self._clock.time_msec()
- _, content = yield self.transport_layer.send_join(
- destination=destination,
- room_id=pdu.room_id,
- event_id=pdu.event_id,
- content=pdu.get_pdu_json(time_now),
- )
+ def send_join(self, destinations, pdu):
+ for destination in destinations:
+ try:
+ time_now = self._clock.time_msec()
+ _, content = yield self.transport_layer.send_join(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
- logger.debug("Got content: %s", content)
+ logger.debug("Got content: %s", content)
- state = [
- self.event_from_pdu_json(p, outlier=True)
- for p in content.get("state", [])
- ]
+ state = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("state", [])
+ ]
- auth_chain = [
- self.event_from_pdu_json(p, outlier=True)
- for p in content.get("auth_chain", [])
- ]
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("auth_chain", [])
+ ]
- signed_state = yield self._check_sigs_and_hash_and_fetch(
- destination, state, outlier=True
- )
+ signed_state = yield self._check_sigs_and_hash_and_fetch(
+ destination, state, outlier=True
+ )
- signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True
- )
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
- auth_chain.sort(key=lambda e: e.depth)
+ auth_chain.sort(key=lambda e: e.depth)
+
+ defer.returnValue({
+ "state": signed_state,
+ "auth_chain": signed_auth,
+ })
+ except CodeMessageException:
+ raise
+ except RuntimeError as e:
+ logger.warn(
+ "Failed to send_join via %s: %s",
+ destination, e.message
+ )
- defer.returnValue({
- "state": signed_state,
- "auth_chain": signed_auth,
- })
+ raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks
def send_invite(self, destination, room_id, event_id, pdu):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9d4f2c09a2..f38aeba7cc 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -19,6 +19,7 @@ from twisted.internet import defer
from .persistence import TransactionActions
from .units import Transaction
+from synapse.api.errors import HttpResponseException
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
@@ -238,9 +239,14 @@ class TransactionQueue(object):
del p["age_ts"]
return data
- code, response = yield self.transport_layer.send_transaction(
- transaction, json_data_cb
- )
+ try:
+ response = yield self.transport_layer.send_transaction(
+ transaction, json_data_cb
+ )
+ code = 200
+ except HttpResponseException as e:
+ code = e.code
+ response = e.response
logger.info("TX [%s] got %d response", destination, code)
@@ -274,8 +280,7 @@ class TransactionQueue(object):
pass
logger.debug("TX [%s] Yielded to callbacks", destination)
-
- except Exception as e:
+ except RuntimeError as e:
# We capture this here as there as nothing actually listens
# for this finishing functions deferred.
logger.warn(
@@ -283,6 +288,14 @@ class TransactionQueue(object):
destination,
e,
)
+ except Exception as e:
+ # We capture this here as there as nothing actually listens
+ # for this finishing functions deferred.
+ logger.exception(
+ "TX [%s] Problem in _attempt_transaction: %s",
+ destination,
+ e,
+ )
self.set_retrying(destination, retry_interval)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 4cb1dea2de..8b137e7128 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -19,7 +19,6 @@ from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.util.logutils import log_function
import logging
-import json
logger = logging.getLogger(__name__)
@@ -129,7 +128,7 @@ class TransportLayerClient(object):
# generated by the json_data_callback.
json_data = transaction.get_dict()
- code, response = yield self.client.put_json(
+ response = yield self.client.put_json(
transaction.destination,
path=PREFIX + "/send/%s/" % transaction.transaction_id,
data=json_data,
@@ -137,95 +136,86 @@ class TransportLayerClient(object):
)
logger.debug(
- "send_data dest=%s, txid=%s, got response: %d",
- transaction.destination, transaction.transaction_id, code
+ "send_data dest=%s, txid=%s, got response: 200",
+ transaction.destination, transaction.transaction_id,
)
- defer.returnValue((code, response))
+ defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def make_query(self, destination, query_type, args, retry_on_dns_fail):
path = PREFIX + "/query/%s" % query_type
- response = yield self.client.get_json(
+ content = yield self.client.get_json(
destination=destination,
path=path,
args=args,
retry_on_dns_fail=retry_on_dns_fail,
)
- defer.returnValue(response)
+ defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def make_join(self, destination, room_id, user_id, retry_on_dns_fail=True):
path = PREFIX + "/make_join/%s/%s" % (room_id, user_id)
- response = yield self.client.get_json(
+ content = yield self.client.get_json(
destination=destination,
path=path,
retry_on_dns_fail=retry_on_dns_fail,
)
- defer.returnValue(response)
+ defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def send_join(self, destination, room_id, event_id, content):
path = PREFIX + "/send_join/%s/%s" % (room_id, event_id)
- code, content = yield self.client.put_json(
+ response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
- if not 200 <= code < 300:
- raise RuntimeError("Got %d from send_join", code)
-
- defer.returnValue(json.loads(content))
+ defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def send_invite(self, destination, room_id, event_id, content):
path = PREFIX + "/invite/%s/%s" % (room_id, event_id)
- code, content = yield self.client.put_json(
+ response = yield self.client.put_json(
destination=destination,
path=path,
data=content,
)
- if not 200 <= code < 300:
- raise RuntimeError("Got %d from send_invite", code)
-
- defer.returnValue(json.loads(content))
+ defer.returnValue(response)
@defer.inlineCallbacks
@log_function
def get_event_auth(self, destination, room_id, event_id):
path = PREFIX + "/event_auth/%s/%s" % (room_id, event_id)
- response = yield self.client.get_json(
+ content = yield self.client.get_json(
destination=destination,
path=path,
)
- defer.returnValue(response)
+ defer.returnValue(content)
@defer.inlineCallbacks
@log_function
def send_query_auth(self, destination, room_id, event_id, content):
path = PREFIX + "/query_auth/%s/%s" % (room_id, event_id)
- code, content = yield self.client.post_json(
+ content = yield self.client.post_json(
destination=destination,
path=path,
data=content,
)
- if not 200 <= code < 300:
- raise RuntimeError("Got %d from send_invite", code)
-
- defer.returnValue(json.loads(content))
+ defer.returnValue(content)
|