diff options
author | Erik Johnston <erik@matrix.org> | 2015-08-13 11:39:38 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2015-08-13 11:39:38 +0100 |
commit | c044aca1fdcdd7a5c8a16ef709e961c141909004 (patch) | |
tree | 38ebba516ceef2be2a37be9a67a76d74f2739cf9 /synapse | |
parent | Comment (diff) | |
parent | Add some metrics about the reactor (diff) | |
download | synapse-c044aca1fdcdd7a5c8a16ef709e961c141909004.tar.xz |
Merge branch 'erikj/reactor_metrics' into erikj/dictionary_cache
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 80 | ||||
-rw-r--r-- | synapse/metrics/__init__.py | 29 | ||||
-rw-r--r-- | synapse/python_dependencies.py | 2 |
3 files changed, 57 insertions, 54 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index ed47e701e7..4d74bd5d78 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -16,7 +16,7 @@ from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError -from twisted.web.client import readBody, _AgentBase, _URI, HTTPConnectionPool +from twisted.web.client import readBody, HTTPConnectionPool, Agent from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone @@ -55,41 +55,17 @@ incoming_responses_counter = metrics.register_counter( ) -class MatrixFederationHttpAgent(_AgentBase): - - def __init__(self, reactor, pool=None): - _AgentBase.__init__(self, reactor, pool) - - def request(self, destination, endpoint, method, path, params, query, - headers, body_producer): - - outgoing_requests_counter.inc(method) - - host = b"" - port = 0 - fragment = b"" - - parsed_URI = _URI(b"http", destination, host, port, path, params, - query, fragment) - - # Set the connection pool key to be the destination. - key = destination - - d = self._requestWithEndpoint(key, endpoint, method, parsed_URI, - headers, body_producer, - parsed_URI.originForm) - - def _cb(response): - incoming_responses_counter.inc(method, response.code) - return response - - def _eb(failure): - incoming_responses_counter.inc(method, "ERR") - return failure +class MatrixFederationEndpointFactory(object): + def __init__(self, hs): + self.tls_context_factory = hs.tls_context_factory - d.addCallbacks(_cb, _eb) + def endpointForURI(self, uri): + destination = uri.netloc - return d + return matrix_federation_endpoint( + reactor, destination, timeout=10, + ssl_context_factory=self.tls_context_factory + ) class MatrixFederationHttpClient(object): @@ -107,12 +83,18 @@ class MatrixFederationHttpClient(object): self.server_name = hs.hostname pool = HTTPConnectionPool(reactor) pool.maxPersistentPerHost = 10 - self.agent = MatrixFederationHttpAgent(reactor, pool=pool) + self.agent = Agent.usingEndpointFactory( + reactor, MatrixFederationEndpointFactory(hs), pool=pool + ) self.clock = hs.get_clock() self.version_string = hs.version_string - self._next_id = 1 + def _create_url(self, destination, path_bytes, param_bytes, query_bytes): + return urlparse.urlunparse( + ("matrix", destination, path_bytes, param_bytes, query_bytes, "") + ) + @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", @@ -123,8 +105,8 @@ class MatrixFederationHttpClient(object): headers_dict[b"User-Agent"] = [self.version_string] headers_dict[b"Host"] = [destination] - url_bytes = urlparse.urlunparse( - ("", "", path_bytes, param_bytes, query_bytes, "",) + url_bytes = self._create_url( + destination, path_bytes, param_bytes, query_bytes ) txn_id = "%s-O-%s" % (method, self._next_id) @@ -139,8 +121,8 @@ class MatrixFederationHttpClient(object): # (once we have reliable transactions in place) retries_left = 5 - endpoint = preserve_context_over_fn( - self._getEndpoint, reactor, destination + http_url_bytes = urlparse.urlunparse( + ("", "", path_bytes, param_bytes, query_bytes, "") ) log_result = None @@ -148,21 +130,19 @@ class MatrixFederationHttpClient(object): while True: producer = None if body_callback: - producer = body_callback(method, url_bytes, headers_dict) + producer = body_callback(method, http_url_bytes, headers_dict) try: def send_request(): - request_deferred = self.agent.request( - destination, - endpoint, + request_deferred = preserve_context_over_fn( + self.agent.request, method, - path_bytes, - param_bytes, - query_bytes, + url_bytes, Headers(headers_dict), producer ) + return self.clock.time_bound_deferred( request_deferred, time_out=timeout/1000. if timeout else 60, @@ -452,12 +432,6 @@ class MatrixFederationHttpClient(object): defer.returnValue((length, headers)) - def _getEndpoint(self, reactor, destination): - return matrix_federation_endpoint( - reactor, destination, timeout=10, - ssl_context_factory=self.hs.tls_context_factory - ) - class _ReadBodyToFileProtocol(protocol.Protocol): def __init__(self, stream, deferred, max_size): diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 9233ea3da9..e014b415f3 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -18,8 +18,12 @@ from __future__ import absolute_import import logging from resource import getrusage, getpagesize, RUSAGE_SELF +import functools import os import stat +import time + +from twisted.internet import reactor from .metric import ( CounterMetric, CallbackMetric, DistributionMetric, CacheMetric @@ -144,3 +148,28 @@ def _process_fds(): return counts get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"]) + +reactor_metrics = get_metrics_for("reactor") +tick_time = reactor_metrics.register_distribution("tick_time") +pending_calls_metric = reactor_metrics.register_distribution("pending_calls") + + +def runUntilCurrentTimer(func): + + @functools.wraps(func) + def f(*args, **kwargs): + start = time.time() * 1000 + pending_calls = len(reactor.getDelayedCalls()) + ret = func(*args, **kwargs) + end = time.time() * 1000 + tick_time.inc_by(end - start) + pending_calls_metric.inc_by(pending_calls) + return ret + + return f + + +if hasattr(reactor, "runUntilCurrent"): + # runUntilCurrent is called when we have pending calls. It is called once + # per iteratation after fd polling. + reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 115bee8c41..fa06480ad1 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -19,7 +19,7 @@ logger = logging.getLogger(__name__) REQUIREMENTS = { "syutil>=0.0.7": ["syutil>=0.0.7"], - "Twisted==14.0.2": ["twisted==14.0.2"], + "Twisted>=15.1.0": ["twisted>=15.1.0"], "service_identity>=1.0.0": ["service_identity>=1.0.0"], "pyopenssl>=0.14": ["OpenSSL>=0.14"], "pyyaml": ["yaml"], |