From 5b6672c66de693c390091c402f2dbb4a0f467aaf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 28 Dec 2016 22:49:31 +0000 Subject: Wrap connections in an N minute timeout to ensure they get reaped correctly --- synapse/http/endpoint.py | 58 +++++++++++++++++++++++++++++++--- synapse/http/matrixfederationclient.py | 8 ++++- 2 files changed, 61 insertions(+), 5 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 442696d393..16ad09a481 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -14,7 +14,7 @@ # limitations under the License. from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint -from twisted.internet import defer +from twisted.internet import defer, reactor, task from twisted.internet.error import ConnectError from twisted.names import client, dns from twisted.names.error import DNSNameError, DomainError @@ -66,13 +66,63 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, default_port = 8448 if port is None: - return SRVClientEndpoint( + return _WrappingEndointFac(SRVClientEndpoint( reactor, "matrix", domain, protocol="tcp", default_port=default_port, endpoint=transport_endpoint, endpoint_kw_args=endpoint_kw_args - ) + )) else: - return transport_endpoint(reactor, domain, port, **endpoint_kw_args) + return _WrappingEndointFac(transport_endpoint(reactor, domain, port, **endpoint_kw_args)) + + +class _WrappingEndointFac(object): + def __init__(self, endpoint_fac): + self.endpoint_fac = endpoint_fac + + @defer.inlineCallbacks + def connect(self, protocolFactory): + conn = yield self.endpoint_fac.connect(protocolFactory) + conn = _WrappedConnection(conn) + defer.returnValue(conn) + + +class _WrappedConnection(object): + """Wraps a connection and calls abort on it if it hasn't seen any actio + for 5 minutes + """ + __slots__ = ["conn", "last_request"] + + def __init__(self, conn): + object.__setattr__(self, "conn", conn) + object.__setattr__(self, "last_request", time.time()) + + def __getattr__(self, name): + return getattr(self.conn, name) + + def __setattr__(self, name, value): + setattr(self.conn, name, value) + + def _time_things_out_maybe(self): + if time.time() - self.last_request >= 2 * 60: + self.abort() + + def request(self, request): + self.last_request = time.time() + + # Time this connection out if we haven't send a request in the last + # N minutes + reactor.callLater(3 * 60, self._time_things_out_maybe) + + d = self.conn.request(request) + + def update_request_time(res): + self.last_request = time.time() + reactor.callLater(3 * 60, self._time_things_out_maybe) + return res + + d.addCallback(update_request_time) + + return d class SpiderEndpoint(object): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index d5970c05a8..da98d2d666 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -61,6 +61,11 @@ MAX_LONG_RETRIES = 10 MAX_SHORT_RETRIES = 3 +def test(conn): + conn.loseConnection() + return conn + + class MatrixFederationEndpointFactory(object): def __init__(self, hs): self.tls_server_context_factory = hs.tls_server_context_factory @@ -88,7 +93,8 @@ class MatrixFederationHttpClient(object): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname pool = HTTPConnectionPool(reactor) - pool.maxPersistentPerHost = 10 + pool.maxPersistentPerHost = 5 + pool.cachedConnectionTimeout = 2 * 60 self.agent = Agent.usingEndpointFactory( reactor, MatrixFederationEndpointFactory(hs), pool=pool ) -- cgit 1.4.1 From b7336ff32d4f9883c06c538901f6566b7ccb1ebe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Dec 2016 00:09:33 +0000 Subject: Clean up --- synapse/http/endpoint.py | 6 ++++-- synapse/http/matrixfederationclient.py | 5 ----- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 16ad09a481..4d361bbe73 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -14,7 +14,7 @@ # limitations under the License. from twisted.internet.endpoints import SSL4ClientEndpoint, TCP4ClientEndpoint -from twisted.internet import defer, reactor, task +from twisted.internet import defer, reactor from twisted.internet.error import ConnectError from twisted.names import client, dns from twisted.names.error import DNSNameError, DomainError @@ -72,7 +72,9 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, endpoint_kw_args=endpoint_kw_args )) else: - return _WrappingEndointFac(transport_endpoint(reactor, domain, port, **endpoint_kw_args)) + return _WrappingEndointFac(transport_endpoint( + reactor, domain, port, **endpoint_kw_args + )) class _WrappingEndointFac(object): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index da98d2d666..4d40219fcc 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -61,11 +61,6 @@ MAX_LONG_RETRIES = 10 MAX_SHORT_RETRIES = 3 -def test(conn): - conn.loseConnection() - return conn - - class MatrixFederationEndpointFactory(object): def __init__(self, hs): self.tls_server_context_factory = hs.tls_server_context_factory -- cgit 1.4.1 From 68030fd37bcae8e1dd3deea971b7f13461e0ef72 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Dec 2016 00:10:49 +0000 Subject: Spelling and comments --- synapse/http/endpoint.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 4d361bbe73..95424481ca 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -66,18 +66,18 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None, default_port = 8448 if port is None: - return _WrappingEndointFac(SRVClientEndpoint( + return _WrappingEndpointFac(SRVClientEndpoint( reactor, "matrix", domain, protocol="tcp", default_port=default_port, endpoint=transport_endpoint, endpoint_kw_args=endpoint_kw_args )) else: - return _WrappingEndointFac(transport_endpoint( + return _WrappingEndpointFac(transport_endpoint( reactor, domain, port, **endpoint_kw_args )) -class _WrappingEndointFac(object): +class _WrappingEndpointFac(object): def __init__(self, endpoint_fac): self.endpoint_fac = endpoint_fac @@ -105,7 +105,9 @@ class _WrappedConnection(object): setattr(self.conn, name, value) def _time_things_out_maybe(self): - if time.time() - self.last_request >= 2 * 60: + # We use a slightly shorter timeout here just in case the callLater is + # triggered early. Paranoia ftw. + if time.time() - self.last_request >= 2.5 * 60: self.abort() def request(self, request): -- cgit 1.4.1 From b4bc6fef5b2624f9f1a7319d266827027e260bec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 29 Dec 2016 00:58:34 +0000 Subject: Respect long_retries param and default to off --- synapse/http/matrixfederationclient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 4d40219fcc..78b92cef36 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -300,7 +300,7 @@ class MatrixFederationHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks - def post_json(self, destination, path, data={}, long_retries=True, + def post_json(self, destination, path, data={}, long_retries=False, timeout=None): """ Sends the specifed json data using POST @@ -333,7 +333,7 @@ class MatrixFederationHttpClient(object): path.encode("ascii"), body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, - long_retries=True, + long_retries=long_retries, timeout=timeout, ) -- cgit 1.4.1 From 97ffc5690b713b64556dc4d0993cf2a96f4477e8 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Dec 2016 15:51:04 +0000 Subject: Manually abort the underlying TLS connection. The abort() method calls loseConnection() which tries to shutdown the TLS connection cleanly. We now call abortConnection() directly which should promptly close both the TLS connection and the underlying TCP connection. I also added some TODO markers to consider cancelling the old previous timeout rather than checking time.time(). But given how urgently we want to get this code released I'd rather leave the existing code with the duplicate timeouts and the time.time() check. --- synapse/http/endpoint.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py index 95424481ca..8c64339a7c 100644 --- a/synapse/http/endpoint.py +++ b/synapse/http/endpoint.py @@ -89,8 +89,8 @@ class _WrappingEndpointFac(object): class _WrappedConnection(object): - """Wraps a connection and calls abort on it if it hasn't seen any actio - for 5 minutes + """Wraps a connection and calls abort on it if it hasn't seen any action + for 2.5-3 minutes. """ __slots__ = ["conn", "last_request"] @@ -107,20 +107,28 @@ class _WrappedConnection(object): def _time_things_out_maybe(self): # We use a slightly shorter timeout here just in case the callLater is # triggered early. Paranoia ftw. + # TODO: Cancel the previous callLater rather than comparing time.time()? if time.time() - self.last_request >= 2.5 * 60: self.abort() + # Abort the underlying TLS connection. The abort() method calls + # loseConnection() on the underlying TLS connection which tries to + # shutdown the connection cleanly. We call abortConnection() + # since that will promptly close the underlying TCP connection. + self.transport.abortConnection() def request(self, request): self.last_request = time.time() # Time this connection out if we haven't send a request in the last # N minutes + # TODO: Cancel the previous callLater? reactor.callLater(3 * 60, self._time_things_out_maybe) d = self.conn.request(request) def update_request_time(res): self.last_request = time.time() + # TODO: Cancel the previous callLater? reactor.callLater(3 * 60, self._time_things_out_maybe) return res -- cgit 1.4.1 From f023be9293d7fb8a95f55304579e15681efc5128 Mon Sep 17 00:00:00 2001 From: Mark Haines Date: Thu, 29 Dec 2016 16:18:04 +0000 Subject: Bump changelog and version --- CHANGES.rst | 7 +++++++ synapse/__init__.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index aafd61ab4a..108f827cf2 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,10 @@ +Changes in synapse v0.18.6-rc1 (2016-12-29) +=========================================== + +Bug fixes: + +* Make sure that outbound connections are closed (PR #1725) + Changes in synapse v0.18.5 (2016-12-16) ======================================= diff --git a/synapse/__init__.py b/synapse/__init__.py index f006e10dc5..84592f53ea 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -16,4 +16,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.18.5" +__version__ = "0.18.6-rc1" -- cgit 1.4.1