diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index e442c6c5d5..54a0c7ad8e 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -72,5 +72,7 @@ class ReplicationLayer(FederationClient, FederationServer):
self._order = 0
+ self.hs = hs
+
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 406203acf2..1927948001 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -79,6 +79,7 @@ class MatrixFederationHttpClient(object):
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
self.agent = MatrixFederationHttpAgent(reactor)
+ self.clock = hs.get_clock()
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
@@ -118,7 +119,7 @@ class MatrixFederationHttpClient(object):
try:
with PreserveLoggingContext():
- response = yield self.agent.request(
+ request_deferred = self.agent.request(
destination,
endpoint,
method,
@@ -129,6 +130,11 @@ class MatrixFederationHttpClient(object):
producer
)
+ response = yield self.clock.time_bound_deferred(
+ request_deferred,
+ time_out=60,
+ )
+
logger.debug("Got response to %s", method)
break
except Exception as e:
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index fee76b0a9b..e77eba90ad 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -15,9 +15,12 @@
from synapse.util.logcontext import LoggingContext
-from twisted.internet import reactor, task
+from twisted.internet import defer, reactor, task
import time
+import logging
+
+logger = logging.getLogger(__name__)
class Clock(object):
@@ -53,3 +56,53 @@ class Clock(object):
def cancel_call_later(self, timer):
timer.cancel()
+
+ def time_bound_deferred(self, given_deferred, time_out):
+ if given_deferred.called:
+ return given_deferred
+
+ ret_deferred = defer.Deferred()
+
+ def timed_out_fn():
+ try:
+ ret_deferred.errback(RuntimeError("Timed out"))
+ except:
+ pass
+
+ try:
+ given_deferred.cancel()
+ except:
+ pass
+
+ timer = None
+
+ def cancel(res):
+ try:
+ self.cancel_call_later(timer)
+ except:
+ pass
+ return res
+
+ ret_deferred.addBoth(cancel)
+
+ def sucess(res):
+ try:
+ ret_deferred.callback(res)
+ except:
+ pass
+
+ return res
+
+ def err(res):
+ try:
+ ret_deferred.errback(res)
+ except:
+ pass
+
+ return res
+
+ given_deferred.addCallbacks(callback=sucess, errback=err)
+
+ timer = self.call_later(time_out, timed_out_fn)
+
+ return ret_deferred
|