diff options
author | Michael Telatynski <7t3chguy@gmail.com> | 2018-07-24 17:17:46 +0100 |
---|---|---|
committer | Michael Telatynski <7t3chguy@gmail.com> | 2018-07-24 17:17:46 +0100 |
commit | 87951d3891efb5bccedf72c12b3da0d6ab482253 (patch) | |
tree | de7d997567c66c5a4d8743c1f3b9d6b474f5cfd9 /synapse/http/client.py | |
parent | if inviter_display_name == ""||None then default to inviter MXID (diff) | |
parent | Merge pull request #3595 from matrix-org/erikj/use_deltas (diff) | |
download | synapse-87951d3891efb5bccedf72c12b3da0d6ab482253.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into t3chguy/default_inviter_display_name_3pid
Diffstat (limited to 'synapse/http/client.py')
-rw-r--r-- | synapse/http/client.py | 233 |
1 files changed, 146 insertions, 87 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py index 9eba046bbf..25b6307884 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,49 +13,49 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from OpenSSL import SSL -from OpenSSL.SSL import VERIFY_NONE +import logging +import urllib -from synapse.api.errors import ( - CodeMessageException, MatrixCodeMessageException, SynapseError, Codes, -) -from synapse.util.logcontext import preserve_context_over_fn -from synapse.util import logcontext -import synapse.metrics -from synapse.http.endpoint import SpiderEndpoint +from six import StringIO -from canonicaljson import encode_canonical_json +from canonicaljson import encode_canonical_json, json +from prometheus_client import Counter -from twisted.internet import defer, reactor, ssl, protocol, task +from OpenSSL import SSL +from OpenSSL.SSL import VERIFY_NONE +from twisted.internet import defer, protocol, reactor, ssl, task from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.web._newclient import ResponseDone from twisted.web.client import ( - BrowserLikeRedirectAgent, ContentDecoderAgent, GzipDecoder, Agent, - readBody, PartialDownloadError, + Agent, + BrowserLikeRedirectAgent, + ContentDecoderAgent, + FileBodyProducer as TwistedFileBodyProducer, + GzipDecoder, + HTTPConnectionPool, + PartialDownloadError, + readBody, ) -from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers -from twisted.web._newclient import ResponseDone - -from StringIO import StringIO - -import simplejson as json -import logging -import urllib +from synapse.api.errors import ( + CodeMessageException, + Codes, + MatrixCodeMessageException, + SynapseError, +) +from synapse.http import cancelled_to_request_timed_out_error, redact_uri +from synapse.http.endpoint import SpiderEndpoint +from synapse.util.async import add_timeout_to_deferred +from synapse.util.caches import CACHE_SIZE_FACTOR +from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -outgoing_requests_counter = metrics.register_counter( - "requests", - labels=["method"], -) -incoming_responses_counter = metrics.register_counter( - "responses", - labels=["method", "code"], -) +outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"]) +incoming_responses_counter = Counter("synapse_http_client_responses", "", + ["method", "code"]) class SimpleHttpClient(object): @@ -64,13 +65,23 @@ class SimpleHttpClient(object): """ def __init__(self, hs): self.hs = hs + + pool = HTTPConnectionPool(reactor) + + # the pusher makes lots of concurrent SSL connections to sygnal, and + # tends to do so in batches, so we need to allow the pool to keep lots + # of idle connections around. + pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5)) + pool.cachedConnectionTimeout = 2 * 60 + # The default context factory in Twisted 14.0.0 (which we require) is # BrowserLikePolicyForHTTPS which will do regular cert validation # 'like a browser' self.agent = Agent( reactor, connectTimeout=15, - contextFactory=hs.get_http_client_context_factory() + contextFactory=hs.get_http_client_context_factory(), + pool=pool, ) self.user_agent = hs.version_string self.clock = hs.get_clock() @@ -81,76 +92,103 @@ class SimpleHttpClient(object): def request(self, method, uri, *args, **kwargs): # A small wrapper around self.agent.request() so we can easily attach # counters to it - outgoing_requests_counter.inc(method) + outgoing_requests_counter.labels(method).inc() - def send_request(): + # log request but strip `access_token` (AS requests for example include this) + logger.info("Sending request %s %s", method, redact_uri(uri)) + + try: request_deferred = self.agent.request( method, uri, *args, **kwargs ) - - return self.clock.time_bound_deferred( - request_deferred, - time_out=60, + add_timeout_to_deferred( + request_deferred, 60, self.hs.get_reactor(), + cancelled_to_request_timed_out_error, ) + response = yield make_deferred_yieldable(request_deferred) - logger.info("Sending request %s %s", method, uri) - - try: - with logcontext.PreserveLoggingContext(): - response = yield send_request() - - incoming_responses_counter.inc(method, response.code) + incoming_responses_counter.labels(method, response.code).inc() logger.info( "Received response to %s %s: %s", - method, uri, response.code + method, redact_uri(uri), response.code ) defer.returnValue(response) except Exception as e: - incoming_responses_counter.inc(method, "ERR") + incoming_responses_counter.labels(method, "ERR").inc() logger.info( "Error sending request to %s %s: %s %s", - method, uri, type(e).__name__, e.message + method, redact_uri(uri), type(e).__name__, e.message ) - raise e + raise @defer.inlineCallbacks - def post_urlencoded_get_json(self, uri, args={}): + def post_urlencoded_get_json(self, uri, args={}, headers=None): + """ + Args: + uri (str): + args (dict[str, str|List[str]]): query params + headers (dict[str, List[str]]|None): If not None, a map from + header name to a list of values for that header + + Returns: + Deferred[object]: parsed json + """ + # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) query_bytes = urllib.urlencode(encode_urlencode_args(args), True) + actual_headers = { + b"Content-Type": [b"application/x-www-form-urlencoded"], + b"User-Agent": [self.user_agent], + } + if headers: + actual_headers.update(headers) + response = yield self.request( "POST", uri.encode("ascii"), - headers=Headers({ - b"Content-Type": [b"application/x-www-form-urlencoded"], - b"User-Agent": [self.user_agent], - }), + headers=Headers(actual_headers), bodyProducer=FileBodyProducer(StringIO(query_bytes)) ) - body = yield preserve_context_over_fn(readBody, response) + body = yield make_deferred_yieldable(readBody(response)) defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def post_json_get_json(self, uri, post_json): + def post_json_get_json(self, uri, post_json, headers=None): + """ + + Args: + uri (str): + post_json (object): + headers (dict[str, List[str]]|None): If not None, a map from + header name to a list of values for that header + + Returns: + Deferred[object]: parsed json + """ json_str = encode_canonical_json(post_json) logger.debug("HTTP POST %s -> %s", json_str, uri) + actual_headers = { + b"Content-Type": [b"application/json"], + b"User-Agent": [self.user_agent], + } + if headers: + actual_headers.update(headers) + response = yield self.request( "POST", uri.encode("ascii"), - headers=Headers({ - b"Content-Type": [b"application/json"], - b"User-Agent": [self.user_agent], - }), + headers=Headers(actual_headers), bodyProducer=FileBodyProducer(StringIO(json_str)) ) - body = yield preserve_context_over_fn(readBody, response) + body = yield make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: defer.returnValue(json.loads(body)) @@ -160,7 +198,7 @@ class SimpleHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def get_json(self, uri, args={}): + def get_json(self, uri, args={}, headers=None): """ Gets some json from the given URI. Args: @@ -169,6 +207,8 @@ class SimpleHttpClient(object): None. **Note**: The value of each key is assumed to be an iterable and *not* a string. + headers (dict[str, List[str]]|None): If not None, a map from + header name to a list of values for that header Returns: Deferred: Succeeds when we get *any* 2xx HTTP response, with the HTTP body as JSON. @@ -177,13 +217,13 @@ class SimpleHttpClient(object): error message. """ try: - body = yield self.get_raw(uri, args) + body = yield self.get_raw(uri, args, headers=headers) defer.returnValue(json.loads(body)) except CodeMessageException as e: raise self._exceptionFromFailedRequest(e.code, e.msg) @defer.inlineCallbacks - def put_json(self, uri, json_body, args={}): + def put_json(self, uri, json_body, args={}, headers=None): """ Puts some json to the given URI. Args: @@ -193,6 +233,8 @@ class SimpleHttpClient(object): None. **Note**: The value of each key is assumed to be an iterable and *not* a string. + headers (dict[str, List[str]]|None): If not None, a map from + header name to a list of values for that header Returns: Deferred: Succeeds when we get *any* 2xx HTTP response, with the HTTP body as JSON. @@ -205,17 +247,21 @@ class SimpleHttpClient(object): json_str = encode_canonical_json(json_body) + actual_headers = { + b"Content-Type": [b"application/json"], + b"User-Agent": [self.user_agent], + } + if headers: + actual_headers.update(headers) + response = yield self.request( "PUT", uri.encode("ascii"), - headers=Headers({ - b"User-Agent": [self.user_agent], - "Content-Type": ["application/json"] - }), + headers=Headers(actual_headers), bodyProducer=FileBodyProducer(StringIO(json_str)) ) - body = yield preserve_context_over_fn(readBody, response) + body = yield make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: defer.returnValue(json.loads(body)) @@ -226,7 +272,7 @@ class SimpleHttpClient(object): raise CodeMessageException(response.code, body) @defer.inlineCallbacks - def get_raw(self, uri, args={}): + def get_raw(self, uri, args={}, headers=None): """ Gets raw text from the given URI. Args: @@ -235,6 +281,8 @@ class SimpleHttpClient(object): None. **Note**: The value of each key is assumed to be an iterable and *not* a string. + headers (dict[str, List[str]]|None): If not None, a map from + header name to a list of values for that header Returns: Deferred: Succeeds when we get *any* 2xx HTTP response, with the HTTP body at text. @@ -246,15 +294,19 @@ class SimpleHttpClient(object): query_bytes = urllib.urlencode(args, True) uri = "%s?%s" % (uri, query_bytes) + actual_headers = { + b"User-Agent": [self.user_agent], + } + if headers: + actual_headers.update(headers) + response = yield self.request( "GET", uri.encode("ascii"), - headers=Headers({ - b"User-Agent": [self.user_agent], - }) + headers=Headers(actual_headers), ) - body = yield preserve_context_over_fn(readBody, response) + body = yield make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: defer.returnValue(body) @@ -274,27 +326,33 @@ class SimpleHttpClient(object): # The two should be factored out. @defer.inlineCallbacks - def get_file(self, url, output_stream, max_size=None): + def get_file(self, url, output_stream, max_size=None, headers=None): """GETs a file from a given URL Args: url (str): The URL to GET output_stream (file): File to write the response body to. + headers (dict[str, List[str]]|None): If not None, a map from + header name to a list of values for that header Returns: A (int,dict,string,int) tuple of the file length, dict of the response headers, absolute URI of the response and HTTP response code. """ + actual_headers = { + b"User-Agent": [self.user_agent], + } + if headers: + actual_headers.update(headers) + response = yield self.request( "GET", url.encode("ascii"), - headers=Headers({ - b"User-Agent": [self.user_agent], - }) + headers=Headers(actual_headers), ) - headers = dict(response.headers.getAllRawHeaders()) + resp_headers = dict(response.headers.getAllRawHeaders()) - if 'Content-Length' in headers and headers['Content-Length'] > max_size: + if 'Content-Length' in resp_headers and resp_headers['Content-Length'] > max_size: logger.warn("Requested URL is too large > %r bytes" % (self.max_size,)) raise SynapseError( 502, @@ -315,10 +373,9 @@ class SimpleHttpClient(object): # straight back in again try: - length = yield preserve_context_over_fn( - _readBodyToFile, - response, output_stream, max_size - ) + length = yield make_deferred_yieldable(_readBodyToFile( + response, output_stream, max_size, + )) except Exception as e: logger.exception("Failed to download body") raise SynapseError( @@ -327,7 +384,9 @@ class SimpleHttpClient(object): Codes.UNKNOWN, ) - defer.returnValue((length, headers, response.request.absoluteURI, response.code)) + defer.returnValue( + (length, resp_headers, response.request.absoluteURI, response.code), + ) # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. @@ -395,7 +454,7 @@ class CaptchaServerHttpClient(SimpleHttpClient): ) try: - body = yield preserve_context_over_fn(readBody, response) + body = yield make_deferred_yieldable(readBody(response)) defer.returnValue(body) except PartialDownloadError as e: # twisted dislikes google's response, no content length. @@ -446,7 +505,7 @@ class SpiderHttpClient(SimpleHttpClient): reactor, SpiderEndpointFactory(hs) ) - ), [('gzip', GzipDecoder)] + ), [(b'gzip', GzipDecoder)] ) # We could look like Chrome: # self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko) |