diff options
author | Matthew Hodgson <matthew@matrix.org> | 2018-05-29 00:25:22 +0100 |
---|---|---|
committer | Matthew Hodgson <matthew@matrix.org> | 2018-05-29 00:25:22 +0100 |
commit | 7a6df013cc8a128278d2ce7e5eb569e0b424f9b0 (patch) | |
tree | 5de624a65953eb96ab67274462d850a88c0cce3c /synapse/http | |
parent | make lazy_load_members configurable in filters (diff) | |
parent | Merge pull request #3256 from matrix-org/3218-official-prom (diff) | |
download | synapse-7a6df013cc8a128278d2ce7e5eb569e0b424f9b0.tar.xz |
merge develop
Diffstat (limited to 'synapse/http')
-rw-r--r-- | synapse/http/__init__.py | 22 | ||||
-rw-r--r-- | synapse/http/additional_resource.py | 7 | ||||
-rw-r--r-- | synapse/http/client.py | 45 | ||||
-rw-r--r-- | synapse/http/endpoint.py | 116 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 69 | ||||
-rw-r--r-- | synapse/http/request_metrics.py | 273 | ||||
-rw-r--r-- | synapse/http/server.py | 361 | ||||
-rw-r--r-- | synapse/http/site.py | 115 |
8 files changed, 614 insertions, 394 deletions
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index bfebb0f644..054372e179 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,3 +13,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet.defer import CancelledError +from twisted.python import failure + +from synapse.api.errors import SynapseError + + +class RequestTimedOutError(SynapseError): + """Exception representing timeout of an outbound request""" + def __init__(self): + super(RequestTimedOutError, self).__init__(504, "Timed out") + + +def cancelled_to_request_timed_out_error(value, timeout): + """Turns CancelledErrors into RequestTimedOutErrors. + + For use with async.add_timeout_to_deferred + """ + if isinstance(value, failure.Failure): + value.trap(CancelledError) + raise RequestTimedOutError() + return value diff --git a/synapse/http/additional_resource.py b/synapse/http/additional_resource.py index 343e932cb1..a797396ade 100644 --- a/synapse/http/additional_resource.py +++ b/synapse/http/additional_resource.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.http.server import wrap_request_handler +from synapse.http.server import wrap_json_request_handler from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET @@ -42,14 +42,13 @@ class AdditionalResource(Resource): Resource.__init__(self) self._handler = handler - # these are required by the request_handler wrapper - self.version_string = hs.version_string + # required by the request_handler wrapper self.clock = hs.get_clock() def render(self, request): self._async_render(request) return NOT_DONE_YET - @wrap_request_handler + @wrap_json_request_handler def _async_render(self, request): return self._handler(request) diff --git a/synapse/http/client.py b/synapse/http/client.py index f3e4973c2e..4d4eee3d64 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -18,10 +19,10 @@ from OpenSSL.SSL import VERIFY_NONE from synapse.api.errors import ( CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, ) +from synapse.http import cancelled_to_request_timed_out_error +from synapse.util.async import add_timeout_to_deferred from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.logcontext import make_deferred_yieldable -from synapse.util import logcontext -import synapse.metrics from synapse.http.endpoint import SpiderEndpoint from canonicaljson import encode_canonical_json @@ -38,8 +39,9 @@ from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone -from StringIO import StringIO +from six import StringIO +from prometheus_client import Counter import simplejson as json import logging import urllib @@ -47,16 +49,9 @@ import urllib logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -outgoing_requests_counter = metrics.register_counter( - "requests", - labels=["method"], -) -incoming_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"]) +incoming_responses_counter = Counter("synapse_http_client_responses", "", + ["method", "code"]) class SimpleHttpClient(object): @@ -93,32 +88,28 @@ class SimpleHttpClient(object): def request(self, method, uri, *args, **kwargs): # A small wrapper around self.agent.request() so we can easily attach # counters to it - outgoing_requests_counter.inc(method) + outgoing_requests_counter.labels(method).inc() - def send_request(): + logger.info("Sending request %s %s", method, uri) + + try: request_deferred = self.agent.request( method, uri, *args, **kwargs ) - - return self.clock.time_bound_deferred( + add_timeout_to_deferred( request_deferred, - time_out=60, + 60, cancelled_to_request_timed_out_error, ) + response = yield make_deferred_yieldable(request_deferred) - logger.info("Sending request %s %s", method, uri) - - try: - with logcontext.PreserveLoggingContext(): - response = yield send_request() - - incoming_responses_counter.inc(method, response.code) + incoming_responses_counter.labels(method, response.code).inc() logger.info( "Received response to %s %s: %s", method, uri, response.code ) defer.returnValue(response) except Exception as e: - incoming_responses_counter.inc(method, "ERR") + incoming_responses_counter.labels(method, "ERR").inc() logger.info( "Error sending request to %s %s: %s %s", method, uri, type(e).__name__, e.message @@ -509,7 +500,7 @@ class SpiderHttpClient(SimpleHttpClient): reactor, SpiderEndpointFactory(hs) ) - ), [('gzip', GzipDecoder)] + ), [(b'gzip', GzipDecoder)] ) # We could look like Chrome: # self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 87639b9151..87a482650d 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -12,8 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import socket - from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.internet import defer, reactor from twisted.internet.error import ConnectError @@ -33,7 +31,7 @@ SERVER_CACHE = {} # our record of an individual server which can be tried to reach a destination. # -# "host" is actually a dotted-quad or ipv6 address string. Except when there's +# "host" is the hostname acquired from the SRV record. Except when there's # no SRV record, in which case it is the original hostname. _Server = collections.namedtuple( "_Server", "priority weight host port expires" @@ -117,10 +115,15 @@ class _WrappedConnection(object): if time.time() - self.last_request >= 2.5 * 60: self.abort() # Abort the underlying TLS connection. The abort() method calls - # loseConnection() on the underlying TLS connection which tries to + # loseConnection() on the TLS connection which tries to # shutdown the connection cleanly. We call abortConnection() - # since that will promptly close the underlying TCP connection. - self.transport.abortConnection() + # since that will promptly close the TLS connection. + # + # In Twisted >18.4; the TLS connection will be None if it has closed + # which will make abortConnection() throw. Check that the TLS connection + # is not None before trying to close it. + if self.transport.getHandle() is not None: + self.transport.abortConnection() def request(self, request): self.last_request = time.time() @@ -288,7 +291,7 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t if (len(answers) == 1 and answers[0].type == dns.SRV and answers[0].payload - and answers[0].payload.target == dns.Name('.')): + and answers[0].payload.target == dns.Name(b'.')): raise ConnectError("Service %s unavailable" % service_name) for answer in answers: @@ -297,20 +300,13 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t payload = answer.payload - hosts = yield _get_hosts_for_srv_record( - dns_client, str(payload.target) - ) - - for (ip, ttl) in hosts: - host_ttl = min(answer.ttl, ttl) - - servers.append(_Server( - host=ip, - port=int(payload.port), - priority=int(payload.priority), - weight=int(payload.weight), - expires=int(clock.time()) + host_ttl, - )) + servers.append(_Server( + host=str(payload.target), + port=int(payload.port), + priority=int(payload.priority), + weight=int(payload.weight), + expires=int(clock.time()) + answer.ttl, + )) servers.sort() cache[service_name] = list(servers) @@ -328,81 +324,3 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t raise e defer.returnValue(servers) - - -@defer.inlineCallbacks -def _get_hosts_for_srv_record(dns_client, host): - """Look up each of the hosts in a SRV record - - Args: - dns_client (twisted.names.dns.IResolver): - host (basestring): host to look up - - Returns: - Deferred[list[(str, int)]]: a list of (host, ttl) pairs - - """ - ip4_servers = [] - ip6_servers = [] - - def cb(res): - # lookupAddress and lookupIP6Address return a three-tuple - # giving the answer, authority, and additional sections of the - # response. - # - # we only care about the answers. - - return res[0] - - def eb(res, record_type): - if res.check(DNSNameError): - return [] - logger.warn("Error looking up %s for %s: %s", record_type, host, res) - return res - - # no logcontexts here, so we can safely fire these off and gatherResults - d1 = dns_client.lookupAddress(host).addCallbacks( - cb, eb, errbackArgs=("A", )) - d2 = dns_client.lookupIPV6Address(host).addCallbacks( - cb, eb, errbackArgs=("AAAA", )) - results = yield defer.DeferredList( - [d1, d2], consumeErrors=True) - - # if all of the lookups failed, raise an exception rather than blowing out - # the cache with an empty result. - if results and all(s == defer.FAILURE for (s, _) in results): - defer.returnValue(results[0][1]) - - for (success, result) in results: - if success == defer.FAILURE: - continue - - for answer in result: - if not answer.payload: - continue - - try: - if answer.type == dns.A: - ip = answer.payload.dottedQuad() - ip4_servers.append((ip, answer.ttl)) - elif answer.type == dns.AAAA: - ip = socket.inet_ntop( - socket.AF_INET6, answer.payload.address, - ) - ip6_servers.append((ip, answer.ttl)) - else: - # the most likely candidate here is a CNAME record. - # rfc2782 says srvs may not point to aliases. - logger.warn( - "Ignoring unexpected DNS record type %s for %s", - answer.type, host, - ) - continue - except Exception as e: - logger.warn("Ignoring invalid DNS response for %s: %s", - host, e) - continue - - # keep the ipv4 results before the ipv6 results, mostly to match historical - # behaviour. - defer.returnValue(ip4_servers + ip6_servers) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 9145405cb0..821aed362b 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,17 +13,19 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.retryutils from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError from twisted.web.client import readBody, HTTPConnectionPool, Agent from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone +from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util.async import sleep -from synapse.util import logcontext import synapse.metrics +from synapse.util.async import sleep, add_timeout_to_deferred +from synapse.util import logcontext +from synapse.util.logcontext import make_deferred_yieldable +import synapse.util.retryutils from canonicaljson import encode_canonical_json @@ -38,22 +41,19 @@ import logging import random import sys import urllib -import urlparse +from six.moves.urllib import parse as urlparse +from six import string_types + +from prometheus_client import Counter logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") -metrics = synapse.metrics.get_metrics_for(__name__) - -outgoing_requests_counter = metrics.register_counter( - "requests", - labels=["method"], -) -incoming_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests", + "", ["method"]) +incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses", + "", ["method", "code"]) MAX_LONG_RETRIES = 10 @@ -184,21 +184,20 @@ class MatrixFederationHttpClient(object): producer = body_callback(method, http_url_bytes, headers_dict) try: - def send_request(): - request_deferred = self.agent.request( - method, - url_bytes, - Headers(headers_dict), - producer - ) - - return self.clock.time_bound_deferred( - request_deferred, - time_out=timeout / 1000. if timeout else 60, - ) - - with logcontext.PreserveLoggingContext(): - response = yield send_request() + request_deferred = self.agent.request( + method, + url_bytes, + Headers(headers_dict), + producer + ) + add_timeout_to_deferred( + request_deferred, + timeout / 1000. if timeout else 60, + cancelled_to_request_timed_out_error, + ) + response = yield make_deferred_yieldable( + request_deferred, + ) log_result = "%d %s" % (response.code, response.phrase,) break @@ -286,7 +285,8 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers @defer.inlineCallbacks - def put_json(self, destination, path, data={}, json_data_callback=None, + def put_json(self, destination, path, args={}, data={}, + json_data_callback=None, long_retries=False, timeout=None, ignore_backoff=False, backoff_on_404=False): @@ -296,6 +296,7 @@ class MatrixFederationHttpClient(object): destination (str): The remote server to send the HTTP request to. path (str): The HTTP path. + args (dict): query params data (dict): A dict containing the data that will be used as the request body. This will be encoded as JSON. json_data_callback (callable): A callable returning the dict to @@ -342,6 +343,7 @@ class MatrixFederationHttpClient(object): path, body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, + query_bytes=encode_query_args(args), long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, @@ -373,6 +375,7 @@ class MatrixFederationHttpClient(object): giving up. None indicates no timeout. ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. + args (dict): query params Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. @@ -548,7 +551,7 @@ class MatrixFederationHttpClient(object): encoded_args = {} for k, vs in args.items(): - if isinstance(vs, basestring): + if isinstance(vs, string_types): vs = [vs] encoded_args[k] = [v.encode("UTF-8") for v in vs] @@ -663,7 +666,7 @@ def check_content_type_is_json(headers): RuntimeError if the """ - c_type = headers.getRawHeaders("Content-Type") + c_type = headers.getRawHeaders(b"Content-Type") if c_type is None: raise RuntimeError( "No Content-Type header" @@ -680,7 +683,7 @@ def check_content_type_is_json(headers): def encode_query_args(args): encoded_args = {} for k, vs in args.items(): - if isinstance(vs, basestring): + if isinstance(vs, string_types): vs = [vs] encoded_args[k] = [v.encode("UTF-8") for v in vs] diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py new file mode 100644 index 0000000000..dc06f6c443 --- /dev/null +++ b/synapse/http/request_metrics.py @@ -0,0 +1,273 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from prometheus_client.core import Counter, Histogram +from synapse.metrics import LaterGauge + +from synapse.util.logcontext import LoggingContext + +logger = logging.getLogger(__name__) + + +# total number of responses served, split by method/servlet/tag +response_count = Counter( + "synapse_http_server_response_count", "", ["method", "servlet", "tag"] +) + +requests_counter = Counter( + "synapse_http_server_requests_received", "", ["method", "servlet"] +) + +outgoing_responses_counter = Counter( + "synapse_http_server_responses", "", ["method", "code"] +) + +response_timer = Histogram( + "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"] +) + +response_ru_utime = Counter( + "synapse_http_server_response_ru_utime_seconds", "sec", ["method", "servlet", "tag"] +) + +response_ru_stime = Counter( + "synapse_http_server_response_ru_stime_seconds", "sec", ["method", "servlet", "tag"] +) + +response_db_txn_count = Counter( + "synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"] +) + +# seconds spent waiting for db txns, excluding scheduling time, when processing +# this request +response_db_txn_duration = Counter( + "synapse_http_server_response_db_txn_duration_seconds", + "", + ["method", "servlet", "tag"], +) + +# seconds spent waiting for a db connection, when processing this request +response_db_sched_duration = Counter( + "synapse_http_server_response_db_sched_duration_seconds", + "", + ["method", "servlet", "tag"], +) + +# size in bytes of the response written +response_size = Counter( + "synapse_http_server_response_size", "", ["method", "servlet", "tag"] +) + +# In flight metrics are incremented while the requests are in flight, rather +# than when the response was written. + +in_flight_requests_ru_utime = Counter( + "synapse_http_server_in_flight_requests_ru_utime_seconds", + "", + ["method", "servlet"], +) + +in_flight_requests_ru_stime = Counter( + "synapse_http_server_in_flight_requests_ru_stime_seconds", + "", + ["method", "servlet"], +) + +in_flight_requests_db_txn_count = Counter( + "synapse_http_server_in_flight_requests_db_txn_count", "", ["method", "servlet"] +) + +# seconds spent waiting for db txns, excluding scheduling time, when processing +# this request +in_flight_requests_db_txn_duration = Counter( + "synapse_http_server_in_flight_requests_db_txn_duration_seconds", + "", + ["method", "servlet"], +) + +# seconds spent waiting for a db connection, when processing this request +in_flight_requests_db_sched_duration = Counter( + "synapse_http_server_in_flight_requests_db_sched_duration_seconds", + "", + ["method", "servlet"], +) + +# The set of all in flight requests, set[RequestMetrics] +_in_flight_requests = set() + + +def _get_in_flight_counts(): + """Returns a count of all in flight requests by (method, server_name) + + Returns: + dict[tuple[str, str], int] + """ + for rm in _in_flight_requests: + rm.update_metrics() + + # Map from (method, name) -> int, the number of in flight requests of that + # type + counts = {} + for rm in _in_flight_requests: + key = (rm.method, rm.name,) + counts[key] = counts.get(key, 0) + 1 + + return counts + + +LaterGauge( + "synapse_http_request_metrics_in_flight_requests_count", + "", + ["method", "servlet"], + _get_in_flight_counts, +) + + +class RequestMetrics(object): + def start(self, time_sec, name, method): + self.start = time_sec + self.start_context = LoggingContext.current_context() + self.name = name + self.method = method + + self._request_stats = _RequestStats.from_context(self.start_context) + + _in_flight_requests.add(self) + + def stop(self, time_sec, request): + _in_flight_requests.discard(self) + + context = LoggingContext.current_context() + + tag = "" + if context: + tag = context.tag + + if context != self.start_context: + logger.warn( + "Context have unexpectedly changed %r, %r", + context, self.start_context + ) + return + + outgoing_responses_counter.labels(request.method, str(request.code)).inc() + + response_count.labels(request.method, self.name, tag).inc() + + response_timer.labels(request.method, self.name, tag).observe( + time_sec - self.start + ) + + ru_utime, ru_stime = context.get_resource_usage() + + response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime) + response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime) + response_db_txn_count.labels(request.method, self.name, tag).inc( + context.db_txn_count + ) + response_db_txn_duration.labels(request.method, self.name, tag).inc( + context.db_txn_duration_sec + ) + response_db_sched_duration.labels(request.method, self.name, tag).inc( + context.db_sched_duration_sec + ) + + response_size.labels(request.method, self.name, tag).inc(request.sentLength) + + # We always call this at the end to ensure that we update the metrics + # regardless of whether a call to /metrics while the request was in + # flight. + self.update_metrics() + + def update_metrics(self): + """Updates the in flight metrics with values from this request. + """ + diff = self._request_stats.update(self.start_context) + + in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime) + in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime) + + in_flight_requests_db_txn_count.labels(self.method, self.name).inc( + diff.db_txn_count + ) + + in_flight_requests_db_txn_duration.labels(self.method, self.name).inc( + diff.db_txn_duration_sec + ) + + in_flight_requests_db_sched_duration.labels(self.method, self.name).inc( + diff.db_sched_duration_sec + ) + + +class _RequestStats(object): + """Keeps tracks of various metrics for an in flight request. + """ + + __slots__ = [ + "ru_utime", + "ru_stime", + "db_txn_count", + "db_txn_duration_sec", + "db_sched_duration_sec", + ] + + def __init__( + self, ru_utime, ru_stime, db_txn_count, db_txn_duration_sec, db_sched_duration_sec + ): + self.ru_utime = ru_utime + self.ru_stime = ru_stime + self.db_txn_count = db_txn_count + self.db_txn_duration_sec = db_txn_duration_sec + self.db_sched_duration_sec = db_sched_duration_sec + + @staticmethod + def from_context(context): + ru_utime, ru_stime = context.get_resource_usage() + + return _RequestStats( + ru_utime, ru_stime, + context.db_txn_count, + context.db_txn_duration_sec, + context.db_sched_duration_sec, + ) + + def update(self, context): + """Updates the current values and returns the difference between the + old and new values. + + Returns: + _RequestStats: The difference between the old and new values + """ + new = _RequestStats.from_context(context) + + diff = _RequestStats( + new.ru_utime - self.ru_utime, + new.ru_stime - self.ru_stime, + new.db_txn_count - self.db_txn_count, + new.db_txn_duration_sec - self.db_txn_duration_sec, + new.db_sched_duration_sec - self.db_sched_duration_sec, + ) + + self.ru_utime = new.ru_utime + self.ru_stime = new.ru_stime + self.db_txn_count = new.db_txn_count + self.db_txn_duration_sec = new.db_txn_duration_sec + self.db_sched_duration_sec = new.db_sched_duration_sec + + return diff diff --git a/synapse/http/server.py b/synapse/http/server.py index e64aa92729..bc09b8b2be 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -13,11 +13,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import cgi +from six.moves import http_client from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes ) +from synapse.http.request_metrics import ( + requests_counter, +) from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.caches import intern_dict from synapse.util.metrics import Measure @@ -41,173 +45,174 @@ import simplejson logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -# total number of responses served, split by method/servlet/tag -response_count = metrics.register_counter( - "response_count", - labels=["method", "servlet", "tag"], - alternative_names=( - # the following are all deprecated aliases for the same metric - metrics.name_prefix + x for x in ( - "_requests", - "_response_time:count", - "_response_ru_utime:count", - "_response_ru_stime:count", - "_response_db_txn_count:count", - "_response_db_txn_duration:count", - ) - ) -) +HTML_ERROR_TEMPLATE = """<!DOCTYPE html> +<html lang=en> + <head> + <meta charset="utf-8"> + <title>Error {code}</title> + </head> + <body> + <p>{msg}</p> + </body> +</html> +""" -requests_counter = metrics.register_counter( - "requests_received", - labels=["method", "servlet", ], -) -outgoing_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +def wrap_json_request_handler(h): + """Wraps a request handler method with exception handling. -response_timer = metrics.register_counter( - "response_time_seconds", - labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_time:total", - ), -) + Also adds logging as per wrap_request_handler_with_logging. -response_ru_utime = metrics.register_counter( - "response_ru_utime_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_ru_utime:total", - ), -) + The handler method must have a signature of "handle_foo(self, request)", + where "self" must have a "clock" attribute (and "request" must be a + SynapseRequest). -response_ru_stime = metrics.register_counter( - "response_ru_stime_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_ru_stime:total", - ), -) + The handler must return a deferred. If the deferred succeeds we assume that + a response has been sent. If the deferred fails with a SynapseError we use + it to send a JSON response with the appropriate HTTP reponse code. If the + deferred fails with any other type of error we send a 500 reponse. + """ -response_db_txn_count = metrics.register_counter( - "response_db_txn_count", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_db_txn_count:total", - ), -) + @defer.inlineCallbacks + def wrapped_request_handler(self, request): + try: + yield h(self, request) + except CodeMessageException as e: + code = e.code + if isinstance(e, SynapseError): + logger.info( + "%s SynapseError: %s - %s", request, code, e.msg + ) + else: + logger.exception(e) + respond_with_json( + request, code, cs_exception(e), send_cors=True, + pretty_print=_request_user_agent_is_curl(request), + ) -# seconds spent waiting for db txns, excluding scheduling time, when processing -# this request -response_db_txn_duration = metrics.register_counter( - "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"], - alternative_names=( - metrics.name_prefix + "_response_db_txn_duration:total", - ), -) + except Exception: + # failure.Failure() fishes the original Failure out + # of our stack, and thus gives us a sensible stack + # trace. + f = failure.Failure() + logger.error( + "Failed handle request via %r: %r: %s", + h, + request, + f.getTraceback().rstrip(), + ) + respond_with_json( + request, + 500, + { + "error": "Internal server error", + "errcode": Codes.UNKNOWN, + }, + send_cors=True, + pretty_print=_request_user_agent_is_curl(request), + ) -# seconds spent waiting for a db connection, when processing this request -response_db_sched_duration = metrics.register_counter( - "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"] -) + return wrap_request_handler_with_logging(wrapped_request_handler) -_next_request_id = 0 +def wrap_html_request_handler(h): + """Wraps a request handler method with exception handling. -def request_handler(include_metrics=False): - """Decorator for ``wrap_request_handler``""" - return lambda request_handler: wrap_request_handler(request_handler, include_metrics) + Also adds logging as per wrap_request_handler_with_logging. + The handler method must have a signature of "handle_foo(self, request)", + where "self" must have a "clock" attribute (and "request" must be a + SynapseRequest). + """ + def wrapped_request_handler(self, request): + d = defer.maybeDeferred(h, self, request) + d.addErrback(_return_html_error, request) + return d -def wrap_request_handler(request_handler, include_metrics=False): - """Wraps a method that acts as a request handler with the necessary logging - and exception handling. + return wrap_request_handler_with_logging(wrapped_request_handler) - The method must have a signature of "handle_foo(self, request)". The - argument "self" must have "version_string" and "clock" attributes. The - argument "request" must be a twisted HTTP request. - The method must return a deferred. If the deferred succeeds we assume that - a response has been sent. If the deferred fails with a SynapseError we use - it to send a JSON response with the appropriate HTTP reponse code. If the - deferred fails with any other type of error we send a 500 reponse. +def _return_html_error(f, request): + """Sends an HTML error page corresponding to the given failure - We insert a unique request-id into the logging context for this request and - log the response and duration for this request. + Args: + f (twisted.python.failure.Failure): + request (twisted.web.iweb.IRequest): """ + if f.check(CodeMessageException): + cme = f.value + code = cme.code + msg = cme.msg + + if isinstance(cme, SynapseError): + logger.info( + "%s SynapseError: %s - %s", request, code, msg + ) + else: + logger.error( + "Failed handle request %r: %s", + request, + f.getTraceback().rstrip(), + ) + else: + code = http_client.INTERNAL_SERVER_ERROR + msg = "Internal server error" + + logger.error( + "Failed handle request %r: %s", + request, + f.getTraceback().rstrip(), + ) + + body = HTML_ERROR_TEMPLATE.format( + code=code, msg=cgi.escape(msg), + ).encode("utf-8") + request.setResponseCode(code) + request.setHeader(b"Content-Type", b"text/html; charset=utf-8") + request.setHeader(b"Content-Length", b"%i" % (len(body),)) + request.write(body) + finish_request(request) + + +def wrap_request_handler_with_logging(h): + """Wraps a request handler to provide logging and metrics + The handler method must have a signature of "handle_foo(self, request)", + where "self" must have a "clock" attribute (and "request" must be a + SynapseRequest). + + As well as calling `request.processing` (which will log the response and + duration for this request), the wrapped request handler will insert the + request id into the logging context. + """ @defer.inlineCallbacks def wrapped_request_handler(self, request): - global _next_request_id - request_id = "%s-%s" % (request.method, _next_request_id) - _next_request_id += 1 + """ + Args: + self: + request (synapse.http.site.SynapseRequest): + """ + request_id = request.get_request_id() with LoggingContext(request_id) as request_context: + request_context.request = request_id with Measure(self.clock, "wrapped_request_handler"): - request_metrics = RequestMetrics() # we start the request metrics timer here with an initial stab # at the servlet name. For most requests that name will be # JsonResource (or a subclass), and JsonResource._async_render # will update it once it picks a servlet. servlet_name = self.__class__.__name__ - request_metrics.start(self.clock, name=servlet_name) - - request_context.request = request_id - with request.processing(): - try: - with PreserveLoggingContext(request_context): - if include_metrics: - yield request_handler(self, request, request_metrics) - else: - requests_counter.inc(request.method, servlet_name) - yield request_handler(self, request) - except CodeMessageException as e: - code = e.code - if isinstance(e, SynapseError): - logger.info( - "%s SynapseError: %s - %s", request, code, e.msg - ) - else: - logger.exception(e) - outgoing_responses_counter.inc(request.method, str(code)) - respond_with_json( - request, code, cs_exception(e), send_cors=True, - pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, - ) - except Exception: - # failure.Failure() fishes the original Failure out - # of our stack, and thus gives us a sensible stack - # trace. - f = failure.Failure() - logger.error( - "Failed handle request %s.%s on %r: %r: %s", - request_handler.__module__, - request_handler.__name__, - self, - request, - f.getTraceback().rstrip(), - ) - respond_with_json( - request, - 500, - { - "error": "Internal server error", - "errcode": Codes.UNKNOWN, - }, - send_cors=True, - pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, - ) - finally: - try: - request_metrics.stop( - self.clock, request - ) - except Exception as e: - logger.warn("Failed to stop metrics: %r", e) + with request.processing(servlet_name): + with PreserveLoggingContext(request_context): + d = defer.maybeDeferred(h, self, request) + + # record the arrival of the request *after* + # dispatching to the handler, so that the handler + # can update the servlet name in the request + # metrics + requests_counter.labels(request.method, + request.request_metrics.name).inc() + yield d return wrapped_request_handler @@ -257,7 +262,6 @@ class JsonResource(HttpServer, resource.Resource): self.canonical_json = canonical_json self.clock = hs.get_clock() self.path_regexs = {} - self.version_string = hs.version_string self.hs = hs def register_paths(self, method, path_patterns, callback): @@ -273,13 +277,9 @@ class JsonResource(HttpServer, resource.Resource): self._async_render(request) return server.NOT_DONE_YET - # Disable metric reporting because _async_render does its own metrics. - # It does its own metric reporting because _async_render dispatches to - # a callback and it's the class name of that callback we want to report - # against rather than the JsonResource itself. - @request_handler(include_metrics=True) + @wrap_json_request_handler @defer.inlineCallbacks - def _async_render(self, request, request_metrics): + def _async_render(self, request): """ This gets called from render() every time someone sends us a request. This checks if anyone has registered a callback for that method and path. @@ -291,9 +291,7 @@ class JsonResource(HttpServer, resource.Resource): servlet_classname = servlet_instance.__class__.__name__ else: servlet_classname = "%r" % callback - - request_metrics.name = servlet_classname - requests_counter.inc(request.method, servlet_classname) + request.request_metrics.name = servlet_classname # Now trigger the callback. If it returns a response, we send it # here. If it throws an exception, that is handled by the wrapper @@ -324,7 +322,7 @@ class JsonResource(HttpServer, resource.Resource): register_paths, so will return (possibly via Deferred) either None, or a tuple of (http code, response body). """ - if request.method == "OPTIONS": + if request.method == b"OPTIONS": return _options_handler, {} # Loop through all the registered callbacks to check if the method @@ -340,15 +338,12 @@ class JsonResource(HttpServer, resource.Resource): def _send_response(self, request, code, response_json_object, response_code_message=None): - outgoing_responses_counter.inc(request.method, str(code)) - # TODO: Only enable CORS for the requests that need it. respond_with_json( request, code, response_json_object, send_cors=True, response_code_message=response_code_message, pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, canonical_json=self.canonical_json, ) @@ -381,52 +376,6 @@ def _unrecognised_request_handler(request): raise UnrecognizedRequestError() -class RequestMetrics(object): - def start(self, clock, name): - self.start = clock.time_msec() - self.start_context = LoggingContext.current_context() - self.name = name - - def stop(self, clock, request): - context = LoggingContext.current_context() - - tag = "" - if context: - tag = context.tag - - if context != self.start_context: - logger.warn( - "Context have unexpectedly changed %r, %r", - context, self.start_context - ) - return - - response_count.inc(request.method, self.name, tag) - - response_timer.inc_by( - clock.time_msec() - self.start, request.method, - self.name, tag - ) - - ru_utime, ru_stime = context.get_resource_usage() - - response_ru_utime.inc_by( - ru_utime, request.method, self.name, tag - ) - response_ru_stime.inc_by( - ru_stime, request.method, self.name, tag - ) - response_db_txn_count.inc_by( - context.db_txn_count, request.method, self.name, tag - ) - response_db_txn_duration.inc_by( - context.db_txn_duration_ms / 1000., request.method, self.name, tag - ) - response_db_sched_duration.inc_by( - context.db_sched_duration_ms / 1000., request.method, self.name, tag - ) - - class RootRedirect(resource.Resource): """Redirects the root '/' path to another path.""" @@ -445,7 +394,7 @@ class RootRedirect(resource.Resource): def respond_with_json(request, code, json_object, send_cors=False, response_code_message=None, pretty_print=False, - version_string="", canonical_json=True): + canonical_json=True): # could alternatively use request.notifyFinish() and flip a flag when # the Deferred fires, but since the flag is RIGHT THERE it seems like # a waste. @@ -461,19 +410,17 @@ def respond_with_json(request, code, json_object, send_cors=False, if canonical_json or synapse.events.USE_FROZEN_DICTS: json_bytes = encode_canonical_json(json_object) else: - # ujson doesn't like frozen_dicts. json_bytes = simplejson.dumps(json_object) return respond_with_json_bytes( request, code, json_bytes, send_cors=send_cors, response_code_message=response_code_message, - version_string=version_string ) def respond_with_json_bytes(request, code, json_bytes, send_cors=False, - version_string="", response_code_message=None): + response_code_message=None): """Sends encoded JSON in response to the given request. Args: @@ -487,8 +434,8 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False, request.setResponseCode(code, message=response_code_message) request.setHeader(b"Content-Type", b"application/json") - request.setHeader(b"Server", version_string) request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),)) + request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate") if send_cors: set_cors_headers(request) @@ -536,9 +483,9 @@ def finish_request(request): def _request_user_agent_is_curl(request): user_agents = request.requestHeaders.getRawHeaders( - "User-Agent", default=[] + b"User-Agent", default=[] ) for user_agent in user_agents: - if "curl" in user_agent: + if b"curl" in user_agent: return True return False diff --git a/synapse/http/site.py b/synapse/http/site.py index e422c8dfae..60299657b9 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -12,27 +12,51 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.util.logcontext import LoggingContext -from twisted.web.server import Site, Request - import contextlib import logging import re import time -ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') +from twisted.web.server import Site, Request + +from synapse.http.request_metrics import RequestMetrics +from synapse.util.logcontext import LoggingContext + +logger = logging.getLogger(__name__) + +ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') + +_next_request_seq = 0 class SynapseRequest(Request): + """Class which encapsulates an HTTP request to synapse. + + All of the requests processed in synapse are of this type. + + It extends twisted's twisted.web.server.Request, and adds: + * Unique request ID + * Redaction of access_token query-params in __repr__ + * Logging at start and end + * Metrics to record CPU, wallclock and DB time by endpoint. + + It provides a method `processing` which should be called by the Resource + which is handling the request, and returns a context manager. + + """ def __init__(self, site, *args, **kw): Request.__init__(self, *args, **kw) self.site = site self.authenticated_entity = None self.start_time = 0 + global _next_request_seq + self.request_seq = _next_request_seq + _next_request_seq += 1 + def __repr__(self): # We overwrite this so that we don't log ``access_token`` - return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( + return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % ( self.__class__.__name__, id(self), self.method, @@ -41,16 +65,30 @@ class SynapseRequest(Request): self.site.site_tag, ) + def get_request_id(self): + return "%s-%i" % (self.method, self.request_seq) + def get_redacted_uri(self): return ACCESS_TOKEN_RE.sub( - r'\1<redacted>\3', + br'\1<redacted>\3', self.uri ) def get_user_agent(self): - return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] + + def render(self, resrc): + # override the Server header which is set by twisted + self.setHeader("Server", self.site.server_version_string) + return Request.render(self, resrc) + + def _started_processing(self, servlet_name): + self.start_time = time.time() + self.request_metrics = RequestMetrics() + self.request_metrics.start( + self.start_time, name=servlet_name, method=self.method, + ) - def started_processing(self): self.site.access_logger.info( "%s - %s - Received request: %s %s", self.getClientIP(), @@ -58,32 +96,32 @@ class SynapseRequest(Request): self.method, self.get_redacted_uri() ) - self.start_time = int(time.time() * 1000) - - def finished_processing(self): + def _finished_processing(self): try: context = LoggingContext.current_context() ru_utime, ru_stime = context.get_resource_usage() db_txn_count = context.db_txn_count - db_txn_duration_ms = context.db_txn_duration_ms - db_sched_duration_ms = context.db_sched_duration_ms + db_txn_duration_sec = context.db_txn_duration_sec + db_sched_duration_sec = context.db_sched_duration_sec except Exception: ru_utime, ru_stime = (0, 0) - db_txn_count, db_txn_duration_ms = (0, 0) + db_txn_count, db_txn_duration_sec = (0, 0) + + end_time = time.time() self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)" + " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" " %sB %s \"%s %s %s\" \"%s\"", self.getClientIP(), self.site.site_tag, self.authenticated_entity, - int(time.time() * 1000) - self.start_time, - int(ru_utime * 1000), - int(ru_stime * 1000), - db_sched_duration_ms, - db_txn_duration_ms, + end_time - self.start_time, + ru_utime, + ru_stime, + db_sched_duration_sec, + db_txn_duration_sec, int(db_txn_count), self.sentLength, self.code, @@ -93,11 +131,38 @@ class SynapseRequest(Request): self.get_user_agent(), ) + try: + self.request_metrics.stop(end_time, self) + except Exception as e: + logger.warn("Failed to stop metrics: %r", e) + @contextlib.contextmanager - def processing(self): - self.started_processing() + def processing(self, servlet_name): + """Record the fact that we are processing this request. + + Returns a context manager; the correct way to use this is: + + @defer.inlineCallbacks + def handle_request(request): + with request.processing("FooServlet"): + yield really_handle_the_request() + + This will log the request's arrival. Once the context manager is + closed, the completion of the request will be logged, and the various + metrics will be updated. + + Args: + servlet_name (str): the name of the servlet which will be + processing this request. This is used in the metrics. + + It is possible to update this afterwards by updating + self.request_metrics.servlet_name. + """ + # TODO: we should probably just move this into render() and finish(), + # to save having to call a separate method. + self._started_processing(servlet_name) yield - self.finished_processing() + self._finished_processing() class XForwardedForRequest(SynapseRequest): @@ -135,7 +200,8 @@ class SynapseSite(Site): Subclass of a twisted http Site that does access logging with python's standard logging """ - def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): + def __init__(self, logger_name, site_tag, config, resource, + server_version_string, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) self.site_tag = site_tag @@ -143,6 +209,7 @@ class SynapseSite(Site): proxied = config.get("x_forwarded", False) self.requestFactory = SynapseRequestFactory(self, proxied) self.access_logger = logging.getLogger(logger_name) + self.server_version_string = server_version_string def log(self, request): pass |