summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2018-10-16 10:44:49 +0100
committerGitHub <noreply@github.com>2018-10-16 10:44:49 +0100
commitb8a5b0097c4fc88a43c5c5f9785720d38299d3d8 (patch)
tree60e04d23b0abe7125f875515046001a54d2f4dcd
parentMerge pull request #4019 from matrix-org/dbkr/e2e_backups (diff)
downloadsynapse-b8a5b0097c4fc88a43c5c5f9785720d38299d3d8.tar.xz
Various cleanups in the federation client code (#4031)
- Improve logging: log things in the right order, include destination and txids
  in all log lines, don't log successful responses twice

- Fix the docstring on TransportLayerClient.send_transaction

- Don't use treq.request, which is overcomplicated for our purposes: just use a
  twisted.web.client.Agent.

- simplify the logic for setting up the bodyProducer

- fix bytes/str confusions
-rw-r--r--changelog.d/4031.misc1
-rw-r--r--synapse/federation/transaction_queue.py27
-rw-r--r--synapse/federation/transport/client.py19
-rw-r--r--synapse/http/matrixfederationclient.py78
4 files changed, 64 insertions, 61 deletions
diff --git a/changelog.d/4031.misc b/changelog.d/4031.misc
new file mode 100644
index 0000000000..60be8b59fd
--- /dev/null
+++ b/changelog.d/4031.misc
@@ -0,0 +1 @@
+Various cleanups in the federation client code
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 98b5950800..3fdd63be95 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -633,14 +633,6 @@ class TransactionQueue(object):
                 transaction, json_data_cb
             )
             code = 200
-
-            if response:
-                for e_id, r in response.get("pdus", {}).items():
-                    if "error" in r:
-                        logger.warn(
-                            "Transaction returned error for %s: %s",
-                            e_id, r,
-                        )
         except HttpResponseException as e:
             code = e.code
             response = e.response
@@ -657,19 +649,24 @@ class TransactionQueue(object):
             destination, txn_id, code
         )
 
-        logger.debug("TX [%s] Sent transaction", destination)
-        logger.debug("TX [%s] Marking as delivered...", destination)
-
         yield self.transaction_actions.delivered(
             transaction, code, response
         )
 
-        logger.debug("TX [%s] Marked as delivered", destination)
+        logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
 
-        if code != 200:
+        if code == 200:
+            for e_id, r in response.get("pdus", {}).items():
+                if "error" in r:
+                    logger.warn(
+                        "TX [%s] {%s} Remote returned error for %s: %s",
+                        destination, txn_id, e_id, r,
+                    )
+        else:
             for p in pdus:
