summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/http/matrixfederationclient.py80
-rw-r--r--synapse/metrics/__init__.py29
-rw-r--r--synapse/python_dependencies.py2
3 files changed, 57 insertions, 54 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index ed47e701e7..4d74bd5d78 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -16,7 +16,7 @@
 
 from twisted.internet import defer, reactor, protocol
 from twisted.internet.error import DNSLookupError
-from twisted.web.client import readBody, _AgentBase, _URI, HTTPConnectionPool
+from twisted.web.client import readBody, HTTPConnectionPool, Agent
 from twisted.web.http_headers import Headers
 from twisted.web._newclient import ResponseDone
 
@@ -55,41 +55,17 @@ incoming_responses_counter = metrics.register_counter(
 )
 
 
-class MatrixFederationHttpAgent(_AgentBase):
-
-    def __init__(self, reactor, pool=None):
-        _AgentBase.__init__(self, reactor, pool)
-
-    def request(self, destination, endpoint, method, path, params, query,
-                headers, body_producer):
-
-        outgoing_requests_counter.inc(method)
-
-        host = b""
-        port = 0
-        fragment = b""
-
-        parsed_URI = _URI(b"http", destination, host, port, path, params,
-                          query, fragment)
-
-        # Set the connection pool key to be the destination.
-        key = destination
-
-        d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
-                                      headers, body_producer,
-                                      parsed_URI.originForm)
-
-        def _cb(response):
-            incoming_responses_counter.inc(method, response.code)
-            return response
-
-        def _eb(failure):
-            incoming_responses_counter.inc(method, "ERR")
-            return failure
+class MatrixFederationEndpointFactory(object):
+    def __init__(self, hs):
+        self.tls_context_factory = hs.tls_context_factory
 
-        d.addCallbacks(_cb, _eb)
+    def endpointForURI(self, uri):
+        destination = uri.netloc
 
-        return d
+        return matrix_federation_endpoint(
+            reactor, destination, timeout=10,
+            ssl_context_factory=self.tls_context_factory
+        )
 
 
 class MatrixFederationHttpClient(object):
@@ -107,12 +83,18 @@ class MatrixFederationHttpClient(object):
         self.server_name = hs.hostname
         pool = HTTPConnectionPool(reactor)
         pool.maxPersistentPerHost = 10
-        self.agent = MatrixFederationHttpAgent(reactor, pool=pool)
+        self.agent = Agent.usingEndpointFactory(
+            reactor, MatrixFederationEndpointFactory(hs), pool=pool
+        )
         self.clock = hs.get_clock()
         self.version_string = hs.version_string
-
         self._next_id = 1
 
+    def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
+        return urlparse.urlunparse(
+            ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
+        )
+
     @defer.inlineCallbacks
     def _create_request(self, destination, method, path_bytes,
                         body_callback, headers_dict={}, param_bytes=b"",
@@ -123,8 +105,8 @@ class MatrixFederationHttpClient(object):
         headers_dict[b"User-Agent"] = [self.version_string]
         headers_dict[b"Host"] = [destination]
 
-        url_bytes = urlparse.urlunparse(
-            ("", "", path_bytes, param_bytes, query_bytes, "",)
+        url_bytes = self._create_url(
+            destination, path_bytes, param_bytes, query_bytes
         )
 
         txn_id = "%s-O-%s" % (method, self._next_id)
@@ -139,8 +121,8 @@ class MatrixFederationHttpClient(object):
         # (once we have reliable transactions in place)
         retries_left = 5
 
-        endpoint = preserve_context_over_fn(
-            self._getEndpoint, reactor, destination
+        http_url_bytes = urlparse.urlunparse(
+            ("", "", path_bytes, param_bytes, query_bytes, "")
         )
 
         log_result = None
@@ -148,21 +130,19 @@ class MatrixFederationHttpClient(object):
             while True:
                 producer = None
                 if body_callback:
-                    producer = body_callback(method, url_bytes, headers_dict)
+                    producer = body_callback(method, http_url_bytes, headers_dict)
 
                 try:
                     def send_request():
-                        request_deferred = self.agent.request(
-                            destination,
-                            endpoint,
+                        request_deferred = preserve_context_over_fn(
+                            self.agent.request,
                             method,
-                            path_bytes,
-                            param_bytes,
-                            query_bytes,
+                            url_bytes,
                             Headers(headers_dict),
                             producer
                         )
 
+
                         return self.clock.time_bound_deferred(
                             request_deferred,
                             time_out=timeout/1000. if timeout else 60,
@@ -452,12 +432,6 @@ class MatrixFederationHttpClient(object):
 
         defer.returnValue((length, headers))
 
-    def _getEndpoint(self, reactor, destination):
-        return matrix_federation_endpoint(
-            reactor, destination, timeout=10,
-            ssl_context_factory=self.hs.tls_context_factory
-        )
-
 
 class _ReadBodyToFileProtocol(protocol.Protocol):
     def __init__(self, stream, deferred, max_size):
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 9233ea3da9..e014b415f3 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -18,8 +18,12 @@ from __future__ import absolute_import
 
 import logging
 from resource import getrusage, getpagesize, RUSAGE_SELF
+import functools
 import os
 import stat
+import time
+
+from twisted.internet import reactor
 
 from .metric import (
     CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
@@ -144,3 +148,28 @@ def _process_fds():
     return counts
 
 get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"])
+
+reactor_metrics = get_metrics_for("reactor")
+tick_time = reactor_metrics.register_distribution("tick_time")
+pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+
+
+def runUntilCurrentTimer(func):
+
+    @functools.wraps(func)
+    def f(*args, **kwargs):
+        start = time.time() * 1000
+        pending_calls = len(reactor.getDelayedCalls())
+        ret = func(*args, **kwargs)
+        end = time.time() * 1000
+        tick_time.inc_by(end - start)
+        pending_calls_metric.inc_by(pending_calls)
+        return ret
+
+    return f
+
+
+if hasattr(reactor, "runUntilCurrent"):
+    # runUntilCurrent is called when we have pending calls. It is called once
+    # per iteratation after fd polling.
+    reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 115bee8c41..fa06480ad1 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
 
 REQUIREMENTS = {
     "syutil>=0.0.7": ["syutil>=0.0.7"],
-    "Twisted==14.0.2": ["twisted==14.0.2"],
+    "Twisted>=15.1.0": ["twisted>=15.1.0"],
     "service_identity>=1.0.0": ["service_identity>=1.0.0"],
     "pyopenssl>=0.14": ["OpenSSL>=0.14"],
     "pyyaml": ["yaml"],