From 0e6b3e4e40ae918eacdef87bb50ff1d19b304e7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 10 Feb 2015 18:17:27 +0000 Subject: Time out HTTP federation requests --- synapse/http/matrixfederationclient.py | 11 +++++++++-- synapse/util/async.py | 20 ++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) (limited to 'synapse') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index aa14782b0f..fdc1e2678e 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http.agent_name import AGENT_NAME from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util.async import sleep +from synapse.util.async import sleep, time_bound_deferred from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json @@ -78,6 +78,7 @@ class MatrixFederationHttpClient(object): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname self.agent = MatrixFederationHttpAgent(reactor) + self.clock = hs.get_clock() @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, @@ -117,7 +118,7 @@ class MatrixFederationHttpClient(object): try: with PreserveLoggingContext(): - response = yield self.agent.request( + request_deferred = self.agent.request( destination, endpoint, method, @@ -128,6 +129,12 @@ class MatrixFederationHttpClient(object): producer ) + response = yield time_bound_deferred( + request_deferred, + clock=self.clock, + time_out=60, + ) + logger.debug("Got response to %s", method) break except Exception as e: diff --git a/synapse/util/async.py b/synapse/util/async.py index c4fe5d522f..d4d1d4b472 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -32,3 +32,23 @@ def run_on_reactor(): iteration of the main loop """ return sleep(0) + + +def time_bound_deferred(given_deferred, clock, time_out): + ret_deferred = defer.Deferred() + + def timed_out(): + if not given_deferred.called: + given_deferred.cancel() + ret_deferred.errback(RuntimeError("Timed out")) + + timer = clock.call_later(time_out, timed_out) + + def succeed(result): + clock.cancel_call_later(timer) + ret_deferred.callback(result) + + given_deferred.addCallback(succeed) + given_deferred.addErrback(ret_deferred.errback) + + return ret_deferred -- cgit 1.4.1 From dcf52469e821b3b2b69a0610c2c4f025a5aac68f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 10:25:06 +0000 Subject: Move time_bound_deferred into Clock --- synapse/http/matrixfederationclient.py | 5 ++--- synapse/util/__init__.py | 21 ++++++++++++++++++++- synapse/util/async.py | 20 -------------------- 3 files changed, 22 insertions(+), 24 deletions(-) (limited to 'synapse') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index fdc1e2678e..74e523960f 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -22,7 +22,7 @@ from twisted.web._newclient import ResponseDone from synapse.http.agent_name import AGENT_NAME from synapse.http.endpoint import matrix_federation_endpoint -from synapse.util.async import sleep, time_bound_deferred +from synapse.util.async import sleep from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json @@ -129,9 +129,8 @@ class MatrixFederationHttpClient(object): producer ) - response = yield time_bound_deferred( + response = yield self.clock.time_bound_deferred( request_deferred, - clock=self.clock, time_out=60, ) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 4e837a918e..2da8dfa719 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -15,7 +15,7 @@ from synapse.util.logcontext import LoggingContext -from twisted.internet import reactor +from twisted.internet import defer, reactor import time @@ -45,3 +45,22 @@ class Clock(object): def cancel_call_later(self, timer): timer.cancel() + + def time_bound_deferred(self, given_deferred, time_out): + ret_deferred = defer.Deferred() + + def timed_out(): + if not given_deferred.called: + given_deferred.cancel() + ret_deferred.errback(RuntimeError("Timed out")) + + timer = self.call_later(time_out, timed_out) + + def succeed(result): + self.cancel_call_later(timer) + ret_deferred.callback(result) + + given_deferred.addCallback(succeed) + given_deferred.addErrback(ret_deferred.errback) + + return ret_deferred diff --git a/synapse/util/async.py b/synapse/util/async.py index d4d1d4b472..c4fe5d522f 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -32,23 +32,3 @@ def run_on_reactor(): iteration of the main loop """ return sleep(0) - - -def time_bound_deferred(given_deferred, clock, time_out): - ret_deferred = defer.Deferred() - - def timed_out(): - if not given_deferred.called: - given_deferred.cancel() - ret_deferred.errback(RuntimeError("Timed out")) - - timer = clock.call_later(time_out, timed_out) - - def succeed(result): - clock.cancel_call_later(timer) - ret_deferred.callback(result) - - given_deferred.addCallback(succeed) - given_deferred.addErrback(ret_deferred.errback) - - return ret_deferred -- cgit 1.4.1 From 05b961d7e3f6c926520c3200ae3031c77cebc212 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 10:28:46 +0000 Subject: PEP8 --- synapse/http/matrixfederationclient.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 74e523960f..92f887f778 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -72,7 +72,6 @@ class MatrixFederationHttpClient(object): requests. """ - def __init__(self, hs): self.hs = hs self.signing_key = hs.config.signing_key[0] -- cgit 1.4.1 From ef276e8770e19c66d14462aa325b9cb241121bb6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Feb 2015 16:48:05 +0000 Subject: Fix so timing out connections to actually work. --- synapse/federation/replication.py | 2 ++ synapse/util/__init__.py | 52 ++++++++++++++++++++++++++++++++------- 2 files changed, 45 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index e442c6c5d5..54a0c7ad8e 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer): self._order = 0 + self.hs = hs + def __str__(self): return "" % self.server_name diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 29f1344c5b..e77eba90ad 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -18,6 +18,9 @@ from synapse.util.logcontext import LoggingContext from twisted.internet import defer, reactor, task import time +import logging + +logger = logging.getLogger(__name__) class Clock(object): @@ -55,20 +58,51 @@ class Clock(object): timer.cancel() def time_bound_deferred(self, given_deferred, time_out): + if given_deferred.called: + return given_deferred + ret_deferred = defer.Deferred() - def timed_out(): - if not given_deferred.called: - given_deferred.cancel() + def timed_out_fn(): + try: ret_deferred.errback(RuntimeError("Timed out")) + except: + pass + + try: + given_deferred.cancel() + except: + pass + + timer = None + + def cancel(res): + try: + self.cancel_call_later(timer) + except: + pass + return res + + ret_deferred.addBoth(cancel) + + def sucess(res): + try: + ret_deferred.callback(res) + except: + pass + + return res + + def err(res): + try: + ret_deferred.errback(res) + except: + pass - timer = self.call_later(time_out, timed_out) + return res - def succeed(result): - self.cancel_call_later(timer) - ret_deferred.callback(result) + given_deferred.addCallbacks(callback=sucess, errback=err) - given_deferred.addCallback(succeed) - given_deferred.addErrback(ret_deferred.errback) + timer = self.call_later(time_out, timed_out_fn) return ret_deferred -- cgit 1.4.1