summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py98
1 files changed, 42 insertions, 56 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index cf920bc041..c49dbacd93 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -26,7 +26,7 @@ from canonicaljson import encode_canonical_json
 from prometheus_client import Counter
 from signedjson.sign import sign_json
 
-from twisted.internet import defer, protocol, reactor
+from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.web._newclient import ResponseDone
 from twisted.web.client import Agent, HTTPConnectionPool
@@ -40,10 +40,8 @@ from synapse.api.errors import (
     HttpResponseException,
     SynapseError,
 )
-from synapse.http import cancelled_to_request_timed_out_error
 from synapse.http.endpoint import matrix_federation_endpoint
 from synapse.util import logcontext
-from synapse.util.async_helpers import add_timeout_to_deferred
 from synapse.util.logcontext import make_deferred_yieldable
 
 logger = logging.getLogger(__name__)
@@ -66,13 +64,14 @@ else:
 
 class MatrixFederationEndpointFactory(object):
     def __init__(self, hs):
+        self.reactor = hs.get_reactor()
         self.tls_client_options_factory = hs.tls_client_options_factory
 
     def endpointForURI(self, uri):
         destination = uri.netloc.decode('ascii')
 
         return matrix_federation_endpoint(
-            reactor, destination, timeout=10,
+            self.reactor, destination, timeout=10,
             tls_client_options_factory=self.tls_client_options_factory
         )
 
@@ -90,6 +89,7 @@ class MatrixFederationHttpClient(object):
         self.hs = hs
         self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
+        reactor = hs.get_reactor()
         pool = HTTPConnectionPool(reactor)
         pool.maxPersistentPerHost = 5
         pool.cachedConnectionTimeout = 2 * 60
@@ -100,6 +100,7 @@ class MatrixFederationHttpClient(object):
         self._store = hs.get_datastore()
         self.version_string = hs.version_string.encode('ascii')
         self._next_id = 1
+        self.default_timeout = 60
 
     def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
         return urllib.parse.urlunparse(
@@ -143,6 +144,11 @@ class MatrixFederationHttpClient(object):
             (May also fail with plenty of other Exceptions for things like DNS
                 failures, connection failures, SSL failures.)
         """
+        if timeout:
+            _sec_timeout = timeout / 1000
+        else:
+            _sec_timeout = self.default_timeout
+
         if (
             self.hs.config.federation_domain_whitelist is not None and
             destination not in self.hs.config.federation_domain_whitelist
@@ -215,13 +221,9 @@ class MatrixFederationHttpClient(object):
                         headers=Headers(headers_dict),
                         data=data,
                         agent=self.agent,
+                        reactor=self.hs.get_reactor()
                     )
-                    add_timeout_to_deferred(
-                        request_deferred,
-                        timeout / 1000. if timeout else 60,
-                        self.hs.get_reactor(),
-                        cancelled_to_request_timed_out_error,
-                    )
+                    request_deferred.addTimeout(_sec_timeout, self.hs.get_reactor())
                     response = yield make_deferred_yieldable(
                         request_deferred,
                     )
@@ -261,6 +263,13 @@ class MatrixFederationHttpClient(object):
                             delay = min(delay, 2)
                             delay *= random.uniform(0.8, 1.4)
 
+                        logger.debug(
+                            "{%s} Waiting %s before sending to %s...",
+                            txn_id,
+                            delay,
+                            destination
+                        )
+
                         yield self.clock.sleep(delay)
                         retries_left -= 1
                     else:
@@ -279,10 +288,9 @@ class MatrixFederationHttpClient(object):
                 # :'(
                 # Update transactions table?
                 with logcontext.PreserveLoggingContext():
-                    body = yield self._timeout_deferred(
-                        treq.content(response),
-                        timeout,
-                    )
+                    d = treq.content(response)
+                    d.addTimeout(_sec_timeout, self.hs.get_reactor())
+                    body = yield make_deferred_yieldable(d)
                 raise HttpResponseException(
                     response.code, response.phrase, body
                 )
@@ -396,10 +404,9 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield self._timeout_deferred(
-                treq.json_content(response),
-                timeout,
-            )
+            d = treq.json_content(response)
+            d.addTimeout(self.default_timeout, self.hs.get_reactor())
+            body = yield make_deferred_yieldable(d)
         defer.returnValue(body)
 
     @defer.inlineCallbacks
@@ -449,10 +456,14 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield self._timeout_deferred(
-                treq.json_content(response),
-                timeout,
-            )
+            d = treq.json_content(response)
+            if timeout:
+                _sec_timeout = timeout / 1000
+            else:
+                _sec_timeout = self.default_timeout
+
+            d.addTimeout(_sec_timeout, self.hs.get_reactor())
+            body = yield make_deferred_yieldable(d)
 
         defer.returnValue(body)
 
@@ -504,10 +515,9 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield self._timeout_deferred(
-                treq.json_content(response),
-                timeout,
-            )
+            d = treq.json_content(response)
+            d.addTimeout(self.default_timeout, self.hs.get_reactor())
+            body = yield make_deferred_yieldable(d)
 
         defer.returnValue(body)
 
@@ -554,10 +564,9 @@ class MatrixFederationHttpClient(object):
             check_content_type_is_json(response.headers)
 
         with logcontext.PreserveLoggingContext():
-            body = yield self._timeout_deferred(
-                treq.json_content(response),
-                timeout,
-            )
+            d = treq.json_content(response)
+            d.addTimeout(self.default_timeout, self.hs.get_reactor())
+            body = yield make_deferred_yieldable(d)
 
         defer.returnValue(body)
 
@@ -599,38 +608,15 @@ class MatrixFederationHttpClient(object):
 
         try:
             with logcontext.PreserveLoggingContext():
-                length = yield self._timeout_deferred(
-                    _readBodyToFile(
-                        response, output_stream, max_size
-                    ),
-                )
+                d = _readBodyToFile(response, output_stream, max_size)
+                d.addTimeout(self.default_timeout, self.hs.get_reactor())
+                length = yield make_deferred_yieldable(d)
         except Exception:
             logger.exception("Failed to download body")
             raise
 
         defer.returnValue((length, headers))
 
-    def _timeout_deferred(self, deferred, timeout_ms=None):
-        """Times the deferred out after `timeout_ms` ms
-
-        Args:
-            deferred (Deferred)
-            timeout_ms (int|None): Timeout in milliseconds. If None defaults
-                to 60 seconds.
-
-        Returns:
-            Deferred
-        """
-
-        add_timeout_to_deferred(
-            deferred,
-            timeout_ms / 1000. if timeout_ms else 60,
-            self.hs.get_reactor(),
-            cancelled_to_request_timed_out_error,
-        )
-
-        return deferred
-
 
 class _ReadBodyToFileProtocol(protocol.Protocol):
     def __init__(self, stream, deferred, max_size):