-                logger.info(
-                    "Failed to send event %s to %s", p.event_id, destination
+                logger.warn(
+                    "TX [%s] {%s} Failed to send event %s",
+                    destination, txn_id, p.event_id,
                 )
             success = False
 
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 2ab973d6c8..edba5a9808 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -143,9 +143,17 @@ class TransportLayerClient(object):
             transaction (Transaction)
 
         Returns:
-            Deferred: Results of the deferred is a tuple in the form of
-            (response_code, response_body) where the response_body is a
-            python dict decoded from json
+            Deferred: Succeeds when we get a 2xx HTTP response. The result
+            will be the decoded JSON body.
+
+            Fails with ``HTTPRequestException`` if we get an HTTP response
+            code >= 300.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
+
+            Fails with ``FederationDeniedError`` if this destination
+            is not on our federation whitelist
         """
         logger.debug(
             "send_data dest=%s, txid=%s",
@@ -170,11 +178,6 @@ class TransportLayerClient(object):
             backoff_on_404=True,  # If we get a 404 the other side has gone
         )
 
-        logger.debug(
-            "send_data dest=%s, txid=%s, got response: 200",
-            transaction.destination, transaction.transaction_id,
-        )
-
         defer.returnValue(response)
 
     @defer.inlineCallbacks
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 14b12cd1c4..fcc02fc77d 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -195,7 +195,7 @@ class MatrixFederationHttpClient(object):
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
-        self.version_string = hs.version_string.encode('ascii')
+        self.version_string_bytes = hs.version_string.encode('ascii')
         self.default_timeout = 60
 
         def schedule(x):
@@ -261,8 +261,8 @@ class MatrixFederationHttpClient(object):
             ignore_backoff=ignore_backoff,
         )
 
-        method = request.method
-        destination = request.destination
+        method_bytes = request.method.encode("ascii")
+        destination_bytes = request.destination.encode("ascii")
         path_bytes = request.path.encode("ascii")
         if request.query:
             query_bytes = encode_query_args(request.query)
@@ -270,8 +270,8 @@ class MatrixFederationHttpClient(object):
             query_bytes = b""
 
         headers_dict = {
-            "User-Agent": [self.version_string],
-            "Host": [request.destination],
+            b"User-Agent": [self.version_string_bytes],
+            b"Host": [destination_bytes],
         }
 
         with limiter:
@@ -282,50 +282,51 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            url = urllib.parse.urlunparse((
-                b"matrix", destination.encode("ascii"),
+            url_bytes = urllib.parse.urlunparse((
+                b"matrix", destination_bytes,
                 path_bytes, None, query_bytes, b"",
-            )).decode('ascii')
+            ))
+            url_str = url_bytes.decode('ascii')
 
-            http_url = urllib.parse.urlunparse((
+            url_to_sign_bytes = urllib.parse.urlunparse((
                 b"", b"",
                 path_bytes, None, query_bytes, b"",
-            )).decode('ascii')
+            ))
 
             while True:
                 try:
                     json = request.get_json()
                     if json:
-                        data = encode_canonical_json(json)
-                        headers_dict["Content-Type"] = ["application/json"]
+                        headers_dict[b"Content-Type"] = [b"application/json"]
                         self.sign_request(
-                            destination, method, http_url, headers_dict, json
+                            destination_bytes, method_bytes, url_to_sign_bytes,
+                            headers_dict, json,
                         )
-                    else:
-                        data = None
-                        self.sign_request(destination, method, http_url, headers_dict)
-
-                    logger.info(
-                        "{%s} [%s] Sending request: %s %s",
-                        request.txn_id, destination, method, url
-                    )
-
-                    if data:
+                        data = encode_canonical_json(json)
                         producer = FileBodyProducer(
                             BytesIO(data),
-                            cooperator=self._cooperator
+                            cooperator=self._cooperator,
                         )
                     else:
                         producer = None
+                        self.sign_request(
+                            destination_bytes, method_bytes, url_to_sign_bytes,
+                            headers_dict,
+                        )
 
-                    request_deferred = treq.request(
-                        method,
-                        url,
+                    logger.info(
+                        "{%s} [%s] Sending request: %s %s",
+                        request.txn_id, request.destination, request.method,
+                        url_str,
+                    )
+
+                    # we don't want all the fancy cookie and redirect handling that
+                    # treq.request gives: just use the raw Agent.
+                    request_deferred = self.agent.request(
+                        method_bytes,
+                        url_bytes,
                         headers=Headers(headers_dict),
-                        data=producer,
-                        agent=self.agent,
-                        reactor=self.hs.get_reactor(),
-                        unbuffered=True
+                        bodyProducer=producer,
                     )
 
                     request_deferred = timeout_deferred(
@@ -344,9 +345,9 @@ class MatrixFederationHttpClient(object):
                     logger.warn(
                         "{%s} [%s] Request failed: %s %s: %s",
                         request.txn_id,
-                        destination,
-                        method,
-                        url,
+                        request.destination,
+                        request.method,
+                        url_str,
                         _flatten_response_never_received(e),
                     )
 
@@ -366,7 +367,7 @@ class MatrixFederationHttpClient(object):
                         logger.debug(
                             "{%s} [%s] Waiting %ss before re-sending...",
                             request.txn_id,
-                            destination,
+                            request.destination,
                             delay,
                         )
 
@@ -378,7 +379,7 @@ class MatrixFederationHttpClient(object):
             logger.info(
                 "{%s} [%s] Got response headers: %d %s",
                 request.txn_id,
-                destination,
+                request.destination,
                 response.code,
                 response.phrase.decode('ascii', errors='replace'),
             )
@@ -411,8 +412,9 @@ class MatrixFederationHttpClient(object):
                 destination_is must be non-None.
             method (bytes): The HTTP method of the request
             url_bytes (bytes): The URI path of the request
-            headers_dict (dict): Dictionary of request headers to append to
-            content (bytes): The body of the request
+            headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
+                append to
+            content (object): The body of the request
             destination_is (bytes): As 'destination', but if the destination is an
                 identity server