diff options
Diffstat (limited to 'synapse/http')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 35 | ||||
-rw-r--r-- | synapse/http/server.py | 45 | ||||
-rw-r--r-- | synapse/http/servlet.py | 8 |
3 files changed, 71 insertions, 17 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 6e53538a52..b7b7c2cce8 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -56,7 +56,8 @@ incoming_responses_counter = metrics.register_counter( ) -MAX_RETRIES = 4 +MAX_LONG_RETRIES = 10 +MAX_SHORT_RETRIES = 3 class MatrixFederationEndpointFactory(object): @@ -103,7 +104,7 @@ class MatrixFederationHttpClient(object): def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", query_bytes=b"", retry_on_dns_fail=True, - timeout=None): + timeout=None, long_retries=False): """ Creates and sends a request to the given url """ headers_dict[b"User-Agent"] = [self.version_string] @@ -123,7 +124,10 @@ class MatrixFederationHttpClient(object): # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) - retries_left = MAX_RETRIES + if long_retries: + retries_left = MAX_LONG_RETRIES + else: + retries_left = MAX_SHORT_RETRIES http_url_bytes = urlparse.urlunparse( ("", "", path_bytes, param_bytes, query_bytes, "") @@ -184,8 +188,15 @@ class MatrixFederationHttpClient(object): ) if retries_left and not timeout: - delay = 5 ** (MAX_RETRIES + 1 - retries_left) - delay *= random.uniform(0.8, 1.4) + if long_retries: + delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) + delay = min(delay, 60) + delay *= random.uniform(0.8, 1.4) + else: + delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) + delay = min(delay, 2) + delay *= random.uniform(0.8, 1.4) + yield sleep(delay) retries_left -= 1 else: @@ -236,7 +247,8 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers @defer.inlineCallbacks - def put_json(self, destination, path, data={}, json_data_callback=None): + def put_json(self, destination, path, data={}, json_data_callback=None, + long_retries=False): """ Sends the specifed json data using PUT Args: @@ -247,6 +259,8 @@ class MatrixFederationHttpClient(object): the request body. This will be encoded as JSON. json_data_callback (callable): A callable returning the dict to use as the request body. + long_retries (bool): A boolean that indicates whether we should + retry for a short or long time. Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result @@ -272,6 +286,7 @@ class MatrixFederationHttpClient(object): path.encode("ascii"), body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, + long_retries=long_retries, ) if 200 <= response.code < 300: @@ -287,7 +302,7 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def post_json(self, destination, path, data={}): + def post_json(self, destination, path, data={}, long_retries=True): """ Sends the specifed json data using POST Args: @@ -296,6 +311,8 @@ class MatrixFederationHttpClient(object): path (str): The HTTP path. data (dict): A dict containing the data that will be used as the request body. This will be encoded as JSON. + long_retries (bool): A boolean that indicates whether we should + retry for a short or long time. Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result @@ -315,6 +332,7 @@ class MatrixFederationHttpClient(object): path.encode("ascii"), body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, + long_retries=True, ) if 200 <= response.code < 300: @@ -490,6 +508,9 @@ class _JsonProducer(object): def stopProducing(self): pass + def resumeProducing(self): + pass + def _flatten_response_never_received(e): if hasattr(e, "reasons"): diff --git a/synapse/http/server.py b/synapse/http/server.py index 50feea6f1c..c44bdfc888 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -53,6 +53,23 @@ response_timer = metrics.register_distribution( labels=["method", "servlet"] ) +response_ru_utime = metrics.register_distribution( + "response_ru_utime", labels=["method", "servlet"] +) + +response_ru_stime = metrics.register_distribution( + "response_ru_stime", labels=["method", "servlet"] +) + +response_db_txn_count = metrics.register_distribution( + "response_db_txn_count", labels=["method", "servlet"] +) + +response_db_txn_duration = metrics.register_distribution( + "response_db_txn_duration", labels=["method", "servlet"] +) + + _next_request_id = 0 @@ -120,7 +137,7 @@ class HttpServer(object): """ Interface for registering callbacks on a HTTP server """ - def register_path(self, method, path_pattern, callback): + def register_paths(self, method, path_patterns, callback): """ Register a callback that gets fired if we receive a http request with the given method for a path that matches the given regex. @@ -129,7 +146,7 @@ class HttpServer(object): Args: method (str): The method to listen to. - path_pattern (str): The regex used to match requests. + path_patterns (list<SRE_Pattern>): The regex used to match requests. callback (function): The function to fire if we receive a matched request. The first argument will be the request object and subsequent arguments will be any matched groups from the regex. @@ -165,10 +182,11 @@ class JsonResource(HttpServer, resource.Resource): self.version_string = hs.version_string self.hs = hs - def register_path(self, method, path_pattern, callback): - self.path_regexs.setdefault(method, []).append( - self._PathEntry(path_pattern, callback) - ) + def register_paths(self, method, path_patterns, callback): + for path_pattern in path_patterns: + self.path_regexs.setdefault(method, []).append( + self._PathEntry(path_pattern, callback) + ) def render(self, request): """ This gets called by twisted every time someone sends us a request. @@ -220,6 +238,21 @@ class JsonResource(HttpServer, resource.Resource): self.clock.time_msec() - start, request.method, servlet_classname ) + try: + context = LoggingContext.current_context() + ru_utime, ru_stime = context.get_resource_usage() + + response_ru_utime.inc_by(ru_utime, request.method, servlet_classname) + response_ru_stime.inc_by(ru_stime, request.method, servlet_classname) + response_db_txn_count.inc_by( + context.db_txn_count, request.method, servlet_classname + ) + response_db_txn_duration.inc_by( + context.db_txn_duration, request.method, servlet_classname + ) + except: + pass + return # Huh. No one wanted to handle that? Fiiiiiine. Send 400. diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 9cda17fcf8..32b6d6cd72 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -19,7 +19,6 @@ from synapse.api.errors import SynapseError import logging - logger = logging.getLogger(__name__) @@ -102,12 +101,13 @@ class RestServlet(object): def register(self, http_server): """ Register this servlet with the given HTTP server. """ - if hasattr(self, "PATTERN"): - pattern = self.PATTERN + if hasattr(self, "PATTERNS"): + patterns = self.PATTERNS for method in ("GET", "PUT", "POST", "OPTIONS", "DELETE"): if hasattr(self, "on_%s" % (method,)): method_handler = getattr(self, "on_%s" % (method,)) - http_server.register_path(method, pattern, method_handler) + http_server.register_paths(method, patterns, method_handler) + else: raise NotImplementedError("RestServlet must register something.") |