diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/http/additional_resource.py | 3 | ||||
-rw-r--r-- | synapse/http/client.py | 105 | ||||
-rw-r--r-- | synapse/http/endpoint.py | 109 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 90 | ||||
-rw-r--r-- | synapse/http/request_metrics.py | 115 | ||||
-rw-r--r-- | synapse/http/server.py | 157 | ||||
-rw-r--r-- | synapse/http/servlet.py | 75 | ||||
-rw-r--r-- | synapse/http/site.py | 250 |
8 files changed, 559 insertions, 345 deletions
diff --git a/synapse/http/additional_resource.py b/synapse/http/additional_resource.py index a797396ade..0e10e3f8f7 100644 --- a/synapse/http/additional_resource.py +++ b/synapse/http/additional_resource.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.http.server import wrap_json_request_handler from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET +from synapse.http.server import wrap_json_request_handler + class AdditionalResource(Resource): """Resource wrapper for additional_resources diff --git a/synapse/http/client.py b/synapse/http/client.py index 8064a84c5c..ab4fbf59b2 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -13,39 +13,38 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from OpenSSL import SSL -from OpenSSL.SSL import VERIFY_NONE +import logging +import urllib -from synapse.api.errors import ( - CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, -) -from synapse.http import cancelled_to_request_timed_out_error, redact_uri -from synapse.util.async import add_timeout_to_deferred -from synapse.util.caches import CACHE_SIZE_FACTOR -from synapse.util.logcontext import make_deferred_yieldable -from synapse.http.endpoint import SpiderEndpoint +from six import StringIO -from canonicaljson import encode_canonical_json +from canonicaljson import encode_canonical_json, json +from prometheus_client import Counter -from twisted.internet import defer, reactor, ssl, protocol, task +from OpenSSL import SSL +from OpenSSL.SSL import VERIFY_NONE +from twisted.internet import defer, protocol, reactor, ssl, task from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.web._newclient import ResponseDone from twisted.web.client import ( - BrowserLikeRedirectAgent, ContentDecoderAgent, GzipDecoder, Agent, - readBody, PartialDownloadError, + Agent, + BrowserLikeRedirectAgent, + ContentDecoderAgent, + FileBodyProducer as TwistedFileBodyProducer, + GzipDecoder, HTTPConnectionPool, + PartialDownloadError, + readBody, ) -from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers -from twisted.web._newclient import ResponseDone - -from six import StringIO - -from prometheus_client import Counter -import simplejson as json -import logging -import urllib +from synapse.api.errors import Codes, HttpResponseException, SynapseError +from synapse.http import cancelled_to_request_timed_out_error, redact_uri +from synapse.http.endpoint import SpiderEndpoint +from synapse.util.async_helpers import add_timeout_to_deferred +from synapse.util.caches import CACHE_SIZE_FACTOR +from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) @@ -98,8 +97,8 @@ class SimpleHttpClient(object): method, uri, *args, **kwargs ) add_timeout_to_deferred( - request_deferred, - 60, cancelled_to_request_timed_out_error, + request_deferred, 60, self.hs.get_reactor(), + cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable(request_deferred) @@ -115,7 +114,7 @@ class SimpleHttpClient(object): "Error sending request to %s %s: %s %s", method, redact_uri(uri), type(e).__name__, e.message ) - raise e + raise @defer.inlineCallbacks def post_urlencoded_get_json(self, uri, args={}, headers=None): @@ -128,6 +127,11 @@ class SimpleHttpClient(object): Returns: Deferred[object]: parsed json + + Raises: + HttpResponseException: On a non-2xx HTTP response. + + ValueError: if the response was not JSON """ # TODO: Do we ever want to log message contents? @@ -151,7 +155,10 @@ class SimpleHttpClient(object): body = yield make_deferred_yieldable(readBody(response)) - defer.returnValue(json.loads(body)) + if 200 <= response.code < 300: + defer.returnValue(json.loads(body)) + else: + raise HttpResponseException(response.code, response.phrase, body) @defer.inlineCallbacks def post_json_get_json(self, uri, post_json, headers=None): @@ -165,6 +172,11 @@ class SimpleHttpClient(object): Returns: Deferred[object]: parsed json + + Raises: + HttpResponseException: On a non-2xx HTTP response. + + ValueError: if the response was not JSON """ json_str = encode_canonical_json(post_json) @@ -189,9 +201,7 @@ class SimpleHttpClient(object): if 200 <= response.code < 300: defer.returnValue(json.loads(body)) else: - raise self._exceptionFromFailedRequest(response, body) - - defer.returnValue(json.loads(body)) + raise HttpResponseException(response.code, response.phrase, body) @defer.inlineCallbacks def get_json(self, uri, args={}, headers=None): @@ -209,14 +219,12 @@ class SimpleHttpClient(object): Deferred: Succeeds when we get *any* 2xx HTTP response, with the HTTP body as JSON. Raises: - On a non-2xx HTTP response. The response body will be used as the - error message. + HttpResponseException On a non-2xx HTTP response. + + ValueError: if the response was not JSON """ - try: - body = yield self.get_raw(uri, args, headers=headers) - defer.returnValue(json.loads(body)) - except CodeMessageException as e: - raise self._exceptionFromFailedRequest(e.code, e.msg) + body = yield self.get_raw(uri, args, headers=headers) + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def put_json(self, uri, json_body, args={}, headers=None): @@ -235,7 +243,9 @@ class SimpleHttpClient(object): Deferred: Succeeds when we get *any* 2xx HTTP response, with the HTTP body as JSON. Raises: - On a non-2xx HTTP response. + HttpResponseException On a non-2xx HTTP response. + + ValueError: if the response was not JSON """ if len(args): query_bytes = urllib.urlencode(args, True) @@ -262,10 +272,7 @@ class SimpleHttpClient(object): if 200 <= response.code < 300: defer.returnValue(json.loads(body)) else: - # NB: This is explicitly not json.loads(body)'d because the contract - # of CodeMessageException is a *string* message. Callers can always - # load it into JSON if they want. - raise CodeMessageException(response.code, body) + raise HttpResponseException(response.code, response.phrase, body) @defer.inlineCallbacks def get_raw(self, uri, args={}, headers=None): @@ -283,8 +290,7 @@ class SimpleHttpClient(object): Deferred: Succeeds when we get *any* 2xx HTTP response, with the HTTP body at text. Raises: - On a non-2xx HTTP response. The response body will be used as the - error message. + HttpResponseException on a non-2xx HTTP response. """ if len(args): query_bytes = urllib.urlencode(args, True) @@ -307,16 +313,7 @@ class SimpleHttpClient(object): if 200 <= response.code < 300: defer.returnValue(body) else: - raise CodeMessageException(response.code, body) - - def _exceptionFromFailedRequest(self, response, body): - try: - jsonBody = json.loads(body) - errcode = jsonBody['errcode'] - error = jsonBody['error'] - return MatrixCodeMessageException(response.code, error, errcode) - except (ValueError, KeyError): - return CodeMessageException(response.code, body) + raise HttpResponseException(response.code, response.phrase, body) # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. # The two should be factored out. diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 87a482650d..b0c9369519 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -12,21 +12,20 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS -from twisted.internet import defer, reactor -from twisted.internet.error import ConnectError -from twisted.names import client, dns -from twisted.names.error import DNSNameError, DomainError - import collections import logging import random +import re import time +from twisted.internet import defer +from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.internet.error import ConnectError +from twisted.names import client, dns +from twisted.names.error import DNSNameError, DomainError logger = logging.getLogger(__name__) - SERVER_CACHE = {} # our record of an individual server which can be tried to reach a destination. @@ -38,34 +37,98 @@ _Server = collections.namedtuple( ) -def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, +def parse_server_name(server_name): + """Split a server name into host/port parts. + + Args: + server_name (str): server name to parse + + Returns: + Tuple[str, int|None]: host/port parts. + + Raises: + ValueError if the server name could not be parsed. + """ + try: + if server_name[-1] == ']': + # ipv6 literal, hopefully + return server_name, None + + domain_port = server_name.rsplit(":", 1) + domain = domain_port[0] + port = int(domain_port[1]) if domain_port[1:] else None + return domain, port + except Exception: + raise ValueError("Invalid server name '%s'" % server_name) + + +VALID_HOST_REGEX = re.compile( + "\\A[0-9a-zA-Z.-]+\\Z", +) + + +def parse_and_validate_server_name(server_name): + """Split a server name into host/port parts and do some basic validation. + + Args: + server_name (str): server name to parse + + Returns: + Tuple[str, int|None]: host/port parts. + + Raises: + ValueError if the server name could not be parsed. + """ + host, port = parse_server_name(server_name) + + # these tests don't need to be bulletproof as we'll find out soon enough + # if somebody is giving us invalid data. What we *do* need is to be sure + # that nobody is sneaking IP literals in that look like hostnames, etc. + + # look for ipv6 literals + if host[0] == '[': + if host[-1] != ']': + raise ValueError("Mismatched [...] in server name '%s'" % ( + server_name, + )) + return host, port + + # otherwise it should only be alphanumerics. + if not VALID_HOST_REGEX.match(host): + raise ValueError("Server name '%s' contains invalid characters" % ( + server_name, + )) + + return host, port + + +def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=None, timeout=None): """Construct an endpoint for the given matrix destination. Args: reactor: Twisted reactor. destination (bytes): The name of the server to connect to. - ssl_context_factory (twisted.internet.ssl.ContextFactory): Factory - which generates SSL contexts to use for TLS. + tls_client_options_factory + (synapse.crypto.context_factory.ClientTLSOptionsFactory): + Factory which generates TLS options for client connections. timeout (int): connection timeout in seconds """ - domain_port = destination.split(":") - domain = domain_port[0] - port = int(domain_port[1]) if domain_port[1:] else None + domain, port = parse_server_name(destination) endpoint_kw_args = {} if timeout is not None: endpoint_kw_args.update(timeout=timeout) - if ssl_context_factory is None: + if tls_client_options_factory is None: transport_endpoint = HostnameEndpoint default_port = 8008 else: def transport_endpoint(reactor, host, port, timeout): return wrapClientTLS( - ssl_context_factory, + tls_client_options_factory.get_options(host), HostnameEndpoint(reactor, host, port, timeout=timeout)) default_port = 8448 @@ -74,21 +137,22 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, reactor, "matrix", domain, protocol="tcp", default_port=default_port, endpoint=transport_endpoint, endpoint_kw_args=endpoint_kw_args - )) + ), reactor) else: return _WrappingEndpointFac(transport_endpoint( reactor, domain, port, **endpoint_kw_args - )) + ), reactor) class _WrappingEndpointFac(object): - def __init__(self, endpoint_fac): + def __init__(self, endpoint_fac, reactor): self.endpoint_fac = endpoint_fac + self.reactor = reactor @defer.inlineCallbacks def connect(self, protocolFactory): conn = yield self.endpoint_fac.connect(protocolFactory) - conn = _WrappedConnection(conn) + conn = _WrappedConnection(conn, self.reactor) defer.returnValue(conn) @@ -98,9 +162,10 @@ class _WrappedConnection(object): """ __slots__ = ["conn", "last_request"] - def __init__(self, conn): + def __init__(self, conn, reactor): object.__setattr__(self, "conn", conn) object.__setattr__(self, "last_request", time.time()) + self._reactor = reactor def __getattr__(self, name): return getattr(self.conn, name) @@ -131,14 +196,14 @@ class _WrappedConnection(object): # Time this connection out if we haven't send a request in the last # N minutes # TODO: Cancel the previous callLater? - reactor.callLater(3 * 60, self._time_things_out_maybe) + self._reactor.callLater(3 * 60, self._time_things_out_maybe) d = self.conn.request(request) def update_request_time(res): self.last_request = time.time() # TODO: Cancel the previous callLater? - reactor.callLater(3 * 60, self._time_things_out_maybe) + self._reactor.callLater(3 * 60, self._time_things_out_maybe) return res d.addCallback(update_request_time) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 821aed362b..b34bb8e31a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -13,39 +13,38 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor, protocol -from twisted.internet.error import DNSLookupError -from twisted.web.client import readBody, HTTPConnectionPool, Agent -from twisted.web.http_headers import Headers -from twisted.web._newclient import ResponseDone - -from synapse.http import cancelled_to_request_timed_out_error -from synapse.http.endpoint import matrix_federation_endpoint -import synapse.metrics -from synapse.util.async import sleep, add_timeout_to_deferred -from synapse.util import logcontext -from synapse.util.logcontext import make_deferred_yieldable -import synapse.util.retryutils - -from canonicaljson import encode_canonical_json - -from synapse.api.errors import ( - SynapseError, Codes, HttpResponseException, FederationDeniedError, -) - -from signedjson.sign import sign_json - import cgi -import simplejson as json import logging import random import sys import urllib -from six.moves.urllib import parse as urlparse -from six import string_types +from six import string_types +from six.moves.urllib import parse as urlparse +from canonicaljson import encode_canonical_json, json from prometheus_client import Counter +from signedjson.sign import sign_json + +from twisted.internet import defer, protocol, reactor +from twisted.internet.error import DNSLookupError +from twisted.web._newclient import ResponseDone +from twisted.web.client import Agent, HTTPConnectionPool, readBody +from twisted.web.http_headers import Headers + +import synapse.metrics +import synapse.util.retryutils +from synapse.api.errors import ( + Codes, + FederationDeniedError, + HttpResponseException, + SynapseError, +) +from synapse.http import cancelled_to_request_timed_out_error +from synapse.http.endpoint import matrix_federation_endpoint +from synapse.util import logcontext +from synapse.util.async_helpers import add_timeout_to_deferred +from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") @@ -62,14 +61,14 @@ MAX_SHORT_RETRIES = 3 class MatrixFederationEndpointFactory(object): def __init__(self, hs): - self.tls_server_context_factory = hs.tls_server_context_factory + self.tls_client_options_factory = hs.tls_client_options_factory def endpointForURI(self, uri): destination = uri.netloc return matrix_federation_endpoint( reactor, destination, timeout=10, - ssl_context_factory=self.tls_server_context_factory + tls_client_options_factory=self.tls_client_options_factory ) @@ -134,7 +133,7 @@ class MatrixFederationHttpClient(object): failures, connection failures, SSL failures.) """ if ( - self.hs.config.federation_domain_whitelist and + self.hs.config.federation_domain_whitelist is not None and destination not in self.hs.config.federation_domain_whitelist ): raise FederationDeniedError(destination) @@ -193,6 +192,7 @@ class MatrixFederationHttpClient(object): add_timeout_to_deferred( request_deferred, timeout / 1000. if timeout else 60, + self.hs.get_reactor(), cancelled_to_request_timed_out_error, ) response = yield make_deferred_yieldable( @@ -234,7 +234,7 @@ class MatrixFederationHttpClient(object): delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) - yield sleep(delay) + yield self.clock.sleep(delay) retries_left -= 1 else: raise @@ -260,14 +260,35 @@ class MatrixFederationHttpClient(object): defer.returnValue(response) def sign_request(self, destination, method, url_bytes, headers_dict, - content=None): + content=None, destination_is=None): + """ + Signs a request by adding an Authorization header to headers_dict + Args: + destination (bytes|None): The desination home server of the request. + May be None if the destination is an identity server, in which case + destination_is must be non-None. + method (bytes): The HTTP method of the request + url_bytes (bytes): The URI path of the request + headers_dict (dict): Dictionary of request headers to append to + content (bytes): The body of the request + destination_is (bytes): As 'destination', but if the destination is an + identity server + + Returns: + None + """ request = { "method": method, "uri": url_bytes, "origin": self.server_name, - "destination": destination, } + if destination is not None: + request["destination"] = destination + + if destination_is is not None: + request["destination_is"] = destination_is + if content is not None: request["content"] = content @@ -418,7 +439,7 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def get_json(self, destination, path, args={}, retry_on_dns_fail=True, + def get_json(self, destination, path, args=None, retry_on_dns_fail=True, timeout=None, ignore_backoff=False): """ GETs some json from the given host homeserver and path @@ -426,7 +447,7 @@ class MatrixFederationHttpClient(object): destination (str): The remote server to send the HTTP request to. path (str): The HTTP path. - args (dict): A dictionary used to create query strings, defaults to + args (dict|None): A dictionary used to create query strings, defaults to None. timeout (int): How long to try (in ms) the destination for before giving up. None indicates no timeout and that the request will @@ -681,6 +702,9 @@ def check_content_type_is_json(headers): def encode_query_args(args): + if args is None: + return b"" + encoded_args = {} for k, vs in args.items(): if isinstance(vs, string_types): diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index dc06f6c443..72c2654678 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -15,10 +15,11 @@ # limitations under the License. import logging +import threading from prometheus_client.core import Counter, Histogram -from synapse.metrics import LaterGauge +from synapse.metrics import LaterGauge from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) @@ -38,7 +39,8 @@ outgoing_responses_counter = Counter( ) response_timer = Histogram( - "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"] + "synapse_http_server_response_time_seconds", "sec", + ["method", "servlet", "tag", "code"], ) response_ru_utime = Counter( @@ -110,6 +112,9 @@ in_flight_requests_db_sched_duration = Counter( # The set of all in flight requests, set[RequestMetrics] _in_flight_requests = set() +# Protects the _in_flight_requests set from concurrent accesss +_in_flight_requests_lock = threading.Lock() + def _get_in_flight_counts(): """Returns a count of all in flight requests by (method, server_name) @@ -117,13 +122,18 @@ def _get_in_flight_counts(): Returns: dict[tuple[str, str], int] """ - for rm in _in_flight_requests: + # Cast to a list to prevent it changing while the Prometheus + # thread is collecting metrics + with _in_flight_requests_lock: + reqs = list(_in_flight_requests) + + for rm in reqs: rm.update_metrics() # Map from (method, name) -> int, the number of in flight requests of that # type counts = {} - for rm in _in_flight_requests: + for rm in reqs: key = (rm.method, rm.name,) counts[key] = counts.get(key, 0) + 1 @@ -131,7 +141,7 @@ def _get_in_flight_counts(): LaterGauge( - "synapse_http_request_metrics_in_flight_requests_count", + "synapse_http_server_in_flight_requests_count", "", ["method", "servlet"], _get_in_flight_counts, @@ -145,12 +155,16 @@ class RequestMetrics(object): self.name = name self.method = method - self._request_stats = _RequestStats.from_context(self.start_context) + # _request_stats records resource usage that we have already added + # to the "in flight" metrics. + self._request_stats = self.start_context.get_resource_usage() - _in_flight_requests.add(self) + with _in_flight_requests_lock: + _in_flight_requests.add(self) def stop(self, time_sec, request): - _in_flight_requests.discard(self) + with _in_flight_requests_lock: + _in_flight_requests.discard(self) context = LoggingContext.current_context() @@ -165,26 +179,32 @@ class RequestMetrics(object): ) return - outgoing_responses_counter.labels(request.method, str(request.code)).inc() + response_code = str(request.code) + + outgoing_responses_counter.labels(request.method, response_code).inc() response_count.labels(request.method, self.name, tag).inc() - response_timer.labels(request.method, self.name, tag).observe( + response_timer.labels(request.method, self.name, tag, response_code).observe( time_sec - self.start ) - ru_utime, ru_stime = context.get_resource_usage() + resource_usage = context.get_resource_usage() - response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime) - response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime) + response_ru_utime.labels(request.method, self.name, tag).inc( + resource_usage.ru_utime, + ) + response_ru_stime.labels(request.method, self.name, tag).inc( + resource_usage.ru_stime, + ) response_db_txn_count.labels(request.method, self.name, tag).inc( - context.db_txn_count + resource_usage.db_txn_count ) response_db_txn_duration.labels(request.method, self.name, tag).inc( - context.db_txn_duration_sec + resource_usage.db_txn_duration_sec ) response_db_sched_duration.labels(request.method, self.name, tag).inc( - context.db_sched_duration_sec + resource_usage.db_sched_duration_sec ) response_size.labels(request.method, self.name, tag).inc(request.sentLength) @@ -197,7 +217,10 @@ class RequestMetrics(object): def update_metrics(self): """Updates the in flight metrics with values from this request. """ - diff = self._request_stats.update(self.start_context) + new_stats = self.start_context.get_resource_usage() + + diff = new_stats - self._request_stats + self._request_stats = new_stats in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime) in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime) @@ -213,61 +236,3 @@ class RequestMetrics(object): in_flight_requests_db_sched_duration.labels(self.method, self.name).inc( diff.db_sched_duration_sec ) - - -class _RequestStats(object): - """Keeps tracks of various metrics for an in flight request. - """ - - __slots__ = [ - "ru_utime", - "ru_stime", - "db_txn_count", - "db_txn_duration_sec", - "db_sched_duration_sec", - ] - - def __init__( - self, ru_utime, ru_stime, db_txn_count, db_txn_duration_sec, db_sched_duration_sec - ): - self.ru_utime = ru_utime - self.ru_stime = ru_stime - self.db_txn_count = db_txn_count - self.db_txn_duration_sec = db_txn_duration_sec - self.db_sched_duration_sec = db_sched_duration_sec - - @staticmethod - def from_context(context): - ru_utime, ru_stime = context.get_resource_usage() - - return _RequestStats( - ru_utime, ru_stime, - context.db_txn_count, - context.db_txn_duration_sec, - context.db_sched_duration_sec, - ) - - def update(self, context): - """Updates the current values and returns the difference between the - old and new values. - - Returns: - _RequestStats: The difference between the old and new values - """ - new = _RequestStats.from_context(context) - - diff = _RequestStats( - new.ru_utime - self.ru_utime, - new.ru_stime - self.ru_stime, - new.db_txn_count - self.db_txn_count, - new.db_txn_duration_sec - self.db_txn_duration_sec, - new.db_sched_duration_sec - self.db_sched_duration_sec, - ) - - self.ru_utime = new.ru_utime - self.ru_stime = new.ru_stime - self.db_txn_count = new.db_txn_count - self.db_txn_duration_sec = new.db_txn_duration_sec - self.db_sched_duration_sec = new.db_sched_duration_sec - - return diff diff --git a/synapse/http/server.py b/synapse/http/server.py index bc09b8b2be..2d5c23e673 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -13,35 +13,38 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import cgi -from six.moves import http_client +import collections +import logging -from synapse.api.errors import ( - cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes -) -from synapse.http.request_metrics import ( - requests_counter, -) -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -from synapse.util.caches import intern_dict -from synapse.util.metrics import Measure -import synapse.metrics -import synapse.events +from six import PY3 +from six.moves import http_client, urllib -from canonicaljson import ( - encode_canonical_json, encode_pretty_printed_json -) +from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json from twisted.internet import defer from twisted.python import failure -from twisted.web import server, resource +from twisted.web import resource from twisted.web.server import NOT_DONE_YET +from twisted.web.static import NoRangeStaticProducer from twisted.web.util import redirectTo -import collections -import logging -import urllib -import simplejson +import synapse.events +import synapse.metrics +from synapse.api.errors import ( + CodeMessageException, + Codes, + SynapseError, + UnrecognizedRequestError, +) +from synapse.util.caches import intern_dict +from synapse.util.logcontext import preserve_fn + +if PY3: + from io import BytesIO +else: + from cStringIO import StringIO as BytesIO logger = logging.getLogger(__name__) @@ -61,11 +64,10 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html> def wrap_json_request_handler(h): """Wraps a request handler method with exception handling. - Also adds logging as per wrap_request_handler_with_logging. + Also does the wrapping with request.processing as per wrap_async_request_handler. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have a "clock" attribute (and "request" must be a - SynapseRequest). + where "request" must be a SynapseRequest. The handler must return a deferred. If the deferred succeeds we assume that a response has been sent. If the deferred fails with a SynapseError we use @@ -77,16 +79,13 @@ def wrap_json_request_handler(h): def wrapped_request_handler(self, request): try: yield h(self, request) - except CodeMessageException as e: + except SynapseError as e: code = e.code - if isinstance(e, SynapseError): - logger.info( - "%s SynapseError: %s - %s", request, code, e.msg - ) - else: - logger.exception(e) + logger.info( + "%s SynapseError: %s - %s", request, code, e.msg + ) respond_with_json( - request, code, cs_exception(e), send_cors=True, + request, code, e.error_dict(), send_cors=True, pretty_print=_request_user_agent_is_curl(request), ) @@ -112,24 +111,23 @@ def wrap_json_request_handler(h): pretty_print=_request_user_agent_is_curl(request), ) - return wrap_request_handler_with_logging(wrapped_request_handler) + return wrap_async_request_handler(wrapped_request_handler) def wrap_html_request_handler(h): """Wraps a request handler method with exception handling. - Also adds logging as per wrap_request_handler_with_logging. + Also does the wrapping with request.processing as per wrap_async_request_handler. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have a "clock" attribute (and "request" must be a - SynapseRequest). + where "request" must be a SynapseRequest. """ def wrapped_request_handler(self, request): d = defer.maybeDeferred(h, self, request) d.addErrback(_return_html_error, request) return d - return wrap_request_handler_with_logging(wrapped_request_handler) + return wrap_async_request_handler(wrapped_request_handler) def _return_html_error(f, request): @@ -174,46 +172,26 @@ def _return_html_error(f, request): finish_request(request) -def wrap_request_handler_with_logging(h): - """Wraps a request handler to provide logging and metrics +def wrap_async_request_handler(h): + """Wraps an async request handler so that it calls request.processing. + + This helps ensure that work done by the request handler after the request is completed + is correctly recorded against the request metrics/logs. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have a "clock" attribute (and "request" must be a - SynapseRequest). + where "request" must be a SynapseRequest. - As well as calling `request.processing` (which will log the response and - duration for this request), the wrapped request handler will insert the - request id into the logging context. + The handler may return a deferred, in which case the completion of the request isn't + logged until the deferred completes. """ @defer.inlineCallbacks - def wrapped_request_handler(self, request): - """ - Args: - self: - request (synapse.http.site.SynapseRequest): - """ + def wrapped_async_request_handler(self, request): + with request.processing(): + yield h(self, request) - request_id = request.get_request_id() - with LoggingContext(request_id) as request_context: - request_context.request = request_id - with Measure(self.clock, "wrapped_request_handler"): - # we start the request metrics timer here with an initial stab - # at the servlet name. For most requests that name will be - # JsonResource (or a subclass), and JsonResource._async_render - # will update it once it picks a servlet. - servlet_name = self.__class__.__name__ - with request.processing(servlet_name): - with PreserveLoggingContext(request_context): - d = defer.maybeDeferred(h, self, request) - - # record the arrival of the request *after* - # dispatching to the handler, so that the handler - # can update the servlet name in the request - # metrics - requests_counter.labels(request.method, - request.request_metrics.name).inc() - yield d - return wrapped_request_handler + # we need to preserve_fn here, because the synchronous render method won't yield for + # us (obviously) + return preserve_fn(wrapped_async_request_handler) class HttpServer(object): @@ -265,6 +243,7 @@ class JsonResource(HttpServer, resource.Resource): self.hs = hs def register_paths(self, method, path_patterns, callback): + method = method.encode("utf-8") # method is bytes on py3 for path_pattern in path_patterns: logger.debug("Registering for %s %s", method, path_pattern.pattern) self.path_regexs.setdefault(method, []).append( @@ -275,7 +254,7 @@ class JsonResource(HttpServer, resource.Resource): """ This gets called by twisted every time someone sends us a request. """ self._async_render(request) - return server.NOT_DONE_YET + return NOT_DONE_YET @wrap_json_request_handler @defer.inlineCallbacks @@ -297,8 +276,19 @@ class JsonResource(HttpServer, resource.Resource): # here. If it throws an exception, that is handled by the wrapper # installed by @request_handler. + def _unquote(s): + if PY3: + # On Python 3, unquote is unicode -> unicode + return urllib.parse.unquote(s) + else: + # On Python 2, unquote is bytes -> bytes We need to encode the + # URL again (as it was decoded by _get_handler_for request), as + # ASCII because it's a URL, and then decode it to get the UTF-8 + # characters that were quoted. + return urllib.parse.unquote(s.encode('ascii')).decode('utf8') + kwargs = intern_dict({ - name: urllib.unquote(value).decode("UTF-8") if value else value + name: _unquote(value) if value else value for name, value in group_dict.items() }) @@ -314,9 +304,9 @@ class JsonResource(HttpServer, resource.Resource): request (twisted.web.http.Request): Returns: - Tuple[Callable, dict[str, str]]: callback method, and the dict - mapping keys to path components as specified in the handler's - path match regexp. + Tuple[Callable, dict[unicode, unicode]]: callback method, and the + dict mapping keys to path components as specified in the + handler's path match regexp. The callback will normally be a method registered via register_paths, so will return (possibly via Deferred) either @@ -328,7 +318,7 @@ class JsonResource(HttpServer, resource.Resource): # Loop through all the registered callbacks to check if the method # and path regex match for path_entry in self.path_regexs.get(request.method, []): - m = path_entry.pattern.match(request.path) + m = path_entry.pattern.match(request.path.decode('ascii')) if m: # We found a match! return path_entry.callback, m.groupdict() @@ -384,7 +374,7 @@ class RootRedirect(resource.Resource): self.url = path def render_GET(self, request): - return redirectTo(self.url, request) + return redirectTo(self.url.encode('ascii'), request) def getChild(self, name, request): if len(name) == 0: @@ -405,12 +395,13 @@ def respond_with_json(request, code, json_object, send_cors=False, return if pretty_print: - json_bytes = encode_pretty_printed_json(json_object) + "\n" + json_bytes = encode_pretty_printed_json(json_object) + b"\n" else: if canonical_json or synapse.events.USE_FROZEN_DICTS: + # canonicaljson already encodes to bytes json_bytes = encode_canonical_json(json_object) else: - json_bytes = simplejson.dumps(json_object) + json_bytes = json.dumps(json_object).encode("utf-8") return respond_with_json_bytes( request, code, json_bytes, @@ -440,8 +431,12 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False, if send_cors: set_cors_headers(request) - request.write(json_bytes) - finish_request(request) + # todo: we can almost certainly avoid this copy and encode the json straight into + # the bytesIO, but it would involve faffing around with string->bytes wrappers. + bytes_io = BytesIO(json_bytes) + + producer = NoRangeStaticProducer(request, bytes_io) + producer.start() return NOT_DONE_YET diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index ef8e62901b..a1e4b88e6d 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -15,10 +15,11 @@ """ This module contains base REST classes for constructing REST servlets. """ -from synapse.api.errors import SynapseError, Codes - import logging -import simplejson + +from canonicaljson import json + +from synapse.api.errors import Codes, SynapseError logger = logging.getLogger(__name__) @@ -28,7 +29,7 @@ def parse_integer(request, name, default=None, required=False): Args: request: the twisted HTTP request. - name (str): the name of the query parameter. + name (bytes/unicode): the name of the query parameter. default (int|None): value to use if the parameter is absent, defaults to None. required (bool): whether to raise a 400 SynapseError if the @@ -45,6 +46,10 @@ def parse_integer(request, name, default=None, required=False): def parse_integer_from_args(args, name, default=None, required=False): + + if not isinstance(name, bytes): + name = name.encode('ascii') + if name in args: try: return int(args[name][0]) @@ -64,7 +69,7 @@ def parse_boolean(request, name, default=None, required=False): Args: request: the twisted HTTP request. - name (str): the name of the query parameter. + name (bytes/unicode): the name of the query parameter. default (bool|None): value to use if the parameter is absent, defaults to None. required (bool): whether to raise a 400 SynapseError if the @@ -82,11 +87,15 @@ def parse_boolean(request, name, default=None, required=False): def parse_boolean_from_args(args, name, default=None, required=False): + + if not isinstance(name, bytes): + name = name.encode('ascii') + if name in args: try: return { - "true": True, - "false": False, + b"true": True, + b"false": False, }[args[name][0]] except Exception: message = ( @@ -103,21 +112,29 @@ def parse_boolean_from_args(args, name, default=None, required=False): def parse_string(request, name, default=None, required=False, - allowed_values=None, param_type="string"): - """Parse a string parameter from the request query string. + allowed_values=None, param_type="string", encoding='ascii'): + """ + Parse a string parameter from the request query string. + + If encoding is not None, the content of the query param will be + decoded to Unicode using the encoding, otherwise it will be encoded Args: request: the twisted HTTP request. - name (str): the name of the query parameter. - default (str|None): value to use if the parameter is absent, defaults - to None. + name (bytes/unicode): the name of the query parameter. + default (bytes/unicode|None): value to use if the parameter is absent, + defaults to None. Must be bytes if encoding is None. required (bool): whether to raise a 400 SynapseError if the parameter is absent, defaults to False. - allowed_values (list[str]): List of allowed values for the string, - or None if any value is allowed, defaults to None + allowed_values (list[bytes/unicode]): List of allowed values for the + string, or None if any value is allowed, defaults to None. Must be + the same type as name, if given. + encoding: The encoding to decode the name to, and decode the string + content with. Returns: - str|None: A string value or the default. + bytes/unicode|None: A string value or the default. Unicode if encoding + was given, bytes otherwise. Raises: SynapseError if the parameter is absent and required, or if the @@ -125,14 +142,22 @@ def parse_string(request, name, default=None, required=False, is not one of those allowed values. """ return parse_string_from_args( - request.args, name, default, required, allowed_values, param_type, + request.args, name, default, required, allowed_values, param_type, encoding ) def parse_string_from_args(args, name, default=None, required=False, - allowed_values=None, param_type="string"): + allowed_values=None, param_type="string", encoding='ascii'): + + if not isinstance(name, bytes): + name = name.encode('ascii') + if name in args: value = args[name][0] + + if encoding: + value = value.decode(encoding) + if allowed_values is not None and value not in allowed_values: message = "Query parameter %r must be one of [%s]" % ( name, ", ".join(repr(v) for v in allowed_values) @@ -145,6 +170,10 @@ def parse_string_from_args(args, name, default=None, required=False, message = "Missing %s query parameter %r" % (param_type, name) raise SynapseError(400, message, errcode=Codes.MISSING_PARAM) else: + + if encoding and isinstance(default, bytes): + return default.decode(encoding) + return default @@ -170,8 +199,16 @@ def parse_json_value_from_request(request, allow_empty_body=False): if not content_bytes and allow_empty_body: return None + # Decode to Unicode so that simplejson will return Unicode strings on + # Python 2 + try: + content_unicode = content_bytes.decode('utf8') + except UnicodeDecodeError: + logger.warn("Unable to decode UTF-8") + raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) + try: - content = simplejson.loads(content_bytes) + content = json.loads(content_unicode) except Exception as e: logger.warn("Unable to parse JSON: %s", e) raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) @@ -205,7 +242,7 @@ def parse_json_object_from_request(request, allow_empty_body=False): return content -def assert_params_in_request(body, required): +def assert_params_in_dict(body, required): absent = [] for k in required: if k not in body: diff --git a/synapse/http/site.py b/synapse/http/site.py index 2664006f8c..88ed3714f9 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -11,16 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import contextlib import logging import time -from twisted.web.server import Site, Request +from twisted.web.server import Request, Site from synapse.http import redact_uri -from synapse.http.request_metrics import RequestMetrics -from synapse.util.logcontext import LoggingContext +from synapse.http.request_metrics import RequestMetrics, requests_counter +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext logger = logging.getLogger(__name__) @@ -34,24 +33,43 @@ class SynapseRequest(Request): It extends twisted's twisted.web.server.Request, and adds: * Unique request ID + * A log context associated with the request * Redaction of access_token query-params in __repr__ * Logging at start and end * Metrics to record CPU, wallclock and DB time by endpoint. - It provides a method `processing` which should be called by the Resource - which is handling the request, and returns a context manager. + It also provides a method `processing`, which returns a context manager. If this + method is called, the request won't be logged until the context manager is closed; + this is useful for asynchronous request handlers which may go on processing the + request even after the client has disconnected. + Attributes: + logcontext(LoggingContext) : the log context for this request """ - def __init__(self, site, *args, **kw): - Request.__init__(self, *args, **kw) + def __init__(self, site, channel, *args, **kw): + Request.__init__(self, channel, *args, **kw) self.site = site + self._channel = channel # this is used by the tests self.authenticated_entity = None self.start_time = 0 + # we can't yet create the logcontext, as we don't know the method. + self.logcontext = None + global _next_request_seq self.request_seq = _next_request_seq _next_request_seq += 1 + # whether an asynchronous request handler has called processing() + self._is_processing = False + + # the time when the asynchronous request handler completed its processing + self._processing_finished_time = None + + # what time we finished sending the response to the client (or the connection + # dropped) + self.finish_time = None + def __repr__(self): # We overwrite this so that we don't log ``access_token`` return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % ( @@ -73,11 +91,116 @@ class SynapseRequest(Request): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] def render(self, resrc): + # this is called once a Resource has been found to serve the request; in our + # case the Resource in question will normally be a JsonResource. + + # create a LogContext for this request + request_id = self.get_request_id() + logcontext = self.logcontext = LoggingContext(request_id) + logcontext.request = request_id + # override the Server header which is set by twisted self.setHeader("Server", self.site.server_version_string) - return Request.render(self, resrc) + + with PreserveLoggingContext(self.logcontext): + # we start the request metrics timer here with an initial stab + # at the servlet name. For most requests that name will be + # JsonResource (or a subclass), and JsonResource._async_render + # will update it once it picks a servlet. + servlet_name = resrc.__class__.__name__ + self._started_processing(servlet_name) + + Request.render(self, resrc) + + # record the arrival of the request *after* + # dispatching to the handler, so that the handler + # can update the servlet name in the request + # metrics + requests_counter.labels(self.method, + self.request_metrics.name).inc() + + @contextlib.contextmanager + def processing(self): + """Record the fact that we are processing this request. + + Returns a context manager; the correct way to use this is: + + @defer.inlineCallbacks + def handle_request(request): + with request.processing("FooServlet"): + yield really_handle_the_request() + + Once the context manager is closed, the completion of the request will be logged, + and the various metrics will be updated. + """ + if self._is_processing: + raise RuntimeError("Request is already processing") + self._is_processing = True + + try: + yield + except Exception: + # this should already have been caught, and sent back to the client as a 500. + logger.exception("Asynchronous messge handler raised an uncaught exception") + finally: + # the request handler has finished its work and either sent the whole response + # back, or handed over responsibility to a Producer. + + self._processing_finished_time = time.time() + self._is_processing = False + + # if we've already sent the response, log it now; otherwise, we wait for the + # response to be sent. + if self.finish_time is not None: + self._finished_processing() + + def finish(self): + """Called when all response data has been written to this Request. + + Overrides twisted.web.server.Request.finish to record the finish time and do + logging. + """ + self.finish_time = time.time() + Request.finish(self) + if not self._is_processing: + with PreserveLoggingContext(self.logcontext): + self._finished_processing() + + def connectionLost(self, reason): + """Called when the client connection is closed before the response is written. + + Overrides twisted.web.server.Request.connectionLost to record the finish time and + do logging. + """ + self.finish_time = time.time() + Request.connectionLost(self, reason) + + # we only get here if the connection to the client drops before we send + # the response. + # + # It's useful to log it here so that we can get an idea of when + # the client disconnects. + with PreserveLoggingContext(self.logcontext): + logger.warn( + "Error processing request %r: %s %s", self, reason.type, reason.value, + ) + + if not self._is_processing: + self._finished_processing() def _started_processing(self, servlet_name): + """Record the fact that we are processing this request. + + This will log the request's arrival. Once the request completes, + be sure to call finished_processing. + + Args: + servlet_name (str): the name of the servlet which will be + processing this request. This is used in the metrics. + + It is possible to update this afterwards by updating + self.request_metrics.name. + """ self.start_time = time.time() self.request_metrics = RequestMetrics() self.request_metrics.start( @@ -93,72 +216,79 @@ class SynapseRequest(Request): ) def _finished_processing(self): - try: - context = LoggingContext.current_context() - ru_utime, ru_stime = context.get_resource_usage() - db_txn_count = context.db_txn_count - db_txn_duration_sec = context.db_txn_duration_sec - db_sched_duration_sec = context.db_sched_duration_sec - except Exception: - ru_utime, ru_stime = (0, 0) - db_txn_count, db_txn_duration_sec = (0, 0) + """Log the completion of this request and update the metrics + """ - end_time = time.time() + if self.logcontext is None: + # this can happen if the connection closed before we read the + # headers (so render was never called). In that case we'll already + # have logged a warning, so just bail out. + return + + usage = self.logcontext.get_resource_usage() + + if self._processing_finished_time is None: + # we completed the request without anything calling processing() + self._processing_finished_time = time.time() + + # the time between receiving the request and the request handler finishing + processing_time = self._processing_finished_time - self.start_time + + # the time between the request handler finishing and the response being sent + # to the client (nb may be negative) + response_send_time = self.finish_time - self._processing_finished_time + + # need to decode as it could be raw utf-8 bytes + # from a IDN servname in an auth header + authenticated_entity = self.authenticated_entity + if authenticated_entity is not None and isinstance(authenticated_entity, bytes): + authenticated_entity = authenticated_entity.decode("utf-8", "replace") + + # ...or could be raw utf-8 bytes in the User-Agent header. + # N.B. if you don't do this, the logger explodes cryptically + # with maximum recursion trying to log errors about + # the charset problem. + # c.f. https://github.com/matrix-org/synapse/issues/3471 + user_agent = self.get_user_agent() + if user_agent is not None: + user_agent = user_agent.decode("utf-8", "replace") + else: + user_agent = "-" + + code = str(self.code) + if not self.finished: + # we didn't send the full response before we gave up (presumably because + # the connection dropped) + code += "!" self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" - " %sB %s \"%s %s %s\" \"%s\"", + " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" + " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]", self.getClientIP(), self.site.site_tag, - self.authenticated_entity, - end_time - self.start_time, - ru_utime, - ru_stime, - db_sched_duration_sec, - db_txn_duration_sec, - int(db_txn_count), + authenticated_entity, + processing_time, + response_send_time, + usage.ru_utime, + usage.ru_stime, + usage.db_sched_duration_sec, + usage.db_txn_duration_sec, + int(usage.db_txn_count), self.sentLength, - self.code, + code, self.method, self.get_redacted_uri(), self.clientproto, - self.get_user_agent(), + user_agent, + usage.evt_db_fetch_count, ) try: - self.request_metrics.stop(end_time, self) + self.request_metrics.stop(self.finish_time, self) except Exception as e: logger.warn("Failed to stop metrics: %r", e) - @contextlib.contextmanager - def processing(self, servlet_name): - """Record the fact that we are processing this request. - - Returns a context manager; the correct way to use this is: - - @defer.inlineCallbacks - def handle_request(request): - with request.processing("FooServlet"): - yield really_handle_the_request() - - This will log the request's arrival. Once the context manager is - closed, the completion of the request will be logged, and the various - metrics will be updated. - - Args: - servlet_name (str): the name of the servlet which will be - processing this request. This is used in the metrics. - - It is possible to update this afterwards by updating - self.request_metrics.servlet_name. - """ - # TODO: we should probably just move this into render() and finish(), - # to save having to call a separate method. - self._started_processing(servlet_name) - yield - self._finished_processing() - class XForwardedForRequest(SynapseRequest): def __init__(self, *args, **kw): @@ -204,7 +334,7 @@ class SynapseSite(Site): proxied = config.get("x_forwarded", False) self.requestFactory = SynapseRequestFactory(self, proxied) self.access_logger = logging.getLogger(logger_name) - self.server_version_string = server_version_string + self.server_version_string = server_version_string.encode('ascii') def log(self, request): pass |