diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 98b5950800..3fdd63be95 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -633,14 +633,6 @@ class TransactionQueue(object):
transaction, json_data_cb
)
code = 200
-
- if response:
- for e_id, r in response.get("pdus", {}).items():
- if "error" in r:
- logger.warn(
- "Transaction returned error for %s: %s",
- e_id, r,
- )
except HttpResponseException as e:
code = e.code
response = e.response
@@ -657,19 +649,24 @@ class TransactionQueue(object):
destination, txn_id, code
)
- logger.debug("TX [%s] Sent transaction", destination)
- logger.debug("TX [%s] Marking as delivered...", destination)
-
yield self.transaction_actions.delivered(
transaction, code, response
)
- logger.debug("TX [%s] Marked as delivered", destination)
+ logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
- if code != 200:
+ if code == 200:
+ for e_id, r in response.get("pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "TX [%s] {%s} Remote returned error for %s: %s",
+ destination, txn_id, e_id, r,
+ )
+ else:
for p in pdus:
- logger.info(
- "Failed to send event %s to %s", p.event_id, destination
+ logger.warn(
+ "TX [%s] {%s} Failed to send event %s",
+ destination, txn_id, p.event_id,
)
success = False
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 2ab973d6c8..edba5a9808 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -143,9 +143,17 @@ class TransportLayerClient(object):
transaction (Transaction)
Returns:
- Deferred: Results of the deferred is a tuple in the form of
- (response_code, response_body) where the response_body is a
- python dict decoded from json
+ Deferred: Succeeds when we get a 2xx HTTP response. The result
+ will be the decoded JSON body.
+
+ Fails with ``HTTPRequestException`` if we get an HTTP response
+ code >= 300.
+
+ Fails with ``NotRetryingDestination`` if we are not yet ready
+ to retry this server.
+
+ Fails with ``FederationDeniedError`` if this destination
+ is not on our federation whitelist
"""
logger.debug(
"send_data dest=%s, txid=%s",
@@ -170,11 +178,6 @@ class TransportLayerClient(object):
backoff_on_404=True, # If we get a 404 the other side has gone
)
- logger.debug(
- "send_data dest=%s, txid=%s, got response: 200",
- transaction.destination, transaction.transaction_id,
- )
-
defer.returnValue(response)
@defer.inlineCallbacks
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 14b12cd1c4..fcc02fc77d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -195,7 +195,7 @@ class MatrixFederationHttpClient(object):
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
- self.version_string = hs.version_string.encode('ascii')
+ self.version_string_bytes = hs.version_string.encode('ascii')
self.default_timeout = 60
def schedule(x):
@@ -261,8 +261,8 @@ class MatrixFederationHttpClient(object):
ignore_backoff=ignore_backoff,
)
- method = request.method
- destination = request.destination
+ method_bytes = request.method.encode("ascii")
+ destination_bytes = request.destination.encode("ascii")
path_bytes = request.path.encode("ascii")
if request.query:
query_bytes = encode_query_args(request.query)
@@ -270,8 +270,8 @@ class MatrixFederationHttpClient(object):
query_bytes = b""
headers_dict = {
- "User-Agent": [self.version_string],
- "Host": [request.destination],
+ b"User-Agent": [self.version_string_bytes],
+ b"Host": [destination_bytes],
}
with limiter:
@@ -282,50 +282,51 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- url = urllib.parse.urlunparse((
- b"matrix", destination.encode("ascii"),
+ url_bytes = urllib.parse.urlunparse((
+ b"matrix", destination_bytes,
path_bytes, None, query_bytes, b"",
- )).decode('ascii')
+ ))
+ url_str = url_bytes.decode('ascii')
- http_url = urllib.parse.urlunparse((
+ url_to_sign_bytes = urllib.parse.urlunparse((
b"", b"",
path_bytes, None, query_bytes, b"",
- )).decode('ascii')
+ ))
while True:
try:
json = request.get_json()
if json:
- data = encode_canonical_json(json)
- headers_dict["Content-Type"] = ["application/json"]
+ headers_dict[b"Content-Type"] = [b"application/json"]
self.sign_request(
- destination, method, http_url, headers_dict, json
+ destination_bytes, method_bytes, url_to_sign_bytes,
+ headers_dict, json,
)
- else:
- data = None
- self.sign_request(destination, method, http_url, headers_dict)
-
- logger.info(
- "{%s} [%s] Sending request: %s %s",
- request.txn_id, destination, method, url
- )
-
- if data:
+ data = encode_canonical_json(json)
producer = FileBodyProducer(
BytesIO(data),
- cooperator=self._cooperator
+ cooperator=self._cooperator,
)
else:
producer = None
+ self.sign_request(
+ destination_bytes, method_bytes, url_to_sign_bytes,
+ headers_dict,
+ )
- request_deferred = treq.request(
- method,
- url,
+ logger.info(
+ "{%s} [%s] Sending request: %s %s",
+ request.txn_id, request.destination, request.method,
+ url_str,
+ )
+
+ # we don't want all the fancy cookie and redirect handling that
+ # treq.request gives: just use the raw Agent.
+ request_deferred = self.agent.request(
+ method_bytes,
+ url_bytes,
headers=Headers(headers_dict),
- data=producer,
- agent=self.agent,
- reactor=self.hs.get_reactor(),
- unbuffered=True
+ bodyProducer=producer,
)
request_deferred = timeout_deferred(
@@ -344,9 +345,9 @@ class MatrixFederationHttpClient(object):
logger.warn(
"{%s} [%s] Request failed: %s %s: %s",
request.txn_id,
- destination,
- method,
- url,
+ request.destination,
+ request.method,
+ url_str,
_flatten_response_never_received(e),
)
@@ -366,7 +367,7 @@ class MatrixFederationHttpClient(object):
logger.debug(
"{%s} [%s] Waiting %ss before re-sending...",
request.txn_id,
- destination,
+ request.destination,
delay,
)
@@ -378,7 +379,7 @@ class MatrixFederationHttpClient(object):
logger.info(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
- destination,
+ request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
)
@@ -411,8 +412,9 @@ class MatrixFederationHttpClient(object):
destination_is must be non-None.
method (bytes): The HTTP method of the request
url_bytes (bytes): The URI path of the request
- headers_dict (dict): Dictionary of request headers to append to
- content (bytes): The body of the request
+ headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
+ append to
+ content (object): The body of the request
destination_is (bytes): As 'destination', but if the destination is an
identity server
|