summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2019-01-21 23:29:47 +0000
committerRichard van der Hoff <richard@matrix.org>2019-01-22 20:34:35 +0000
commit7871146667afd76576939c91986a8f8bacb49446 (patch)
tree791355425937f6f01cb42b02dc95aa8527e1504a
parentMatrixFederationAgent (diff)
downloadsynapse-7871146667afd76576939c91986a8f8bacb49446.tar.xz
Make MatrixFederationClient use MatrixFederationAgent
... instead of the matrix_federation_endpoint
-rw-r--r--synapse/http/matrixfederationclient.py37
-rw-r--r--tests/http/test_fedclient.py96
2 files changed, 106 insertions, 27 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 250bb1ef91..980e912348 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -32,7 +32,7 @@ from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.internet.task import _EPSILON, Cooperator
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
+from twisted.web.client import FileBodyProducer
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -44,7 +44,7 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
-from synapse.http.endpoint import matrix_federation_endpoint
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.metrics import Measure
@@ -66,20 +66,6 @@ else:
     MAXINT = sys.maxint
 
 
-class MatrixFederationEndpointFactory(object):
-    def __init__(self, hs):
-        self.reactor = hs.get_reactor()
-        self.tls_client_options_factory = hs.tls_client_options_factory
-
-    def endpointForURI(self, uri):
-        destination = uri.netloc.decode('ascii')
-
-        return matrix_federation_endpoint(
-            self.reactor, destination, timeout=10,
-            tls_client_options_factory=self.tls_client_options_factory
-        )
-
-
 _next_id = 1
 
 
@@ -187,12 +173,10 @@ class MatrixFederationHttpClient(object):
         self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
         reactor = hs.get_reactor()
-        pool = HTTPConnectionPool(reactor)
-        pool.retryAutomatically = False
-        pool.maxPersistentPerHost = 5
-        pool.cachedConnectionTimeout = 2 * 60
-        self.agent = Agent.usingEndpointFactory(
-            reactor, MatrixFederationEndpointFactory(hs), pool=pool
+
+        self.agent = MatrixFederationAgent(
+            hs.get_reactor(),
+            hs.tls_client_options_factory,
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
@@ -316,9 +300,9 @@ class MatrixFederationHttpClient(object):
                     headers_dict[b"Authorization"] = auth_headers
 
                     logger.info(
-                        "{%s} [%s] Sending request: %s %s",
+                        "{%s} [%s] Sending request: %s %s; timeout %fs",
                         request.txn_id, request.destination, request.method,
-                        url_str,
+                        url_str, _sec_timeout,
                     )
 
                     try:
@@ -338,12 +322,11 @@ class MatrixFederationHttpClient(object):
                                 reactor=self.hs.get_reactor(),
                             )
 
-                            response = yield make_deferred_yieldable(
-                                request_deferred,
-                            )
+                            response = yield request_deferred
                     except DNSLookupError as e:
                         raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
                     except Exception as e:
+                        logger.info("Failed to send request: %s", e)
                         raise_from(RequestSendFailed(e, can_retry=True), e)
 
                     logger.info(
diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py
index 8426eee400..d37f8f9981 100644
--- a/tests/http/test_fedclient.py
+++ b/tests/http/test_fedclient.py
@@ -15,6 +15,7 @@
 
 from mock import Mock
 
+from twisted.internet import defer
 from twisted.internet.defer import TimeoutError
 from twisted.internet.error import ConnectingCancelledError, DNSLookupError
 from twisted.test.proto_helpers import StringTransport
@@ -26,11 +27,20 @@ from synapse.http.matrixfederationclient import (
     MatrixFederationHttpClient,
     MatrixFederationRequest,
 )
+from synapse.util.logcontext import LoggingContext
 
 from tests.server import FakeTransport
 from tests.unittest import HomeserverTestCase
 
 
+def check_logcontext(context):
+    current = LoggingContext.current_context()
+    if current is not context:
+        raise AssertionError(
+            "Expected logcontext %s but was %s" % (context, current),
+        )
+
+
 class FederationClientTests(HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
 
@@ -43,6 +53,70 @@ class FederationClientTests(HomeserverTestCase):
         self.cl = MatrixFederationHttpClient(self.hs)
         self.reactor.lookups["testserv"] = "1.2.3.4"
 
+    def test_client_get(self):
+        """
+        happy-path test of a GET request
+        """
+        @defer.inlineCallbacks
+        def do_request():
+            with LoggingContext("one") as context:
+                fetch_d = self.cl.get_json("testserv:8008", "foo/bar")
+
+                # Nothing happened yet
+                self.assertNoResult(fetch_d)
+
+                # should have reset logcontext to the sentinel
+                check_logcontext(LoggingContext.sentinel)
+
+                try:
+                    fetch_res = yield fetch_d
+                    defer.returnValue(fetch_res)
+                finally:
+                    check_logcontext(context)
+
+        test_d = do_request()
+
+        self.pump()
+
+        # Nothing happened yet
+        self.assertNoResult(test_d)
+
+        # Make sure treq is trying to connect
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, factory, _timeout, _bindAddress) = clients[0]
+        self.assertEqual(host, '1.2.3.4')
+        self.assertEqual(port, 8008)
+
+        # complete the connection and wire it up to a fake transport
+        protocol = factory.buildProtocol(None)
+        transport = StringTransport()
+        protocol.makeConnection(transport)
+
+        # that should have made it send the request to the transport
+        self.assertRegex(transport.value(), b"^GET /foo/bar")
+
+        # Deferred is still without a result
+        self.assertNoResult(test_d)
+
+        # Send it the HTTP response
+        res_json = '{ "a": 1 }'.encode('ascii')
+        protocol.dataReceived(
+            b"HTTP/1.1 200 OK\r\n"
+            b"Server: Fake\r\n"
+            b"Content-Type: application/json\r\n"
+            b"Content-Length: %i\r\n"
+            b"\r\n"
+            b"%s" % (len(res_json), res_json)
+        )
+
+        self.pump()
+
+        res = self.successResultOf(test_d)
+
+        # check the response is as expected
+        self.assertEqual(res, {"a": 1})
+
     def test_dns_error(self):
         """
         If the DNS lookup returns an error, it will bubble up.
@@ -54,6 +128,28 @@ class FederationClientTests(HomeserverTestCase):
         self.assertIsInstance(f.value, RequestSendFailed)
         self.assertIsInstance(f.value.inner_exception, DNSLookupError)
 
+    def test_client_connection_refused(self):
+        d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
+
+        self.pump()
+
+        # Nothing happened yet
+        self.assertNoResult(d)
+
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, factory, _timeout, _bindAddress) = clients[0]
+        self.assertEqual(host, '1.2.3.4')
+        self.assertEqual(port, 8008)
+        e = Exception("go away")
+        factory.clientConnectionFailed(None, e)
+        self.pump(0.5)
+
+        f = self.failureResultOf(d)
+
+        self.assertIsInstance(f.value, RequestSendFailed)
+        self.assertIs(f.value.inner_exception, e)
+
     def test_client_never_connect(self):
         """
         If the HTTP request is not connected and is timed out, it'll give a