diff options
Diffstat (limited to 'synapse/http')
-rw-r--r-- | synapse/http/client.py | 4 | ||||
-rw-r--r-- | synapse/http/endpoint.py | 13 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 456 | ||||
-rw-r--r-- | synapse/http/request_metrics.py | 53 | ||||
-rw-r--r-- | synapse/http/server.py | 49 | ||||
-rw-r--r-- | synapse/http/site.py | 33 |
6 files changed, 375 insertions, 233 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py index ec339a92ad..3d05f83b8c 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import cancelled_to_request_timed_out_error, redact_uri from synapse.http.endpoint import SpiderEndpoint -from synapse.util.async_helpers import add_timeout_to_deferred +from synapse.util.async_helpers import timeout_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable @@ -99,7 +99,7 @@ class SimpleHttpClient(object): request_deferred = treq.request( method, uri, agent=self.agent, data=data, headers=headers ) - add_timeout_to_deferred( + request_deferred = timeout_deferred( request_deferred, 60, self.hs.get_reactor(), cancelled_to_request_timed_out_error, ) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index b0c9369519..91025037a3 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory= Args: reactor: Twisted reactor. - destination (bytes): The name of the server to connect to. + destination (unicode): The name of the server to connect to. tls_client_options_factory (synapse.crypto.context_factory.ClientTLSOptionsFactory): Factory which generates TLS options for client connections. @@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory= transport_endpoint = HostnameEndpoint default_port = 8008 else: + # the SNI string should be the same as the Host header, minus the port. + # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777, + # the Host header and SNI should therefore be the server_name of the remote + # server. + tls_options = tls_client_options_factory.get_options(domain) + def transport_endpoint(reactor, host, port, timeout): return wrapClientTLS( - tls_client_options_factory.get_options(host), - HostnameEndpoint(reactor, host, port, timeout=timeout)) + tls_options, + HostnameEndpoint(reactor, host, port, timeout=timeout), + ) default_port = 8448 if port is None: diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 083484a687..24b6110c20 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -17,10 +17,12 @@ import cgi import logging import random import sys +from io import BytesIO from six import PY3, string_types from six.moves import urllib +import attr import treq from canonicaljson import encode_canonical_json from prometheus_client import Counter @@ -28,8 +30,9 @@ from signedjson.sign import sign_json from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError +from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, HTTPConnectionPool +from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool from twisted.web.http_headers import Headers import synapse.metrics @@ -41,13 +44,11 @@ from synapse.api.errors import ( SynapseError, ) from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util import logcontext -from synapse.util.async_helpers import timeout_no_seriously +from synapse.util.async_helpers import timeout_deferred from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure logger = logging.getLogger(__name__) -outbound_logger = logging.getLogger("synapse.http.outbound") outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", "", ["method"]) @@ -78,6 +79,99 @@ class MatrixFederationEndpointFactory(object): ) +_next_id = 1 + + +@attr.s +class MatrixFederationRequest(object): + method = attr.ib() + """HTTP method + :type: str + """ + + path = attr.ib() + """HTTP path + :type: str + """ + + destination = attr.ib() + """The remote server to send the HTTP request to. + :type: str""" + + json = attr.ib(default=None) + """JSON to send in the body. + :type: dict|None + """ + + json_callback = attr.ib(default=None) + """A callback to generate the JSON. + :type: func|None + """ + + query = attr.ib(default=None) + """Query arguments. + :type: dict|None + """ + + txn_id = attr.ib(default=None) + """Unique ID for this request (for logging) + :type: str|None + """ + + def __attrs_post_init__(self): + global _next_id + self.txn_id = "%s-O-%s" % (self.method, _next_id) + _next_id = (_next_id + 1) % (MAXINT - 1) + + def get_json(self): + if self.json_callback: + return self.json_callback() + return self.json + + +@defer.inlineCallbacks +def _handle_json_response(reactor, timeout_sec, request, response): + """ + Reads the JSON body of a response, with a timeout + + Args: + reactor (IReactor): twisted reactor, for the timeout + timeout_sec (float): number of seconds to wait for response to complete + request (MatrixFederationRequest): the request that triggered the response + response (IResponse): response to the request + + Returns: + dict: parsed JSON response + """ + try: + check_content_type_is_json(response.headers) + + d = treq.json_content(response) + d = timeout_deferred( + d, + timeout=timeout_sec, + reactor=reactor, + ) + + body = yield make_deferred_yieldable(d) + except Exception as e: + logger.warn( + "{%s} [%s] Error reading response: %s", + request.txn_id, + request.destination, + e, + ) + raise + logger.info( + "{%s} [%s] Completed: %d %s", + request.txn_id, + request.destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + ) + defer.returnValue(body) + + class MatrixFederationHttpClient(object): """HTTP client used to talk to other homeservers over the federation protocol. Send client certificates and signs requests. @@ -101,41 +195,42 @@ class MatrixFederationHttpClient(object): ) self.clock = hs.get_clock() self._store = hs.get_datastore() - self.version_string = hs.version_string.encode('ascii') - self._next_id = 1 + self.version_string_bytes = hs.version_string.encode('ascii') self.default_timeout = 60 - def _create_url(self, destination, path_bytes, param_bytes, query_bytes): - return urllib.parse.urlunparse( - (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"") - ) + def schedule(x): + reactor.callLater(_EPSILON, x) + + self._cooperator = Cooperator(scheduler=schedule) @defer.inlineCallbacks - def _request(self, destination, method, path, - json=None, json_callback=None, - param_bytes=b"", - query=None, retry_on_dns_fail=True, - timeout=None, long_retries=False, - ignore_backoff=False, - backoff_on_404=False): + def _send_request( + self, + request, + retry_on_dns_fail=True, + timeout=None, + long_retries=False, + ignore_backoff=False, + backoff_on_404=False + ): """ - Creates and sends a request to the given server. + Sends a request to the given server. Args: - destination (str): The remote server to send the HTTP request to. - method (str): HTTP method - path (str): The HTTP path - json (dict or None): JSON to send in the body. - json_callback (func or None): A callback to generate the JSON. - query (dict or None): Query arguments. + request (MatrixFederationRequest): details of request to be sent + + timeout (int|None): number of milliseconds to wait for the response headers + (including connecting to the server). 60s by default. + ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. + backoff_on_404 (bool): Back off if we get a 404 Returns: Deferred: resolves with the http response object on success. - Fails with ``HTTPRequestException``: if we get an HTTP response + Fails with ``HttpResponseException``: if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -154,38 +249,32 @@ class MatrixFederationHttpClient(object): if ( self.hs.config.federation_domain_whitelist is not None and - destination not in self.hs.config.federation_domain_whitelist + request.destination not in self.hs.config.federation_domain_whitelist ): - raise FederationDeniedError(destination) + raise FederationDeniedError(request.destination) limiter = yield synapse.util.retryutils.get_retry_limiter( - destination, + request.destination, self.clock, self._store, backoff_on_404=backoff_on_404, ignore_backoff=ignore_backoff, ) - headers_dict = {} - path_bytes = path.encode("ascii") - if query: - query_bytes = encode_query_args(query) + method_bytes = request.method.encode("ascii") + destination_bytes = request.destination.encode("ascii") + path_bytes = request.path.encode("ascii") + if request.query: + query_bytes = encode_query_args(request.query) else: query_bytes = b"" headers_dict = { - "User-Agent": [self.version_string], - "Host": [destination], + b"User-Agent": [self.version_string_bytes], + b"Host": [destination_bytes], } with limiter: - url = self._create_url( - destination.encode("ascii"), path_bytes, param_bytes, query_bytes - ).decode('ascii') - - txn_id = "%s-O-%s" % (method, self._next_id) - self._next_id = (self._next_id + 1) % (MAXINT - 1) - # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: @@ -193,49 +282,56 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - http_url = urllib.parse.urlunparse( - (b"", b"", path_bytes, param_bytes, query_bytes, b"") - ).decode('ascii') + url_bytes = urllib.parse.urlunparse(( + b"matrix", destination_bytes, + path_bytes, None, query_bytes, b"", + )) + url_str = url_bytes.decode('ascii') + + url_to_sign_bytes = urllib.parse.urlunparse(( + b"", b"", + path_bytes, None, query_bytes, b"", + )) - log_result = None while True: try: - if json_callback: - json = json_callback() - + json = request.get_json() if json: - data = encode_canonical_json(json) - headers_dict["Content-Type"] = ["application/json"] + headers_dict[b"Content-Type"] = [b"application/json"] self.sign_request( - destination, method, http_url, headers_dict, json + destination_bytes, method_bytes, url_to_sign_bytes, + headers_dict, json, + ) + data = encode_canonical_json(json) + producer = FileBodyProducer( + BytesIO(data), + cooperator=self._cooperator, ) else: - data = None - self.sign_request(destination, method, http_url, headers_dict) + producer = None + self.sign_request( + destination_bytes, method_bytes, url_to_sign_bytes, + headers_dict, + ) - outbound_logger.info( + logger.info( "{%s} [%s] Sending request: %s %s", - txn_id, destination, method, url + request.txn_id, request.destination, request.method, + url_str, ) - request_deferred = treq.request( - method, - url, + # we don't want all the fancy cookie and redirect handling that + # treq.request gives: just use the raw Agent. + request_deferred = self.agent.request( + method_bytes, + url_bytes, headers=Headers(headers_dict), - data=data, - agent=self.agent, - reactor=self.hs.get_reactor(), - unbuffered=True + bodyProducer=producer, ) - request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) - # Sometimes the timeout above doesn't work, so lets hack yet - # another layer of timeouts in in the vain hope that at some - # point the world made sense and this really really really - # should work. - request_deferred = timeout_no_seriously( + request_deferred = timeout_deferred( request_deferred, - timeout=_sec_timeout * 2, + timeout=_sec_timeout, reactor=self.hs.get_reactor(), ) @@ -244,33 +340,19 @@ class MatrixFederationHttpClient(object): request_deferred, ) - log_result = "%d %s" % ( - response.code, - response.phrase.decode('ascii', errors='replace'), - ) break except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn( - "DNS Lookup failed to %s with %s", - destination, - e - ) - log_result = "DNS Lookup failed to %s with %s" % ( - destination, e - ) - raise - logger.warn( - "{%s} Sending request failed to %s: %s %s: %s", - txn_id, - destination, - method, - url, + "{%s} [%s] Request failed: %s %s: %s", + request.txn_id, + request.destination, + request.method, + url_str, _flatten_response_never_received(e), ) - log_result = _flatten_response_never_received(e) + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + raise if retries_left and not timeout: if long_retries: @@ -283,33 +365,37 @@ class MatrixFederationHttpClient(object): delay *= random.uniform(0.8, 1.4) logger.debug( - "{%s} Waiting %s before sending to %s...", - txn_id, + "{%s} [%s] Waiting %ss before re-sending...", + request.txn_id, + request.destination, delay, - destination ) yield self.clock.sleep(delay) retries_left -= 1 else: raise - finally: - outbound_logger.info( - "{%s} [%s] Result: %s", - txn_id, - destination, - log_result, - ) + + logger.info( + "{%s} [%s] Got response headers: %d %s", + request.txn_id, + request.destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + ) if 200 <= response.code < 300: pass else: # :'( # Update transactions table? - with logcontext.PreserveLoggingContext(): - d = treq.content(response) - d.addTimeout(_sec_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) + d = treq.content(response) + d = timeout_deferred( + d, + timeout=_sec_timeout, + reactor=self.hs.get_reactor(), + ) + body = yield make_deferred_yieldable(d) raise HttpResponseException( response.code, response.phrase, body ) @@ -326,8 +412,9 @@ class MatrixFederationHttpClient(object): destination_is must be non-None. method (bytes): The HTTP method of the request url_bytes (bytes): The URI path of the request - headers_dict (dict): Dictionary of request headers to append to - content (bytes): The body of the request + headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to + append to + content (object): The body of the request destination_is (bytes): As 'destination', but if the destination is an identity server @@ -393,7 +480,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -403,29 +490,26 @@ class MatrixFederationHttpClient(object): is not on our federation whitelist """ - if not json_data_callback: - json_data_callback = lambda: data - - response = yield self._request( - destination, - "PUT", - path, - json_callback=json_data_callback, + request = MatrixFederationRequest( + method="PUT", + destination=destination, + path=path, query=args, + json_callback=json_data_callback, + json=data, + ) + + response = yield self._send_request( + request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, backoff_on_404=backoff_on_404, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) + body = yield _handle_json_response( + self.hs.get_reactor(), self.default_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -450,7 +534,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -459,31 +543,30 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( - destination, - "POST", - path, + + request = MatrixFederationRequest( + method="POST", + destination=destination, + path=path, query=args, json=data, + ) + + response = yield self._send_request( + request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - if timeout: - _sec_timeout = timeout / 1000 - else: - _sec_timeout = self.default_timeout - - d.addTimeout(_sec_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + body = yield _handle_json_response( + self.hs.get_reactor(), _sec_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -506,7 +589,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -519,25 +602,23 @@ class MatrixFederationHttpClient(object): logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) - response = yield self._request( - destination, - "GET", - path, + request = MatrixFederationRequest( + method="GET", + destination=destination, + path=path, query=args, + ) + + response = yield self._send_request( + request, retry_on_dns_fail=retry_on_dns_fail, timeout=timeout, ignore_backoff=ignore_backoff, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) - + body = yield _handle_json_response( + self.hs.get_reactor(), self.default_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -559,7 +640,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -568,25 +649,23 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( - destination, - "DELETE", - path, + request = MatrixFederationRequest( + method="DELETE", + destination=destination, + path=path, query=args, + ) + + response = yield self._send_request( + request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, ) - if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? - check_content_type_is_json(response.headers) - - with logcontext.PreserveLoggingContext(): - d = treq.json_content(response) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - body = yield make_deferred_yieldable(d) - + body = yield _handle_json_response( + self.hs.get_reactor(), self.default_timeout, request, response, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -605,7 +684,7 @@ class MatrixFederationHttpClient(object): Deferred: resolves with an (int,dict) tuple of the file length and a dict of the response headers. - Fails with ``HTTPRequestException`` if we get an HTTP response code + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300 Fails with ``NotRetryingDestination`` if we are not yet ready @@ -614,11 +693,15 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( - destination, - "GET", - path, + request = MatrixFederationRequest( + method="GET", + destination=destination, + path=path, query=args, + ) + + response = yield self._send_request( + request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff, ) @@ -626,14 +709,25 @@ class MatrixFederationHttpClient(object): headers = dict(response.headers.getAllRawHeaders()) try: - with logcontext.PreserveLoggingContext(): - d = _readBodyToFile(response, output_stream, max_size) - d.addTimeout(self.default_timeout, self.hs.get_reactor()) - length = yield make_deferred_yieldable(d) - except Exception: - logger.exception("Failed to download body") + d = _readBodyToFile(response, output_stream, max_size) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + length = yield make_deferred_yieldable(d) + except Exception as e: + logger.warn( + "{%s} [%s] Error reading response: %s", + request.txn_id, + request.destination, + e, + ) raise - + logger.info( + "{%s} [%s] Completed: %d %s [%d bytes]", + request.txn_id, + request.destination, + response.code, + response.phrase.decode('ascii', errors='replace'), + length, + ) defer.returnValue((length, headers)) diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 72c2654678..62045a918b 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -39,7 +39,8 @@ outgoing_responses_counter = Counter( ) response_timer = Histogram( - "synapse_http_server_response_time_seconds", "sec", + "synapse_http_server_response_time_seconds", + "sec", ["method", "servlet", "tag", "code"], ) @@ -79,15 +80,11 @@ response_size = Counter( # than when the response was written. in_flight_requests_ru_utime = Counter( - "synapse_http_server_in_flight_requests_ru_utime_seconds", - "", - ["method", "servlet"], + "synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"] ) in_flight_requests_ru_stime = Counter( - "synapse_http_server_in_flight_requests_ru_stime_seconds", - "", - ["method", "servlet"], + "synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"] ) in_flight_requests_db_txn_count = Counter( @@ -134,7 +131,7 @@ def _get_in_flight_counts(): # type counts = {} for rm in reqs: - key = (rm.method, rm.name,) + key = (rm.method, rm.name) counts[key] = counts.get(key, 0) + 1 return counts @@ -162,7 +159,7 @@ class RequestMetrics(object): with _in_flight_requests_lock: _in_flight_requests.add(self) - def stop(self, time_sec, request): + def stop(self, time_sec, response_code, sent_bytes): with _in_flight_requests_lock: _in_flight_requests.discard(self) @@ -175,39 +172,40 @@ class RequestMetrics(object): if context != self.start_context: logger.warn( "Context have unexpectedly changed %r, %r", - context, self.start_context + context, + self.start_context, ) return - response_code = str(request.code) + response_code = str(response_code) - outgoing_responses_counter.labels(request.method, response_code).inc() + outgoing_responses_counter.labels(self.method, response_code).inc() - response_count.labels(request.method, self.name, tag).inc() + response_count.labels(self.method, self.name, tag).inc() - response_timer.labels(request.method, self.name, tag, response_code).observe( + response_timer.labels(self.method, self.name, tag, response_code).observe( time_sec - self.start ) resource_usage = context.get_resource_usage() - response_ru_utime.labels(request.method, self.name, tag).inc( - resource_usage.ru_utime, + response_ru_utime.labels(self.method, self.name, tag).inc( + resource_usage.ru_utime ) - response_ru_stime.labels(request.method, self.name, tag).inc( - resource_usage.ru_stime, + response_ru_stime.labels(self.method, self.name, tag).inc( + resource_usage.ru_stime ) - response_db_txn_count.labels(request.method, self.name, tag).inc( + response_db_txn_count.labels(self.method, self.name, tag).inc( resource_usage.db_txn_count ) - response_db_txn_duration.labels(request.method, self.name, tag).inc( + response_db_txn_duration.labels(self.method, self.name, tag).inc( resource_usage.db_txn_duration_sec ) - response_db_sched_duration.labels(request.method, self.name, tag).inc( + response_db_sched_duration.labels(self.method, self.name, tag).inc( resource_usage.db_sched_duration_sec ) - response_size.labels(request.method, self.name, tag).inc(request.sentLength) + response_size.labels(self.method, self.name, tag).inc(sent_bytes) # We always call this at the end to ensure that we update the metrics # regardless of whether a call to /metrics while the request was in @@ -222,8 +220,15 @@ class RequestMetrics(object): diff = new_stats - self._request_stats self._request_stats = new_stats - in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime) - in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime) + # max() is used since rapid use of ru_stime/ru_utime can end up with the + # count going backwards due to NTP, time smearing, fine-grained + # correction, or floating points. Who knows, really? + in_flight_requests_ru_utime.labels(self.method, self.name).inc( + max(diff.ru_utime, 0) + ) + in_flight_requests_ru_stime.labels(self.method, self.name).inc( + max(diff.ru_stime, 0) + ) in_flight_requests_db_txn_count.labels(self.method, self.name).inc( diff.db_txn_count diff --git a/synapse/http/server.py b/synapse/http/server.py index 2d5c23e673..b4b25cab19 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -84,10 +84,21 @@ def wrap_json_request_handler(h): logger.info( "%s SynapseError: %s - %s", request, code, e.msg ) - respond_with_json( - request, code, e.error_dict(), send_cors=True, - pretty_print=_request_user_agent_is_curl(request), - ) + + # Only respond with an error response if we haven't already started + # writing, otherwise lets just kill the connection + if request.startedWriting: + if request.transport: + try: + request.transport.abortConnection() + except Exception: + # abortConnection throws if the connection is already closed + pass + else: + respond_with_json( + request, code, e.error_dict(), send_cors=True, + pretty_print=_request_user_agent_is_curl(request), + ) except Exception: # failure.Failure() fishes the original Failure out @@ -100,16 +111,26 @@ def wrap_json_request_handler(h): request, f.getTraceback().rstrip(), ) - respond_with_json( - request, - 500, - { - "error": "Internal server error", - "errcode": Codes.UNKNOWN, - }, - send_cors=True, - pretty_print=_request_user_agent_is_curl(request), - ) + # Only respond with an error response if we haven't already started + # writing, otherwise lets just kill the connection + if request.startedWriting: + if request.transport: + try: + request.transport.abortConnection() + except Exception: + # abortConnection throws if the connection is already closed + pass + else: + respond_with_json( + request, + 500, + { + "error": "Internal server error", + "errcode": Codes.UNKNOWN, + }, + send_cors=True, + pretty_print=_request_user_agent_is_curl(request), + ) return wrap_async_request_handler(wrapped_request_handler) diff --git a/synapse/http/site.py b/synapse/http/site.py index 2191de9836..e508c0bd4f 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -75,14 +75,14 @@ class SynapseRequest(Request): return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % ( self.__class__.__name__, id(self), - self.method, + self.get_method(), self.get_redacted_uri(), - self.clientproto, + self.clientproto.decode('ascii', errors='replace'), self.site.site_tag, ) def get_request_id(self): - return "%s-%i" % (self.method.decode('ascii'), self.request_seq) + return "%s-%i" % (self.get_method(), self.request_seq) def get_redacted_uri(self): uri = self.uri @@ -90,6 +90,21 @@ class SynapseRequest(Request): uri = self.uri.decode('ascii') return redact_uri(uri) + def get_method(self): + """Gets the method associated with the request (or placeholder if not + method has yet been received). + + Note: This is necessary as the placeholder value in twisted is str + rather than bytes, so we need to sanitise `self.method`. + + Returns: + str + """ + method = self.method + if isinstance(method, bytes): + method = self.method.decode('ascii') + return method + def get_user_agent(self): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] @@ -119,7 +134,7 @@ class SynapseRequest(Request): # dispatching to the handler, so that the handler # can update the servlet name in the request # metrics - requests_counter.labels(self.method.decode('ascii'), + requests_counter.labels(self.get_method(), self.request_metrics.name).inc() @contextlib.contextmanager @@ -207,14 +222,14 @@ class SynapseRequest(Request): self.start_time = time.time() self.request_metrics = RequestMetrics() self.request_metrics.start( - self.start_time, name=servlet_name, method=self.method.decode('ascii'), + self.start_time, name=servlet_name, method=self.get_method(), ) self.site.access_logger.info( "%s - %s - Received request: %s %s", self.getClientIP(), self.site.site_tag, - self.method.decode('ascii'), + self.get_method(), self.get_redacted_uri() ) @@ -280,7 +295,7 @@ class SynapseRequest(Request): int(usage.db_txn_count), self.sentLength, code, - self.method.decode('ascii'), + self.get_method(), self.get_redacted_uri(), self.clientproto.decode('ascii', errors='replace'), user_agent, @@ -288,7 +303,7 @@ class SynapseRequest(Request): ) try: - self.request_metrics.stop(self.finish_time, self) + self.request_metrics.stop(self.finish_time, self.code, self.sentLength) except Exception as e: logger.warn("Failed to stop metrics: %r", e) @@ -308,7 +323,7 @@ class XForwardedForRequest(SynapseRequest): C{b"-"}. """ return self.requestHeaders.getRawHeaders( - b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() + b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii') class SynapseRequestFactory(object): |