diff --git a/demo/start.sh b/demo/start.sh
index b5dea5e176..572dbfab0b 100755
--- a/demo/start.sh
+++ b/demo/start.sh
@@ -23,7 +23,6 @@ for port in 8080 8081 8082; do
#rm $DIR/etc/$port.config
python -m synapse.app.homeserver \
--generate-config \
- --enable_registration \
-H "localhost:$https_port" \
--config-path "$DIR/etc/$port.config" \
@@ -36,6 +35,8 @@ for port in 8080 8081 8082; do
fi
fi
+ perl -p -i -e 's/^enable_registration:.*/enable_registration: true/g' $DIR/etc/$port.config
+
python -m synapse.app.homeserver \
--config-path "$DIR/etc/$port.config" \
-D \
diff --git a/setup.py b/setup.py
index f9929591e7..16ccc0f1b8 100755
--- a/setup.py
+++ b/setup.py
@@ -48,7 +48,7 @@ setup(
description="Reference Synapse Home Server",
install_requires=dependencies['requirements'](include_conditional=True).keys(),
setup_requires=[
- "Twisted==14.0.2", # Here to override setuptools_trial's dependency on Twisted>=2.4.0
+ "Twisted>=15.1.0", # Here to override setuptools_trial's dependency on Twisted>=2.4.0
"setuptools_trial",
"mock"
],
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index ed47e701e7..4d74bd5d78 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -16,7 +16,7 @@
from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError
-from twisted.web.client import readBody, _AgentBase, _URI, HTTPConnectionPool
+from twisted.web.client import readBody, HTTPConnectionPool, Agent
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone
@@ -55,41 +55,17 @@ incoming_responses_counter = metrics.register_counter(
)
-class MatrixFederationHttpAgent(_AgentBase):
-
- def __init__(self, reactor, pool=None):
- _AgentBase.__init__(self, reactor, pool)
-
- def request(self, destination, endpoint, method, path, params, query,
- headers, body_producer):
-
- outgoing_requests_counter.inc(method)
-
- host = b""
- port = 0
- fragment = b""
-
- parsed_URI = _URI(b"http", destination, host, port, path, params,
- query, fragment)
-
- # Set the connection pool key to be the destination.
- key = destination
-
- d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
- headers, body_producer,
- parsed_URI.originForm)
-
- def _cb(response):
- incoming_responses_counter.inc(method, response.code)
- return response
-
- def _eb(failure):
- incoming_responses_counter.inc(method, "ERR")
- return failure
+class MatrixFederationEndpointFactory(object):
+ def __init__(self, hs):
+ self.tls_context_factory = hs.tls_context_factory
- d.addCallbacks(_cb, _eb)
+ def endpointForURI(self, uri):
+ destination = uri.netloc
- return d
+ return matrix_federation_endpoint(
+ reactor, destination, timeout=10,
+ ssl_context_factory=self.tls_context_factory
+ )
class MatrixFederationHttpClient(object):
@@ -107,12 +83,18 @@ class MatrixFederationHttpClient(object):
self.server_name = hs.hostname
pool = HTTPConnectionPool(reactor)
pool.maxPersistentPerHost = 10
- self.agent = MatrixFederationHttpAgent(reactor, pool=pool)
+ self.agent = Agent.usingEndpointFactory(
+ reactor, MatrixFederationEndpointFactory(hs), pool=pool
+ )
self.clock = hs.get_clock()
self.version_string = hs.version_string
-
self._next_id = 1
+ def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
+ return urlparse.urlunparse(
+ ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
+ )
+
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
@@ -123,8 +105,8 @@ class MatrixFederationHttpClient(object):
headers_dict[b"User-Agent"] = [self.version_string]
headers_dict[b"Host"] = [destination]
- url_bytes = urlparse.urlunparse(
- ("", "", path_bytes, param_bytes, query_bytes, "",)
+ url_bytes = self._create_url(
+ destination, path_bytes, param_bytes, query_bytes
)
txn_id = "%s-O-%s" % (method, self._next_id)
@@ -139,8 +121,8 @@ class MatrixFederationHttpClient(object):
# (once we have reliable transactions in place)
retries_left = 5
- endpoint = preserve_context_over_fn(
- self._getEndpoint, reactor, destination
+ http_url_bytes = urlparse.urlunparse(
+ ("", "", path_bytes, param_bytes, query_bytes, "")
)
log_result = None
@@ -148,21 +130,19 @@ class MatrixFederationHttpClient(object):
while True:
producer = None
if body_callback:
- producer = body_callback(method, url_bytes, headers_dict)
+ producer = body_callback(method, http_url_bytes, headers_dict)
try:
def send_request():
- request_deferred = self.agent.request(
- destination,
- endpoint,
+ request_deferred = preserve_context_over_fn(
+ self.agent.request,
method,
- path_bytes,
- param_bytes,
- query_bytes,
+ url_bytes,
Headers(headers_dict),
producer
)
+
return self.clock.time_bound_deferred(
request_deferred,
time_out=timeout/1000. if timeout else 60,
@@ -452,12 +432,6 @@ class MatrixFederationHttpClient(object):
defer.returnValue((length, headers))
- def _getEndpoint(self, reactor, destination):
- return matrix_federation_endpoint(
- reactor, destination, timeout=10,
- ssl_context_factory=self.hs.tls_context_factory
- )
-
class _ReadBodyToFileProtocol(protocol.Protocol):
def __init__(self, stream, deferred, max_size):
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 9233ea3da9..e014b415f3 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -18,8 +18,12 @@ from __future__ import absolute_import
import logging
from resource import getrusage, getpagesize, RUSAGE_SELF
+import functools
import os
import stat
+import time
+
+from twisted.internet import reactor
from .metric import (
CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
@@ -144,3 +148,28 @@ def _process_fds():
return counts
get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"])
+
+reactor_metrics = get_metrics_for("reactor")
+tick_time = reactor_metrics.register_distribution("tick_time")
+pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
+
+
+def runUntilCurrentTimer(func):
+
+ @functools.wraps(func)
+ def f(*args, **kwargs):
+ start = time.time() * 1000
+ pending_calls = len(reactor.getDelayedCalls())
+ ret = func(*args, **kwargs)
+ end = time.time() * 1000
+ tick_time.inc_by(end - start)
+ pending_calls_metric.inc_by(pending_calls)
+ return ret
+
+ return f
+
+
+if hasattr(reactor, "runUntilCurrent"):
+ # runUntilCurrent is called when we have pending calls. It is called once
+ # per iteratation after fd polling.
+ reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 115bee8c41..fa06480ad1 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -19,7 +19,7 @@ logger = logging.getLogger(__name__)
REQUIREMENTS = {
"syutil>=0.0.7": ["syutil>=0.0.7"],
- "Twisted==14.0.2": ["twisted==14.0.2"],
+ "Twisted>=15.1.0": ["twisted>=15.1.0"],
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
"pyyaml": ["yaml"],
|