From 2d2828dcbc2c6360d28a64d3849cf849eb5348c4 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 6 Sep 2018 00:10:47 +1000 Subject: Port http/ to Python 3 (#3771) --- synapse/http/client.py | 82 +++++-------- synapse/http/matrixfederationclient.py | 204 ++++++++++++++------------------- synapse/http/site.py | 4 +- 3 files changed, 117 insertions(+), 173 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/client.py b/synapse/http/client.py index ab4fbf59b2..4ba54fed05 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -13,24 +13,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import logging -import urllib -from six import StringIO +from six import text_type +from six.moves import urllib +import treq from canonicaljson import encode_canonical_json, json from prometheus_client import Counter from OpenSSL import SSL from OpenSSL.SSL import VERIFY_NONE -from twisted.internet import defer, protocol, reactor, ssl, task +from twisted.internet import defer, protocol, reactor, ssl from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS from twisted.web._newclient import ResponseDone from twisted.web.client import ( Agent, BrowserLikeRedirectAgent, ContentDecoderAgent, - FileBodyProducer as TwistedFileBodyProducer, GzipDecoder, HTTPConnectionPool, PartialDownloadError, @@ -83,18 +84,20 @@ class SimpleHttpClient(object): if hs.config.user_agent_suffix: self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix,) + self.user_agent = self.user_agent.encode('ascii') + @defer.inlineCallbacks - def request(self, method, uri, *args, **kwargs): + def request(self, method, uri, data=b'', headers=None): # A small wrapper around self.agent.request() so we can easily attach # counters to it outgoing_requests_counter.labels(method).inc() # log request but strip `access_token` (AS requests for example include this) - logger.info("Sending request %s %s", method, redact_uri(uri)) + logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii'))) try: - request_deferred = self.agent.request( - method, uri, *args, **kwargs + request_deferred = treq.request( + method, uri, agent=self.agent, data=data, headers=headers ) add_timeout_to_deferred( request_deferred, 60, self.hs.get_reactor(), @@ -105,14 +108,14 @@ class SimpleHttpClient(object): incoming_responses_counter.labels(method, response.code).inc() logger.info( "Received response to %s %s: %s", - method, redact_uri(uri), response.code + method, redact_uri(uri.encode('ascii')), response.code ) defer.returnValue(response) except Exception as e: incoming_responses_counter.labels(method, "ERR").inc() logger.info( "Error sending request to %s %s: %s %s", - method, redact_uri(uri), type(e).__name__, e.message + method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0] ) raise @@ -137,7 +140,8 @@ class SimpleHttpClient(object): # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) - query_bytes = urllib.urlencode(encode_urlencode_args(args), True) + query_bytes = urllib.parse.urlencode( + encode_urlencode_args(args), True).encode("utf8") actual_headers = { b"Content-Type": [b"application/x-www-form-urlencoded"], @@ -148,15 +152,14 @@ class SimpleHttpClient(object): response = yield self.request( "POST", - uri.encode("ascii"), + uri, headers=Headers(actual_headers), - bodyProducer=FileBodyProducer(StringIO(query_bytes)) + data=query_bytes ) - body = yield make_deferred_yieldable(readBody(response)) - if 200 <= response.code < 300: - defer.returnValue(json.loads(body)) + body = yield make_deferred_yieldable(treq.json_content(response)) + defer.returnValue(body) else: raise HttpResponseException(response.code, response.phrase, body) @@ -191,9 +194,9 @@ class SimpleHttpClient(object): response = yield self.request( "POST", - uri.encode("ascii"), + uri, headers=Headers(actual_headers), - bodyProducer=FileBodyProducer(StringIO(json_str)) + data=json_str ) body = yield make_deferred_yieldable(readBody(response)) @@ -248,7 +251,7 @@ class SimpleHttpClient(object): ValueError: if the response was not JSON """ if len(args): - query_bytes = urllib.urlencode(args, True) + query_bytes = urllib.parse.urlencode(args, True) uri = "%s?%s" % (uri, query_bytes) json_str = encode_canonical_json(json_body) @@ -262,9 +265,9 @@ class SimpleHttpClient(object): response = yield self.request( "PUT", - uri.encode("ascii"), + uri, headers=Headers(actual_headers), - bodyProducer=FileBodyProducer(StringIO(json_str)) + data=json_str ) body = yield make_deferred_yieldable(readBody(response)) @@ -293,7 +296,7 @@ class SimpleHttpClient(object): HttpResponseException on a non-2xx HTTP response. """ if len(args): - query_bytes = urllib.urlencode(args, True) + query_bytes = urllib.parse.urlencode(args, True) uri = "%s?%s" % (uri, query_bytes) actual_headers = { @@ -304,7 +307,7 @@ class SimpleHttpClient(object): response = yield self.request( "GET", - uri.encode("ascii"), + uri, headers=Headers(actual_headers), ) @@ -339,7 +342,7 @@ class SimpleHttpClient(object): response = yield self.request( "GET", - url.encode("ascii"), + url, headers=Headers(actual_headers), ) @@ -434,12 +437,12 @@ class CaptchaServerHttpClient(SimpleHttpClient): @defer.inlineCallbacks def post_urlencoded_get_raw(self, url, args={}): - query_bytes = urllib.urlencode(encode_urlencode_args(args), True) + query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True) response = yield self.request( "POST", - url.encode("ascii"), - bodyProducer=FileBodyProducer(StringIO(query_bytes)), + url, + data=query_bytes, headers=Headers({ b"Content-Type": [b"application/x-www-form-urlencoded"], b"User-Agent": [self.user_agent], @@ -510,7 +513,7 @@ def encode_urlencode_args(args): def encode_urlencode_arg(arg): - if isinstance(arg, unicode): + if isinstance(arg, text_type): return arg.encode('utf-8') elif isinstance(arg, list): return [encode_urlencode_arg(i) for i in arg] @@ -542,26 +545,3 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory): def creatorForNetloc(self, hostname, port): return self - - -class FileBodyProducer(TwistedFileBodyProducer): - """Workaround for https://twistedmatrix.com/trac/ticket/8473 - - We override the pauseProducing and resumeProducing methods in twisted's - FileBodyProducer so that they do not raise exceptions if the task has - already completed. - """ - - def pauseProducing(self): - try: - super(FileBodyProducer, self).pauseProducing() - except task.TaskDone: - # task has already completed - pass - - def resumeProducing(self): - try: - super(FileBodyProducer, self).resumeProducing() - except task.NotPaused: - # task was not paused (probably because it had already completed) - pass diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index b34bb8e31a..6a1fc8ca55 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -17,19 +17,19 @@ import cgi import logging import random import sys -import urllib -from six import string_types -from six.moves.urllib import parse as urlparse +from six import PY3, string_types +from six.moves import urllib -from canonicaljson import encode_canonical_json, json +import treq +from canonicaljson import encode_canonical_json from prometheus_client import Counter from signedjson.sign import sign_json from twisted.internet import defer, protocol, reactor from twisted.internet.error import DNSLookupError from twisted.web._newclient import ResponseDone -from twisted.web.client import Agent, HTTPConnectionPool, readBody +from twisted.web.client import Agent, HTTPConnectionPool from twisted.web.http_headers import Headers import synapse.metrics @@ -58,13 +58,18 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon MAX_LONG_RETRIES = 10 MAX_SHORT_RETRIES = 3 +if PY3: + MAXINT = sys.maxsize +else: + MAXINT = sys.maxint + class MatrixFederationEndpointFactory(object): def __init__(self, hs): self.tls_client_options_factory = hs.tls_client_options_factory def endpointForURI(self, uri): - destination = uri.netloc + destination = uri.netloc.decode('ascii') return matrix_federation_endpoint( reactor, destination, timeout=10, @@ -93,26 +98,32 @@ class MatrixFederationHttpClient(object): ) self.clock = hs.get_clock() self._store = hs.get_datastore() - self.version_string = hs.version_string + self.version_string = hs.version_string.encode('ascii') self._next_id = 1 def _create_url(self, destination, path_bytes, param_bytes, query_bytes): - return urlparse.urlunparse( - ("matrix", destination, path_bytes, param_bytes, query_bytes, "") + return urllib.parse.urlunparse( + (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"") ) @defer.inlineCallbacks def _request(self, destination, method, path, - body_callback, headers_dict={}, param_bytes=b"", - query_bytes=b"", retry_on_dns_fail=True, + json=None, json_callback=None, + param_bytes=b"", + query=None, retry_on_dns_fail=True, timeout=None, long_retries=False, ignore_backoff=False, backoff_on_404=False): - """ Creates and sends a request to the given server + """ + Creates and sends a request to the given server. + Args: destination (str): The remote server to send the HTTP request to. method (str): HTTP method path (str): The HTTP path + json (dict or None): JSON to send in the body. + json_callback (func or None): A callback to generate the JSON. + query (dict or None): Query arguments. ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. backoff_on_404 (bool): Back off if we get a 404 @@ -146,22 +157,29 @@ class MatrixFederationHttpClient(object): ignore_backoff=ignore_backoff, ) - destination = destination.encode("ascii") + headers_dict = {} path_bytes = path.encode("ascii") - with limiter: - headers_dict[b"User-Agent"] = [self.version_string] - headers_dict[b"Host"] = [destination] + if query: + query_bytes = encode_query_args(query) + else: + query_bytes = b"" - url_bytes = self._create_url( - destination, path_bytes, param_bytes, query_bytes - ) + headers_dict = { + "User-Agent": [self.version_string], + "Host": [destination], + } + + with limiter: + url = self._create_url( + destination.encode("ascii"), path_bytes, param_bytes, query_bytes + ).decode('ascii') txn_id = "%s-O-%s" % (method, self._next_id) - self._next_id = (self._next_id + 1) % (sys.maxint - 1) + self._next_id = (self._next_id + 1) % (MAXINT - 1) outbound_logger.info( "{%s} [%s] Sending request: %s %s", - txn_id, destination, method, url_bytes + txn_id, destination, method, url ) # XXX: Would be much nicer to retry only at the transaction-layer @@ -171,23 +189,33 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - http_url_bytes = urlparse.urlunparse( - ("", "", path_bytes, param_bytes, query_bytes, "") - ) + http_url = urllib.parse.urlunparse( + (b"", b"", path_bytes, param_bytes, query_bytes, b"") + ).decode('ascii') log_result = None try: while True: - producer = None - if body_callback: - producer = body_callback(method, http_url_bytes, headers_dict) - try: - request_deferred = self.agent.request( + if json_callback: + json = json_callback() + + if json: + data = encode_canonical_json(json) + headers_dict["Content-Type"] = ["application/json"] + self.sign_request( + destination, method, http_url, headers_dict, json + ) + else: + data = None + self.sign_request(destination, method, http_url, headers_dict) + + request_deferred = treq.request( method, - url_bytes, - Headers(headers_dict), - producer + url, + headers=Headers(headers_dict), + data=data, + agent=self.agent, ) add_timeout_to_deferred( request_deferred, @@ -218,7 +246,7 @@ class MatrixFederationHttpClient(object): txn_id, destination, method, - url_bytes, + url, _flatten_response_never_received(e), ) @@ -252,7 +280,7 @@ class MatrixFederationHttpClient(object): # :'( # Update transactions table? with logcontext.PreserveLoggingContext(): - body = yield readBody(response) + body = yield treq.content(response) raise HttpResponseException( response.code, response.phrase, body ) @@ -297,11 +325,11 @@ class MatrixFederationHttpClient(object): auth_headers = [] for key, sig in request["signatures"][self.server_name].items(): - auth_headers.append(bytes( + auth_headers.append(( "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % ( self.server_name, key, sig, - ) - )) + )).encode('ascii') + ) headers_dict[b"Authorization"] = auth_headers @@ -347,24 +375,14 @@ class MatrixFederationHttpClient(object): """ if not json_data_callback: - def json_data_callback(): - return data - - def body_callback(method, url_bytes, headers_dict): - json_data = json_data_callback() - self.sign_request( - destination, method, url_bytes, headers_dict, json_data - ) - producer = _JsonProducer(json_data) - return producer + json_data_callback = lambda: data response = yield self._request( destination, "PUT", path, - body_callback=body_callback, - headers_dict={"Content-Type": ["application/json"]}, - query_bytes=encode_query_args(args), + json_callback=json_data_callback, + query=args, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, @@ -376,8 +394,8 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield readBody(response) - defer.returnValue(json.loads(body)) + body = yield treq.json_content(response) + defer.returnValue(body) @defer.inlineCallbacks def post_json(self, destination, path, data={}, long_retries=False, @@ -410,20 +428,12 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - - def body_callback(method, url_bytes, headers_dict): - self.sign_request( - destination, method, url_bytes, headers_dict, data - ) - return _JsonProducer(data) - response = yield self._request( destination, "POST", path, - query_bytes=encode_query_args(args), - body_callback=body_callback, - headers_dict={"Content-Type": ["application/json"]}, + query=args, + json=data, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, @@ -434,9 +444,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield readBody(response) + body = yield treq.json_content(response) - defer.returnValue(json.loads(body)) + defer.returnValue(body) @defer.inlineCallbacks def get_json(self, destination, path, args=None, retry_on_dns_fail=True, @@ -471,16 +481,11 @@ class MatrixFederationHttpClient(object): logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) - def body_callback(method, url_bytes, headers_dict): - self.sign_request(destination, method, url_bytes, headers_dict) - return None - response = yield self._request( destination, "GET", path, - query_bytes=encode_query_args(args), - body_callback=body_callback, + query=args, retry_on_dns_fail=retry_on_dns_fail, timeout=timeout, ignore_backoff=ignore_backoff, @@ -491,9 +496,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield readBody(response) + body = yield treq.json_content(response) - defer.returnValue(json.loads(body)) + defer.returnValue(body) @defer.inlineCallbacks def delete_json(self, destination, path, long_retries=False, @@ -523,13 +528,11 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - response = yield self._request( destination, "DELETE", path, - query_bytes=encode_query_args(args), - headers_dict={"Content-Type": ["application/json"]}, + query=args, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, @@ -540,9 +543,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield readBody(response) + body = yield treq.json_content(response) - defer.returnValue(json.loads(body)) + defer.returnValue(body) @defer.inlineCallbacks def get_file(self, destination, path, output_stream, args={}, @@ -569,26 +572,11 @@ class MatrixFederationHttpClient(object): Fails with ``FederationDeniedError`` if this destination is not on our federation whitelist """ - - encoded_args = {} - for k, vs in args.items(): - if isinstance(vs, string_types): - vs = [vs] - encoded_args[k] = [v.encode("UTF-8") for v in vs] - - query_bytes = urllib.urlencode(encoded_args, True) - logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail) - - def body_callback(method, url_bytes, headers_dict): - self.sign_request(destination, method, url_bytes, headers_dict) - return None - response = yield self._request( destination, "GET", path, - query_bytes=query_bytes, - body_callback=body_callback, + query=args, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff, ) @@ -639,30 +627,6 @@ def _readBodyToFile(response, stream, max_size): return d -class _JsonProducer(object): - """ Used by the twisted http client to create the HTTP body from json - """ - def __init__(self, jsn): - self.reset(jsn) - - def reset(self, jsn): - self.body = encode_canonical_json(jsn) - self.length = len(self.body) - - def startProducing(self, consumer): - consumer.write(self.body) - return defer.succeed(None) - - def pauseProducing(self): - pass - - def stopProducing(self): - pass - - def resumeProducing(self): - pass - - def _flatten_response_never_received(e): if hasattr(e, "reasons"): reasons = ", ".join( @@ -693,7 +657,7 @@ def check_content_type_is_json(headers): "No Content-Type header" ) - c_type = c_type[0] # only the first header + c_type = c_type[0].decode('ascii') # only the first header val, options = cgi.parse_header(c_type) if val != "application/json": raise RuntimeError( @@ -711,6 +675,6 @@ def encode_query_args(args): vs = [vs] encoded_args[k] = [v.encode("UTF-8") for v in vs] - query_bytes = urllib.urlencode(encoded_args, True) + query_bytes = urllib.parse.urlencode(encoded_args, True) - return query_bytes + return query_bytes.encode('utf8') diff --git a/synapse/http/site.py b/synapse/http/site.py index 88ed3714f9..f0828c6542 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -204,14 +204,14 @@ class SynapseRequest(Request): self.start_time = time.time() self.request_metrics = RequestMetrics() self.request_metrics.start( - self.start_time, name=servlet_name, method=self.method, + self.start_time, name=servlet_name, method=self.method.decode('ascii'), ) self.site.access_logger.info( "%s - %s - Received request: %s %s", self.getClientIP(), self.site.site_tag, - self.method, + self.method.decode('ascii'), self.get_redacted_uri() ) -- cgit 1.4.1 From 4084a774a89f0d02406eebda8279c2b8aab89812 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Sep 2018 09:59:15 +0100 Subject: Timeout reading body for outbound HTTP requests --- synapse/http/matrixfederationclient.py | 52 +++++++++++++++++++++++++++++----- 1 file changed, 45 insertions(+), 7 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 6a1fc8ca55..f9a1fbf95d 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -280,7 +280,10 @@ class MatrixFederationHttpClient(object): # :'( # Update transactions table? with logcontext.PreserveLoggingContext(): - body = yield treq.content(response) + body = yield self._timeout_deferred( + treq.content(response), + timeout, + ) raise HttpResponseException( response.code, response.phrase, body ) @@ -394,7 +397,10 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @defer.inlineCallbacks @@ -444,7 +450,10 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @@ -496,7 +505,10 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @@ -543,7 +555,10 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield treq.json_content(response) + body = yield self._timeout_deferred( + treq.json_content(response), + timeout, + ) defer.returnValue(body) @@ -585,8 +600,10 @@ class MatrixFederationHttpClient(object): try: with logcontext.PreserveLoggingContext(): - length = yield _readBodyToFile( - response, output_stream, max_size + length = yield self._timeout_deferred( + _readBodyToFile( + response, output_stream, max_size + ), ) except Exception: logger.exception("Failed to download body") @@ -594,6 +611,27 @@ class MatrixFederationHttpClient(object): defer.returnValue((length, headers)) + def _timeout_deferred(self, deferred, timeout_ms=None): + """Times the deferred out after `timeout_ms` ms + + Args: + deferred (Deferred) + timeout_ms (int|None): Timeout in milliseconds. If None defaults + to 60 seconds. + + Returns: + Deferred + """ + + add_timeout_to_deferred( + deferred, + timeout_ms / 1000. if timeout_ms else 60, + self.hs.get_reactor(), + cancelled_to_request_timed_out_error, + ) + + return deferred + class _ReadBodyToFileProtocol(protocol.Protocol): def __init__(self, stream, deferred, max_size): -- cgit 1.4.1 From 8c5b84441bdcc0f321ca9a58e8ec20cb35cdb456 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Sep 2018 16:22:14 +0100 Subject: Log outbound requests when we retry --- synapse/http/matrixfederationclient.py | 149 ++++++++++++++++----------------- 1 file changed, 74 insertions(+), 75 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index f9a1fbf95d..cf920bc041 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -177,11 +177,6 @@ class MatrixFederationHttpClient(object): txn_id = "%s-O-%s" % (method, self._next_id) self._next_id = (self._next_id + 1) % (MAXINT - 1) - outbound_logger.info( - "{%s} [%s] Sending request: %s %s", - txn_id, destination, method, url - ) - # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: @@ -194,85 +189,89 @@ class MatrixFederationHttpClient(object): ).decode('ascii') log_result = None - try: - while True: - try: - if json_callback: - json = json_callback() - - if json: - data = encode_canonical_json(json) - headers_dict["Content-Type"] = ["application/json"] - self.sign_request( - destination, method, http_url, headers_dict, json - ) - else: - data = None - self.sign_request(destination, method, http_url, headers_dict) - - request_deferred = treq.request( - method, - url, - headers=Headers(headers_dict), - data=data, - agent=self.agent, - ) - add_timeout_to_deferred( - request_deferred, - timeout / 1000. if timeout else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) - response = yield make_deferred_yieldable( - request_deferred, + while True: + try: + if json_callback: + json = json_callback() + + if json: + data = encode_canonical_json(json) + headers_dict["Content-Type"] = ["application/json"] + self.sign_request( + destination, method, http_url, headers_dict, json ) + else: + data = None + self.sign_request(destination, method, http_url, headers_dict) - log_result = "%d %s" % (response.code, response.phrase,) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): - logger.warn( - "DNS Lookup failed to %s with %s", - destination, - e - ) - log_result = "DNS Lookup failed to %s with %s" % ( - destination, e - ) - raise + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url + ) + request_deferred = treq.request( + method, + url, + headers=Headers(headers_dict), + data=data, + agent=self.agent, + ) + add_timeout_to_deferred( + request_deferred, + timeout / 1000. if timeout else 60, + self.hs.get_reactor(), + cancelled_to_request_timed_out_error, + ) + response = yield make_deferred_yieldable( + request_deferred, + ) + + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): logger.warn( - "{%s} Sending request failed to %s: %s %s: %s", - txn_id, + "DNS Lookup failed to %s with %s", destination, - method, - url, - _flatten_response_never_received(e), + e ) + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise + + logger.warn( + "{%s} Sending request failed to %s: %s %s: %s", + txn_id, + destination, + method, + url, + _flatten_response_never_received(e), + ) - log_result = _flatten_response_never_received(e) - - if retries_left and not timeout: - if long_retries: - delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) - delay = min(delay, 60) - delay *= random.uniform(0.8, 1.4) - else: - delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) - delay = min(delay, 2) - delay *= random.uniform(0.8, 1.4) + log_result = _flatten_response_never_received(e) - yield self.clock.sleep(delay) - retries_left -= 1 + if retries_left and not timeout: + if long_retries: + delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) + delay = min(delay, 60) + delay *= random.uniform(0.8, 1.4) else: - raise - finally: - outbound_logger.info( - "{%s} [%s] Result: %s", - txn_id, - destination, - log_result, - ) + delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) + delay = min(delay, 2) + delay *= random.uniform(0.8, 1.4) + + yield self.clock.sleep(delay) + retries_left -= 1 + else: + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) if 200 <= response.code < 300: pass -- cgit 1.4.1 From bfa0b759e02c74931606ea77f34bec99dfb10589 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 00:15:51 +1000 Subject: Attempt to figure out what's going on with timeouts (#3857) --- changelog.d/3857.misc | 1 + synapse/http/matrixfederationclient.py | 98 +++++++++----------- tests/http/test_fedclient.py | 157 +++++++++++++++++++++++++++++++++ tests/server.py | 42 ++++++++- 4 files changed, 241 insertions(+), 57 deletions(-) create mode 100644 changelog.d/3857.misc create mode 100644 tests/http/test_fedclient.py (limited to 'synapse/http') diff --git a/changelog.d/3857.misc b/changelog.d/3857.misc new file mode 100644 index 0000000000..e128d193d9 --- /dev/null +++ b/changelog.d/3857.misc @@ -0,0 +1 @@ +Refactor some HTTP timeout code. \ No newline at end of file diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cf920bc041..c49dbacd93 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -26,7 +26,7 @@ from canonicaljson import encode_canonical_json from prometheus_client import Counter from signedjson.sign import sign_json -from twisted.internet import defer, protocol, reactor +from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError from twisted.web._newclient import ResponseDone from twisted.web.client import Agent, HTTPConnectionPool @@ -40,10 +40,8 @@ from synapse.api.errors import ( HttpResponseException, SynapseError, ) -from synapse.http import cancelled_to_request_timed_out_error from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext -from synapse.util.async_helpers import add_timeout_to_deferred from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) @@ -66,13 +64,14 @@ else: class MatrixFederationEndpointFactory(object): def __init__(self, hs): + self.reactor = hs.get_reactor() self.tls_client_options_factory = hs.tls_client_options_factory def endpointForURI(self, uri): destination = uri.netloc.decode('ascii') return matrix_federation_endpoint( - reactor, destination, timeout=10, + self.reactor, destination, timeout=10, tls_client_options_factory=self.tls_client_options_factory ) @@ -90,6 +89,7 @@ class MatrixFederationHttpClient(object): self.hs = hs self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname + reactor = hs.get_reactor() pool = HTTPConnectionPool(reactor) pool.maxPersistentPerHost = 5 pool.cachedConnectionTimeout = 2 * 60 @@ -100,6 +100,7 @@ class MatrixFederationHttpClient(object): self._store = hs.get_datastore() self.version_string = hs.version_string.encode('ascii') self._next_id = 1 + self.default_timeout = 60 def _create_url(self, destination, path_bytes, param_bytes, query_bytes): return urllib.parse.urlunparse( @@ -143,6 +144,11 @@ class MatrixFederationHttpClient(object): (May also fail with plenty of other Exceptions for things like DNS failures, connection failures, SSL failures.) """ + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + if ( self.hs.config.federation_domain_whitelist is not None and destination not in self.hs.config.federation_domain_whitelist @@ -215,13 +221,9 @@ class MatrixFederationHttpClient(object): headers=Headers(headers_dict), data=data, agent=self.agent, + reactor=self.hs.get_reactor() ) - add_timeout_to_deferred( - request_deferred, - timeout / 1000. if timeout else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) + request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) response = yield make_deferred_yieldable( request_deferred, ) @@ -261,6 +263,13 @@ class MatrixFederationHttpClient(object): delay = min(delay, 2) delay *= random.uniform(0.8, 1.4) + logger.debug( + "{%s} Waiting %s before sending to %s...", + txn_id, + delay, + destination + ) + yield self.clock.sleep(delay) retries_left -= 1 else: @@ -279,10 +288,9 @@ class MatrixFederationHttpClient(object): # :'( # Update transactions table? with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.content(response), - timeout, - ) + d = treq.content(response) + d.addTimeout(_sec_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) raise HttpResponseException( response.code, response.phrase, body ) @@ -396,10 +404,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @defer.inlineCallbacks @@ -449,10 +456,14 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + if timeout: + _sec_timeout = timeout / 1000 + else: + _sec_timeout = self.default_timeout + + d.addTimeout(_sec_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -504,10 +515,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -554,10 +564,9 @@ class MatrixFederationHttpClient(object): check_content_type_is_json(response.headers) with logcontext.PreserveLoggingContext(): - body = yield self._timeout_deferred( - treq.json_content(response), - timeout, - ) + d = treq.json_content(response) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + body = yield make_deferred_yieldable(d) defer.returnValue(body) @@ -599,38 +608,15 @@ class MatrixFederationHttpClient(object): try: with logcontext.PreserveLoggingContext(): - length = yield self._timeout_deferred( - _readBodyToFile( - response, output_stream, max_size - ), - ) + d = _readBodyToFile(response, output_stream, max_size) + d.addTimeout(self.default_timeout, self.hs.get_reactor()) + length = yield make_deferred_yieldable(d) except Exception: logger.exception("Failed to download body") raise defer.returnValue((length, headers)) - def _timeout_deferred(self, deferred, timeout_ms=None): - """Times the deferred out after `timeout_ms` ms - - Args: - deferred (Deferred) - timeout_ms (int|None): Timeout in milliseconds. If None defaults - to 60 seconds. - - Returns: - Deferred - """ - - add_timeout_to_deferred( - deferred, - timeout_ms / 1000. if timeout_ms else 60, - self.hs.get_reactor(), - cancelled_to_request_timed_out_error, - ) - - return deferred - class _ReadBodyToFileProtocol(protocol.Protocol): def __init__(self, stream, deferred, max_size): diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py new file mode 100644 index 0000000000..1c46c9cfeb --- /dev/null +++ b/tests/http/test_fedclient.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from mock import Mock + +from twisted.internet.defer import TimeoutError +from twisted.internet.error import ConnectingCancelledError, DNSLookupError +from twisted.web.client import ResponseNeverReceived + +from synapse.http.matrixfederationclient import MatrixFederationHttpClient + +from tests.unittest import HomeserverTestCase + + +class FederationClientTests(HomeserverTestCase): + def make_homeserver(self, reactor, clock): + + hs = self.setup_test_homeserver(reactor=reactor, clock=clock) + hs.tls_client_options_factory = None + return hs + + def prepare(self, reactor, clock, homeserver): + + self.cl = MatrixFederationHttpClient(self.hs) + self.reactor.lookups["testserv"] = "1.2.3.4" + + def test_dns_error(self): + """ + If the DNS raising returns an error, it will bubble up. + """ + d = self.cl._request("testserv2:8008", "GET", "foo/bar", timeout=10000) + self.pump() + + f = self.failureResultOf(d) + self.assertIsInstance(f.value, DNSLookupError) + + def test_client_never_connect(self): + """ + If the HTTP request is not connected and is timed out, it'll give a + ConnectingCancelledError. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertFalse(d.called) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], '1.2.3.4') + self.assertEqual(clients[0][1], 8008) + + # Deferred is still without a result + self.assertFalse(d.called) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, ConnectingCancelledError) + + def test_client_connect_no_response(self): + """ + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + # Nothing happened yet + self.assertFalse(d.called) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + self.assertEqual(clients[0][0], '1.2.3.4') + self.assertEqual(clients[0][1], 8008) + + conn = Mock() + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred is still without a result + self.assertFalse(d.called) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, ResponseNeverReceived) + + def test_client_gets_headers(self): + """ + Once the client gets the headers, _request returns successfully. + """ + d = self.cl._request("testserv:8008", "GET", "foo/bar", timeout=10000) + + self.pump() + + conn = Mock() + clients = self.reactor.tcpClients + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred does not have a result + self.assertFalse(d.called) + + # Send it the HTTP response + client.dataReceived(b"HTTP/1.1 200 OK\r\nServer: Fake\r\n\r\n") + + # We should get a successful response + r = self.successResultOf(d) + self.assertEqual(r.code, 200) + + def test_client_headers_no_body(self): + """ + If the HTTP request is connected, but gets no response before being + timed out, it'll give a ResponseNeverReceived. + """ + d = self.cl.post_json("testserv:8008", "foo/bar", timeout=10000) + + self.pump() + + conn = Mock() + clients = self.reactor.tcpClients + client = clients[0][2].buildProtocol(None) + client.makeConnection(conn) + + # Deferred does not have a result + self.assertFalse(d.called) + + # Send it the HTTP response + client.dataReceived( + (b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n" + b"Server: Fake\r\n\r\n") + ) + + # Push by enough to time it out + self.reactor.advance(10.5) + f = self.failureResultOf(d) + + self.assertIsInstance(f.value, TimeoutError) diff --git a/tests/server.py b/tests/server.py index a2c3ca61f6..420ec4e088 100644 --- a/tests/server.py +++ b/tests/server.py @@ -4,9 +4,14 @@ from io import BytesIO from six import text_type import attr +from zope.interface import implementer -from twisted.internet import address, threads +from twisted.internet import address, threads, udp +from twisted.internet._resolver import HostResolution +from twisted.internet.address import IPv4Address from twisted.internet.defer import Deferred +from twisted.internet.error import DNSLookupError +from twisted.internet.interfaces import IReactorPluggableNameResolver from twisted.python.failure import Failure from twisted.test.proto_helpers import MemoryReactorClock @@ -154,11 +159,46 @@ def render(request, resource, clock): wait_until_result(clock, request) +@implementer(IReactorPluggableNameResolver) class ThreadedMemoryReactorClock(MemoryReactorClock): """ A MemoryReactorClock that supports callFromThread. """ + def __init__(self): + self._udp = [] + self.lookups = {} + + class Resolver(object): + def resolveHostName( + _self, + resolutionReceiver, + hostName, + portNumber=0, + addressTypes=None, + transportSemantics='TCP', + ): + + resolution = HostResolution(hostName) + resolutionReceiver.resolutionBegan(resolution) + if hostName not in self.lookups: + raise DNSLookupError("OH NO") + + resolutionReceiver.addressResolved( + IPv4Address('TCP', self.lookups[hostName], portNumber) + ) + resolutionReceiver.resolutionComplete() + return resolution + + self.nameResolver = Resolver() + super(ThreadedMemoryReactorClock, self).__init__() + + def listenUDP(self, port, protocol, interface='', maxPacketSize=8196): + p = udp.Port(port, protocol, interface, maxPacketSize, self) + p.startListening() + self._udp.append(p) + return p + def callFromThread(self, callback, *args, **kwargs): """ Make the callback fire in the next reactor iteration. -- cgit 1.4.1 From 1c3f4d9ca53b3e3a5ef1849a8da68c6d8cf82938 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 03:09:13 +1000 Subject: buffer? --- synapse/http/matrixfederationclient.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c49dbacd93..cdbe27eb37 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -221,7 +221,8 @@ class MatrixFederationHttpClient(object): headers=Headers(headers_dict), data=data, agent=self.agent, - reactor=self.hs.get_reactor() + reactor=self.hs.get_reactor(), + unbuffered=True ) request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) response = yield make_deferred_yieldable( -- cgit 1.4.1 From 7c27c4d51cfd2a3d6504889b1ccdddabf38ae05c Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 03:11:11 +1000 Subject: merge (#3576) --- .circleci/config.yml | 17 +++++++---- .travis.yml | 3 ++ changelog.d/3576.feature | 1 + synapse/http/client.py | 14 ++++++--- synapse/push/httppusher.py | 7 ++++- synapse/push/mailer.py | 7 +++-- synapse/replication/slave/storage/devices.py | 23 ++++++++++----- tox.ini | 44 ++-------------------------- 8 files changed, 55 insertions(+), 61 deletions(-) create mode 100644 changelog.d/3576.feature (limited to 'synapse/http') diff --git a/.circleci/config.yml b/.circleci/config.yml index 5266544f3c..605430fb3f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -53,7 +53,7 @@ jobs: steps: - checkout - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3 + - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3 - store_artifacts: path: ~/project/logs destination: logs @@ -76,7 +76,7 @@ jobs: - checkout - run: bash .circleci/merge_base_branch.sh - run: docker pull matrixdotorg/sytest-synapsepy3 - - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3 + - run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3 - store_artifacts: path: ~/project/logs destination: logs @@ -101,6 +101,8 @@ workflows: jobs: - sytestpy2 - sytestpy2postgres + - sytestpy3 + - sytestpy3postgres - sytestpy2merged: filters: branches: @@ -109,6 +111,11 @@ workflows: filters: branches: ignore: /develop|master/ -# Currently broken while the Python 3 port is incomplete -# - sytestpy3 -# - sytestpy3postgres + - sytestpy3merged: + filters: + branches: + ignore: /develop|master/ + - sytestpy3postgresmerged: + filters: + branches: + ignore: /develop|master/ diff --git a/.travis.yml b/.travis.yml index ebc972ed24..b3ee66da8f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -25,6 +25,9 @@ matrix: services: - postgresql + - python: 3.5 + env: TOX_ENV=py35 + - python: 3.6 env: TOX_ENV=py36 diff --git a/changelog.d/3576.feature b/changelog.d/3576.feature new file mode 100644 index 0000000000..02a10e370d --- /dev/null +++ b/changelog.d/3576.feature @@ -0,0 +1 @@ +Python 3.5+ is now supported. diff --git a/synapse/http/client.py b/synapse/http/client.py index 4ba54fed05..d60f87b04e 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -348,7 +348,8 @@ class SimpleHttpClient(object): resp_headers = dict(response.headers.getAllRawHeaders()) - if 'Content-Length' in resp_headers and resp_headers['Content-Length'] > max_size: + if (b'Content-Length' in resp_headers and + int(resp_headers[b'Content-Length']) > max_size): logger.warn("Requested URL is too large > %r bytes" % (self.max_size,)) raise SynapseError( 502, @@ -381,7 +382,12 @@ class SimpleHttpClient(object): ) defer.returnValue( - (length, resp_headers, response.request.absoluteURI, response.code), + ( + length, + resp_headers, + response.request.absoluteURI.decode('ascii'), + response.code, + ), ) @@ -466,9 +472,9 @@ class SpiderEndpointFactory(object): def endpointForURI(self, uri): logger.info("Getting endpoint for %s", uri.toBytes()) - if uri.scheme == "http": + if uri.scheme == b"http": endpoint_factory = HostnameEndpoint - elif uri.scheme == "https": + elif uri.scheme == b"https": tlsCreator = self.policyForHTTPS.creatorForNetloc(uri.host, uri.port) def endpoint_factory(reactor, host, port, **kw): diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 81e18bcf7d..48abd5e4d6 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -15,6 +15,8 @@ # limitations under the License. import logging +import six + from prometheus_client import Counter from twisted.internet import defer @@ -26,6 +28,9 @@ from synapse.util.metrics import Measure from . import push_rule_evaluator, push_tools +if six.PY3: + long = int + logger = logging.getLogger(__name__) http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "") @@ -96,7 +101,7 @@ class HttpPusher(object): @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): - self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0) yield self._process() @defer.inlineCallbacks diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index bfa6df7b68..b78ce10396 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -17,10 +17,11 @@ import email.mime.multipart import email.utils import logging import time -import urllib from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText +from six.moves import urllib + import bleach import jinja2 @@ -474,7 +475,7 @@ class Mailer(object): # XXX: make r0 once API is stable return "%s_matrix/client/unstable/pushers/remove?%s" % ( self.hs.config.public_baseurl, - urllib.urlencode(params), + urllib.parse.urlencode(params), ) @@ -561,7 +562,7 @@ def _create_mxc_to_http_filter(config): return "%s_matrix/media/v1/thumbnail/%s?%s%s" % ( config.public_baseurl, serverAndMediaId, - urllib.urlencode(params), + urllib.parse.urlencode(params), fragment or "", ) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 8206a988f7..21b8c468fa 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import six + from synapse.storage import DataStore from synapse.storage.end_to_end_keys import EndToEndKeyStore from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -21,6 +23,13 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker +def __func__(inp): + if six.PY3: + return inp + else: + return inp.__func__ + + class SlavedDeviceStore(BaseSlavedStore): def __init__(self, db_conn, hs): super(SlavedDeviceStore, self).__init__(db_conn, hs) @@ -38,14 +47,14 @@ class SlavedDeviceStore(BaseSlavedStore): "DeviceListFederationStreamChangeCache", device_list_max, ) - get_device_stream_token = DataStore.get_device_stream_token.__func__ - get_user_whose_devices_changed = DataStore.get_user_whose_devices_changed.__func__ - get_devices_by_remote = DataStore.get_devices_by_remote.__func__ - _get_devices_by_remote_txn = DataStore._get_devices_by_remote_txn.__func__ - _get_e2e_device_keys_txn = DataStore._get_e2e_device_keys_txn.__func__ - mark_as_sent_devices_by_remote = DataStore.mark_as_sent_devices_by_remote.__func__ + get_device_stream_token = __func__(DataStore.get_device_stream_token) + get_user_whose_devices_changed = __func__(DataStore.get_user_whose_devices_changed) + get_devices_by_remote = __func__(DataStore.get_devices_by_remote) + _get_devices_by_remote_txn = __func__(DataStore._get_devices_by_remote_txn) + _get_e2e_device_keys_txn = __func__(DataStore._get_e2e_device_keys_txn) + mark_as_sent_devices_by_remote = __func__(DataStore.mark_as_sent_devices_by_remote) _mark_as_sent_devices_by_remote_txn = ( - DataStore._mark_as_sent_devices_by_remote_txn.__func__ + __func__(DataStore._mark_as_sent_devices_by_remote_txn) ) count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"] diff --git a/tox.ini b/tox.ini index 085f438989..80ac9324df 100644 --- a/tox.ini +++ b/tox.ini @@ -64,49 +64,11 @@ setenv = {[base]setenv} SYNAPSE_POSTGRES = 1 +[testenv:py35] +usedevelop=true + [testenv:py36] usedevelop=true -commands = - /usr/bin/find "{toxinidir}" -name '*.pyc' -delete - coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \ - "{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/config \ - tests/api/test_filtering.py \ - tests/api/test_ratelimiting.py \ - tests/appservice \ - tests/crypto \ - tests/events \ - tests/handlers/test_appservice.py \ - tests/handlers/test_auth.py \ - tests/handlers/test_device.py \ - tests/handlers/test_directory.py \ - tests/handlers/test_e2e_keys.py \ - tests/handlers/test_presence.py \ - tests/handlers/test_profile.py \ - tests/handlers/test_register.py \ - tests/replication/slave/storage/test_account_data.py \ - tests/replication/slave/storage/test_receipts.py \ - tests/storage/test_appservice.py \ - tests/storage/test_background_update.py \ - tests/storage/test_base.py \ - tests/storage/test__base.py \ - tests/storage/test_client_ips.py \ - tests/storage/test_devices.py \ - tests/storage/test_end_to_end_keys.py \ - tests/storage/test_event_push_actions.py \ - tests/storage/test_keys.py \ - tests/storage/test_presence.py \ - tests/storage/test_profile.py \ - tests/storage/test_registration.py \ - tests/storage/test_room.py \ - tests/storage/test_user_directory.py \ - tests/test_distributor.py \ - tests/test_dns.py \ - tests/test_preview.py \ - tests/test_test_utils.py \ - tests/test_types.py \ - tests/util} \ - {env:TOXSUFFIX:} - {env:DUMP_COVERAGE_COMMAND:coverage report -m} [testenv:packaging] deps = -- cgit 1.4.1 From 63755fa4c25f120941aba24d6192815313d12ea7 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 03:21:47 +1000 Subject: we do that higher up --- synapse/http/matrixfederationclient.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cdbe27eb37..d1a6e87706 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -91,6 +91,7 @@ class MatrixFederationHttpClient(object): self.server_name = hs.hostname reactor = hs.get_reactor() pool = HTTPConnectionPool(reactor) + pool.retryAutomatically = False pool.maxPersistentPerHost = 5 pool.cachedConnectionTimeout = 2 * 60 self.agent = Agent.usingEndpointFactory( -- cgit 1.4.1 From 7c33ab76da91a747fd442f65c9bfe187a81864f7 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 03:45:34 +1000 Subject: redact better --- synapse/http/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py index 58ef8d3ce4..a3f9e4f67c 100644 --- a/synapse/http/__init__.py +++ b/synapse/http/__init__.py @@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout): return value -ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') +ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$') def redact_uri(uri): """Strips access tokens from the uri replaces with """ return ACCESS_TOKEN_RE.sub( - br'\1\3', + r'\1\3', uri ) -- cgit 1.4.1 From 8f08d848f53bc79cdd47f8d4025313268a16b227 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 03:53:56 +1000 Subject: fix --- synapse/http/site.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/http') diff --git a/synapse/http/site.py b/synapse/http/site.py index f0828c6542..18f2d37e00 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -85,7 +85,7 @@ class SynapseRequest(Request): return "%s-%i" % (self.method, self.request_seq) def get_redacted_uri(self): - return redact_uri(self.uri) + return redact_uri(self.uri.decode('ascii')) def get_user_agent(self): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] -- cgit 1.4.1 From c971aa7b9d5b4d3900404a285f34dd5004a76f37 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 14 Sep 2018 03:57:02 +1000 Subject: fix --- synapse/http/site.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/http') diff --git a/synapse/http/site.py b/synapse/http/site.py index 18f2d37e00..e1e53e8ae5 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -85,7 +85,10 @@ class SynapseRequest(Request): return "%s-%i" % (self.method, self.request_seq) def get_redacted_uri(self): - return redact_uri(self.uri.decode('ascii')) + uri = self.uri + if isinstance(uri, bytes): + uri = self.uri.decode('ascii') + return redact_uri(uri) def get_user_agent(self): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] -- cgit 1.4.1 From 9e2f9a7b57a8c4d2238684a9ccd6145948229e2c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Sep 2018 15:11:26 +0100 Subject: Measure outbound requests --- synapse/http/matrixfederationclient.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index c49dbacd93..c3542b9353 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -43,6 +43,7 @@ from synapse.api.errors import ( from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext from synapse.util.logcontext import make_deferred_yieldable +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) outbound_logger = logging.getLogger("synapse.http.outbound") @@ -224,9 +225,11 @@ class MatrixFederationHttpClient(object): reactor=self.hs.get_reactor() ) request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) - response = yield make_deferred_yieldable( - request_deferred, - ) + + with Measure(self.clock, "outbound_request"): + response = yield make_deferred_yieldable( + request_deferred, + ) log_result = "%d %s" % (response.code, response.phrase,) break -- cgit 1.4.1 From bc9af88a2d971cd75aad419c60e13b0778640b28 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Sat, 15 Sep 2018 00:26:00 +1000 Subject: fix --- synapse/http/client.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/client.py b/synapse/http/client.py index d60f87b04e..ec339a92ad 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -93,7 +93,7 @@ class SimpleHttpClient(object): outgoing_requests_counter.labels(method).inc() # log request but strip `access_token` (AS requests for example include this) - logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii'))) + logger.info("Sending request %s %s", method, redact_uri(uri)) try: request_deferred = treq.request( @@ -108,14 +108,14 @@ class SimpleHttpClient(object): incoming_responses_counter.labels(method, response.code).inc() logger.info( "Received response to %s %s: %s", - method, redact_uri(uri.encode('ascii')), response.code + method, redact_uri(uri), response.code ) defer.returnValue(response) except Exception as e: incoming_responses_counter.labels(method, "ERR").inc() logger.info( "Error sending request to %s %s: %s %s", - method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0] + method, redact_uri(uri), type(e).__name__, e.args[0] ) raise -- cgit 1.4.1 From fcfe7a850dada2e65982f2bc805d8bc409f07512 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 14 Sep 2018 19:23:07 +0100 Subject: Add an awful secondary timeout to fix wedged requests This is an attempt to mitigate #3842 by adding yet-another-timeout --- synapse/http/matrixfederationclient.py | 11 ++++++++ synapse/util/async_helpers.py | 51 ++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index da16b5dd8c..13b19f7626 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -42,6 +42,7 @@ from synapse.api.errors import ( ) from synapse.http.endpoint import matrix_federation_endpoint from synapse.util import logcontext +from synapse.util.async_helpers import timeout_no_seriously from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure @@ -228,6 +229,16 @@ class MatrixFederationHttpClient(object): ) request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor()) + # Sometimes the timeout above doesn't work, so lets hack yet + # another layer of timeouts in in the vain hope that at some + # point the world made sense and this really really really + # should work. + request_deferred = timeout_no_seriously( + request_deferred, + timeout=_sec_timeout * 2, + reactor=self.hs.get_reactor(), + ) + with Measure(self.clock, "outbound_request"): response = yield make_deferred_yieldable( request_deferred, diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 9b3f2f4b96..083e4f4128 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -438,3 +438,54 @@ def _cancelled_to_timed_out_error(value, timeout): value.trap(CancelledError) raise DeferredTimeoutError(timeout, "Deferred") return value + + +def timeout_no_seriously(deferred, timeout, reactor): + """The in build twisted deferred addTimeout (and the method above) + completely fail to time things out under some unknown circumstances. + + Lets try a different way of timing things out and maybe that will make + things work?! + + TODO: Kill this with fire. + """ + + new_d = defer.Deferred() + + timed_out = [False] + + def time_it_out(): + timed_out[0] = True + deferred.cancel() + + if not new_d.called: + new_d.errback(DeferredTimeoutError(timeout, "Deferred")) + + delayed_call = reactor.callLater(timeout, time_it_out) + + def convert_cancelled(value): + if timed_out[0]: + return _cancelled_to_timed_out_error(value, timeout) + return value + + deferred.addBoth(convert_cancelled) + + def cancel_timeout(result): + # stop the pending call to cancel the deferred if it's been fired + if delayed_call.active(): + delayed_call.cancel() + return result + + deferred.addBoth(cancel_timeout) + + def success_cb(val): + if not new_d.called: + new_d.callback(val) + + def failure_cb(val): + if not new_d.called: + new_d.errback(val) + + deferred.addCallbacks(success_cb, failure_cb) + + return new_d -- cgit 1.4.1