summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http')
-rw-r--r--synapse/http/matrixfederationclient.py78
-rw-r--r--synapse/http/request_metrics.py31
2 files changed, 58 insertions, 51 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 1a79f6305b..24b6110c20 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -195,7 +195,7 @@ class MatrixFederationHttpClient(object):
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
-        self.version_string = hs.version_string.encode('ascii')
+        self.version_string_bytes = hs.version_string.encode('ascii')
         self.default_timeout = 60
 
         def schedule(x):
@@ -261,8 +261,8 @@ class MatrixFederationHttpClient(object):
             ignore_backoff=ignore_backoff,
         )
 
-        method = request.method
-        destination = request.destination
+        method_bytes = request.method.encode("ascii")
+        destination_bytes = request.destination.encode("ascii")
         path_bytes = request.path.encode("ascii")
         if request.query:
             query_bytes = encode_query_args(request.query)
@@ -270,8 +270,8 @@ class MatrixFederationHttpClient(object):
             query_bytes = b""
 
         headers_dict = {
-            "User-Agent": [self.version_string],
-            "Host": [request.destination],
+            b"User-Agent": [self.version_string_bytes],
+            b"Host": [destination_bytes],
         }
 
         with limiter:
@@ -282,50 +282,51 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            url = urllib.parse.urlunparse((
-                b"matrix", destination.encode("ascii"),
+            url_bytes = urllib.parse.urlunparse((
+                b"matrix", destination_bytes,
                 path_bytes, None, query_bytes, b"",
-            )).decode('ascii')
+            ))
+            url_str = url_bytes.decode('ascii')
 
-            http_url = urllib.parse.urlunparse((
+            url_to_sign_bytes = urllib.parse.urlunparse((
                 b"", b"",
                 path_bytes, None, query_bytes, b"",
-            )).decode('ascii')
+            ))
 
             while True:
                 try:
                     json = request.get_json()
                     if json:
-                        data = encode_canonical_json(json)
-                        headers_dict["Content-Type"] = ["application/json"]
+                        headers_dict[b"Content-Type"] = [b"application/json"]
                         self.sign_request(
-                            destination, method, http_url, headers_dict, json
+                            destination_bytes, method_bytes, url_to_sign_bytes,
+                            headers_dict, json,
                         )
-                    else:
-                        data = None
-                        self.sign_request(destination, method, http_url, headers_dict)
-
-                    logger.info(
-                        "{%s} [%s] Sending request: %s %s",
-                        request.txn_id, destination, method, url
-                    )
-
-                    if data:
+                        data = encode_canonical_json(json)
                         producer = FileBodyProducer(
                             BytesIO(data),
-                            cooperator=self._cooperator
+                            cooperator=self._cooperator,
                         )
                     else:
                         producer = None
+                        self.sign_request(
+                            destination_bytes, method_bytes, url_to_sign_bytes,
+                            headers_dict,
+                        )
 
-                    request_deferred = treq.request(
-                        method,
-                        url,
+                    logger.info(
+                        "{%s} [%s] Sending request: %s %s",
+                        request.txn_id, request.destination, request.method,
+                        url_str,
+                    )
+
+                    # we don't want all the fancy cookie and redirect handling that
+                    # treq.request gives: just use the raw Agent.
+                    request_deferred = self.agent.request(
+                        method_bytes,
+                        url_bytes,
                         headers=Headers(headers_dict),
-                        data=producer,
-                        agent=self.agent,
-                        reactor=self.hs.get_reactor(),
-                        unbuffered=True
+                        bodyProducer=producer,
                     )
 
                     request_deferred = timeout_deferred(
@@ -344,9 +345,9 @@ class MatrixFederationHttpClient(object):
                     logger.warn(
                         "{%s} [%s] Request failed: %s %s: %s",
                         request.txn_id,
-                        destination,
-                        method,
-                        url,
+                        request.destination,
+                        request.method,
+                        url_str,
                         _flatten_response_never_received(e),
                     )
 
@@ -366,7 +367,7 @@ class MatrixFederationHttpClient(object):
                         logger.debug(
                             "{%s} [%s] Waiting %ss before re-sending...",
                             request.txn_id,
-                            destination,
+                            request.destination,
                             delay,
                         )
 
@@ -378,7 +379,7 @@ class MatrixFederationHttpClient(object):
             logger.info(
                 "{%s} [%s] Got response headers: %d %s",
                 request.txn_id,
-                destination,
+                request.destination,
                 response.code,
                 response.phrase.decode('ascii', errors='replace'),
             )
@@ -411,8 +412,9 @@ class MatrixFederationHttpClient(object):
                 destination_is must be non-None.
             method (bytes): The HTTP method of the request
             url_bytes (bytes): The URI path of the request
-            headers_dict (dict): Dictionary of request headers to append to
-            content (bytes): The body of the request
+            headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
+                append to
+            content (object): The body of the request
             destination_is (bytes): As 'destination', but if the destination is an
                 identity server
 
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index fedb4e6b18..62045a918b 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -39,7 +39,8 @@ outgoing_responses_counter = Counter(
 )
 
 response_timer = Histogram(
-    "synapse_http_server_response_time_seconds", "sec",
+    "synapse_http_server_response_time_seconds",
+    "sec",
     ["method", "servlet", "tag", "code"],
 )
 
@@ -79,15 +80,11 @@ response_size = Counter(
 # than when the response was written.
 
 in_flight_requests_ru_utime = Counter(
-    "synapse_http_server_in_flight_requests_ru_utime_seconds",
-    "",
-    ["method", "servlet"],
+    "synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"]
 )
 
 in_flight_requests_ru_stime = Counter(
-    "synapse_http_server_in_flight_requests_ru_stime_seconds",
-    "",
-    ["method", "servlet"],
+    "synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"]
 )
 
 in_flight_requests_db_txn_count = Counter(
@@ -134,7 +131,7 @@ def _get_in_flight_counts():
     # type
     counts = {}
     for rm in reqs:
-        key = (rm.method, rm.name,)
+        key = (rm.method, rm.name)
         counts[key] = counts.get(key, 0) + 1
 
     return counts
@@ -175,7 +172,8 @@ class RequestMetrics(object):
             if context != self.start_context:
                 logger.warn(
                     "Context have unexpectedly changed %r, %r",
-                    context, self.start_context
+                    context,
+                    self.start_context,
                 )
                 return
 
@@ -192,10 +190,10 @@ class RequestMetrics(object):
         resource_usage = context.get_resource_usage()
 
         response_ru_utime.labels(self.method, self.name, tag).inc(
-            resource_usage.ru_utime,
+            resource_usage.ru_utime
         )
         response_ru_stime.labels(self.method, self.name, tag).inc(
-            resource_usage.ru_stime,
+            resource_usage.ru_stime
         )
         response_db_txn_count.labels(self.method, self.name, tag).inc(
             resource_usage.db_txn_count
@@ -222,8 +220,15 @@ class RequestMetrics(object):
         diff = new_stats - self._request_stats
         self._request_stats = new_stats
 
-        in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
-        in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
+        # max() is used since rapid use of ru_stime/ru_utime can end up with the
+        # count going backwards due to NTP, time smearing, fine-grained
+        # correction, or floating points. Who knows, really?
+        in_flight_requests_ru_utime.labels(self.method, self.name).inc(
+            max(diff.ru_utime, 0)
+        )
+        in_flight_requests_ru_stime.labels(self.method, self.name).inc(
+            max(diff.ru_stime, 0)
+        )
 
         in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
             diff.db_txn_count