summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http')
-rw-r--r--synapse/http/client.py5
-rw-r--r--synapse/http/endpoint.py280
-rw-r--r--synapse/http/federation/__init__.py14
-rw-r--r--synapse/http/federation/matrix_federation_agent.py125
-rw-r--r--synapse/http/federation/srv_resolver.py169
-rw-r--r--synapse/http/matrixfederationclient.py63
6 files changed, 334 insertions, 322 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py
index afcf698b29..47a1f82ff0 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -333,9 +333,10 @@ class SimpleHttpClient(object):
             "POST", uri, headers=Headers(actual_headers), data=query_bytes
         )
 
+        body = yield make_deferred_yieldable(readBody(response))
+
         if 200 <= response.code < 300:
-            body = yield make_deferred_yieldable(treq.json_content(response))
-            defer.returnValue(body)
+            defer.returnValue(json.loads(body))
         else:
             raise HttpResponseException(response.code, response.phrase, body)
 
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index f86a0b624e..cd79ebab62 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -12,30 +12,11 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import collections
 import logging
-import random
 import re
-import time
-
-from twisted.internet import defer
-from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.internet.error import ConnectError
-from twisted.names import client, dns
-from twisted.names.error import DNSNameError, DomainError
 
 logger = logging.getLogger(__name__)
 
-SERVER_CACHE = {}
-
-# our record of an individual server which can be tried to reach a destination.
-#
-# "host" is the hostname acquired from the SRV record. Except when there's
-# no SRV record, in which case it is the original hostname.
-_Server = collections.namedtuple(
-    "_Server", "priority weight host port expires"
-)
-
 
 def parse_server_name(server_name):
     """Split a server name into host/port parts.
@@ -100,264 +81,3 @@ def parse_and_validate_server_name(server_name):
         ))
 
     return host, port
-
-
-def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=None,
-                               timeout=None):
-    """Construct an endpoint for the given matrix destination.
-
-    Args:
-        reactor: Twisted reactor.
-        destination (unicode): The name of the server to connect to.
-        tls_client_options_factory
-            (synapse.crypto.context_factory.ClientTLSOptionsFactory):
-            Factory which generates TLS options for client connections.
-        timeout (int): connection timeout in seconds
-    """
-
-    domain, port = parse_server_name(destination)
-
-    endpoint_kw_args = {}
-
-    if timeout is not None:
-        endpoint_kw_args.update(timeout=timeout)
-
-    if tls_client_options_factory is None:
-        transport_endpoint = HostnameEndpoint
-        default_port = 8008
-    else:
-        # the SNI string should be the same as the Host header, minus the port.
-        # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777,
-        # the Host header and SNI should therefore be the server_name of the remote
-        # server.
-        tls_options = tls_client_options_factory.get_options(domain)
-
-        def transport_endpoint(reactor, host, port, timeout):
-            return wrapClientTLS(
-                tls_options,
-                HostnameEndpoint(reactor, host, port, timeout=timeout),
-            )
-        default_port = 8448
-
-    if port is None:
-        return _WrappingEndpointFac(SRVClientEndpoint(
-            reactor, "matrix", domain, protocol="tcp",
-            default_port=default_port, endpoint=transport_endpoint,
-            endpoint_kw_args=endpoint_kw_args
-        ), reactor)
-    else:
-        return _WrappingEndpointFac(transport_endpoint(
-            reactor, domain, port, **endpoint_kw_args
-        ), reactor)
-
-
-class _WrappingEndpointFac(object):
-    def __init__(self, endpoint_fac, reactor):
-        self.endpoint_fac = endpoint_fac
-        self.reactor = reactor
-
-    @defer.inlineCallbacks
-    def connect(self, protocolFactory):
-        conn = yield self.endpoint_fac.connect(protocolFactory)
-        conn = _WrappedConnection(conn, self.reactor)
-        defer.returnValue(conn)
-
-
-class _WrappedConnection(object):
-    """Wraps a connection and calls abort on it if it hasn't seen any action
-    for 2.5-3 minutes.
-    """
-    __slots__ = ["conn", "last_request"]
-
-    def __init__(self, conn, reactor):
-        object.__setattr__(self, "conn", conn)
-        object.__setattr__(self, "last_request", time.time())
-        self._reactor = reactor
-
-    def __getattr__(self, name):
-        return getattr(self.conn, name)
-
-    def __setattr__(self, name, value):
-        setattr(self.conn, name, value)
-
-    def _time_things_out_maybe(self):
-        # We use a slightly shorter timeout here just in case the callLater is
-        # triggered early. Paranoia ftw.
-        # TODO: Cancel the previous callLater rather than comparing time.time()?
-        if time.time() - self.last_request >= 2.5 * 60:
-            self.abort()
-            # Abort the underlying TLS connection. The abort() method calls
-            # loseConnection() on the TLS connection which tries to
-            # shutdown the connection cleanly. We call abortConnection()
-            # since that will promptly close the TLS connection.
-            #
-            # In Twisted >18.4; the TLS connection will be None if it has closed
-            # which will make abortConnection() throw. Check that the TLS connection
-            # is not None before trying to close it.
-            if self.transport.getHandle() is not None:
-                self.transport.abortConnection()
-
-    def request(self, request):
-        self.last_request = time.time()
-
-        # Time this connection out if we haven't send a request in the last
-        # N minutes
-        # TODO: Cancel the previous callLater?
-        self._reactor.callLater(3 * 60, self._time_things_out_maybe)
-
-        d = self.conn.request(request)
-
-        def update_request_time(res):
-            self.last_request = time.time()
-            # TODO: Cancel the previous callLater?
-            self._reactor.callLater(3 * 60, self._time_things_out_maybe)
-            return res
-
-        d.addCallback(update_request_time)
-
-        return d
-
-
-class SRVClientEndpoint(object):
-    """An endpoint which looks up SRV records for a service.
-    Cycles through the list of servers starting with each call to connect
-    picking the next server.
-    Implements twisted.internet.interfaces.IStreamClientEndpoint.
-    """
-
-    def __init__(self, reactor, service, domain, protocol="tcp",
-                 default_port=None, endpoint=HostnameEndpoint,
-                 endpoint_kw_args={}):
-        self.reactor = reactor
-        self.service_name = "_%s._%s.%s" % (service, protocol, domain)
-
-        if default_port is not None:
-            self.default_server = _Server(
-                host=domain,
-                port=default_port,
-                priority=0,
-                weight=0,
-                expires=0,
-            )
-        else:
-            self.default_server = None
-
-        self.endpoint = endpoint
-        self.endpoint_kw_args = endpoint_kw_args
-
-        self.servers = None
-        self.used_servers = None
-
-    @defer.inlineCallbacks
-    def fetch_servers(self):
-        self.used_servers = []
-        self.servers = yield resolve_service(self.service_name)
-
-    def pick_server(self):
-        if not self.servers:
-            if self.used_servers:
-                self.servers = self.used_servers
-                self.used_servers = []
-                self.servers.sort()
-            elif self.default_server:
-                return self.default_server
-            else:
-                raise ConnectError(
-                    "No server available for %s" % self.service_name
-                )
-
-        # look for all servers with the same priority
-        min_priority = self.servers[0].priority
-        weight_indexes = list(
-            (index, server.weight + 1)
-            for index, server in enumerate(self.servers)
-            if server.priority == min_priority
-        )
-
-        total_weight = sum(weight for index, weight in weight_indexes)
-        target_weight = random.randint(0, total_weight)
-        for index, weight in weight_indexes:
-            target_weight -= weight
-            if target_weight <= 0:
-                server = self.servers[index]
-                # XXX: this looks totally dubious:
-                #
-                # (a) we never reuse a server until we have been through
-                #     all of the servers at the same priority, so if the
-                #     weights are A: 100, B:1, we always do ABABAB instead of
-                #     AAAA...AAAB (approximately).
-                #
-                # (b) After using all the servers at the lowest priority,
-                #     we move onto the next priority. We should only use the
-                #     second priority if servers at the top priority are
-                #     unreachable.
-                #
-                del self.servers[index]
-                self.used_servers.append(server)
-                return server
-
-    @defer.inlineCallbacks
-    def connect(self, protocolFactory):
-        if self.servers is None:
-            yield self.fetch_servers()
-        server = self.pick_server()
-        logger.info("Connecting to %s:%s", server.host, server.port)
-        endpoint = self.endpoint(
-            self.reactor, server.host, server.port, **self.endpoint_kw_args
-        )
-        connection = yield endpoint.connect(protocolFactory)
-        defer.returnValue(connection)
-
-
-@defer.inlineCallbacks
-def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=time):
-    cache_entry = cache.get(service_name, None)
-    if cache_entry:
-        if all(s.expires > int(clock.time()) for s in cache_entry):
-            servers = list(cache_entry)
-            defer.returnValue(servers)
-
-    servers = []
-
-    try:
-        try:
-            answers, _, _ = yield dns_client.lookupService(service_name)
-        except DNSNameError:
-            defer.returnValue([])
-
-        if (len(answers) == 1
-                and answers[0].type == dns.SRV
-                and answers[0].payload
-                and answers[0].payload.target == dns.Name(b'.')):
-            raise ConnectError("Service %s unavailable" % service_name)
-
-        for answer in answers:
-            if answer.type != dns.SRV or not answer.payload:
-                continue
-
-            payload = answer.payload
-
-            servers.append(_Server(
-                host=str(payload.target),
-                port=int(payload.port),
-                priority=int(payload.priority),
-                weight=int(payload.weight),
-                expires=int(clock.time()) + answer.ttl,
-            ))
-
-        servers.sort()
-        cache[service_name] = list(servers)
-    except DomainError as e:
-        # We failed to resolve the name (other than a NameError)
-        # Try something in the cache, else rereaise
-        cache_entry = cache.get(service_name, None)
-        if cache_entry:
-            logger.warn(
-                "Failed to resolve %r, falling back to cache. %r",
-                service_name, e
-            )
-            servers = list(cache_entry)
-        else:
-            raise e
-
-    defer.returnValue(servers)
diff --git a/synapse/http/federation/__init__.py b/synapse/http/federation/__init__.py
new file mode 100644
index 0000000000..1453d04571
--- /dev/null
+++ b/synapse/http/federation/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
new file mode 100644
index 0000000000..0ec28c6696
--- /dev/null
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -0,0 +1,125 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from zope.interface import implementer
+
+from twisted.internet import defer
+from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.web.client import URI, Agent, HTTPConnectionPool
+from twisted.web.iweb import IAgent
+
+from synapse.http.endpoint import parse_server_name
+from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
+from synapse.util.logcontext import make_deferred_yieldable
+
+logger = logging.getLogger(__name__)
+
+
+@implementer(IAgent)
+class MatrixFederationAgent(object):
+    """An Agent-like thing which provides a `request` method which will look up a matrix
+    server and send an HTTP request to it.
+
+    Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
+
+    Args:
+        reactor (IReactor): twisted reactor to use for underlying requests
+
+        tls_client_options_factory (ClientTLSOptionsFactory|None):
+            factory to use for fetching client tls options, or none to disable TLS.
+
+        srv_resolver (SrvResolver|None):
+            SRVResolver impl to use for looking up SRV records. None to use a default
+            implementation.
+    """
+
+    def __init__(
+        self, reactor, tls_client_options_factory, _srv_resolver=None,
+    ):
+        self._reactor = reactor
+        self._tls_client_options_factory = tls_client_options_factory
+        if _srv_resolver is None:
+            _srv_resolver = SrvResolver()
+        self._srv_resolver = _srv_resolver
+
+        self._pool = HTTPConnectionPool(reactor)
+        self._pool.retryAutomatically = False
+        self._pool.maxPersistentPerHost = 5
+        self._pool.cachedConnectionTimeout = 2 * 60
+
+    @defer.inlineCallbacks
+    def request(self, method, uri, headers=None, bodyProducer=None):
+        """
+        Args:
+            method (bytes): HTTP method: GET/POST/etc
+
+            uri (bytes): Absolute URI to be retrieved
+
+            headers (twisted.web.http_headers.Headers|None):
+                HTTP headers to send with the request, or None to
+                send no extra headers.
+
+            bodyProducer (twisted.web.iweb.IBodyProducer|None):
+                An object which can generate bytes to make up the
+                body of this request (for example, the properly encoded contents of
+                a file for a file upload).  Or None if the request is to have
+                no body.
+
+        Returns:
+            Deferred[twisted.web.iweb.IResponse]:
+                fires when the header of the response has been received (regardless of the
+                response status code). Fails if there is any problem which prevents that
+                response from being received (including problems that prevent the request
+                from being sent).
+        """
+
+        parsed_uri = URI.fromBytes(uri)
+        server_name_bytes = parsed_uri.netloc
+        host, port = parse_server_name(server_name_bytes.decode("ascii"))
+
+        # XXX disabling TLS is really only supported here for the benefit of the
+        # unit tests. We should make the UTs cope with TLS rather than having to make
+        # the code support the unit tests.
+        if self._tls_client_options_factory is None:
+            tls_options = None
+        else:
+            tls_options = self._tls_client_options_factory.get_options(host)
+
+        if port is not None:
+            target = (host, port)
+        else:
+            service_name = b"_matrix._tcp.%s" % (server_name_bytes, )
+            server_list = yield self._srv_resolver.resolve_service(service_name)
+            if not server_list:
+                target = (host, 8448)
+                logger.debug("No SRV record for %s, using %s", host, target)
+            else:
+                target = pick_server_from_list(server_list)
+
+        class EndpointFactory(object):
+            @staticmethod
+            def endpointForURI(_uri):
+                logger.info("Connecting to %s:%s", target[0], target[1])
+                ep = HostnameEndpoint(self._reactor, host=target[0], port=target[1])
+                if tls_options is not None:
+                    ep = wrapClientTLS(tls_options, ep)
+                return ep
+
+        agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool)
+        res = yield make_deferred_yieldable(
+            agent.request(method, uri, headers, bodyProducer)
+        )
+        defer.returnValue(res)
diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py
new file mode 100644
index 0000000000..71830c549d
--- /dev/null
+++ b/synapse/http/federation/srv_resolver.py
@@ -0,0 +1,169 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import random
+import time
+
+import attr
+
+from twisted.internet import defer
+from twisted.internet.error import ConnectError
+from twisted.names import client, dns
+from twisted.names.error import DNSNameError, DomainError
+
+from synapse.util.logcontext import make_deferred_yieldable
+
+logger = logging.getLogger(__name__)
+
+SERVER_CACHE = {}
+
+
+@attr.s
+class Server(object):
+    """
+    Our record of an individual server which can be tried to reach a destination.
+
+    Attributes:
+        host (bytes): target hostname
+        port (int):
+        priority (int):
+        weight (int):
+        expires (int): when the cache should expire this record - in *seconds* since
+            the epoch
+    """
+    host = attr.ib()
+    port = attr.ib()
+    priority = attr.ib(default=0)
+    weight = attr.ib(default=0)
+    expires = attr.ib(default=0)
+
+
+def pick_server_from_list(server_list):
+    """Randomly choose a server from the server list
+
+    Args:
+        server_list (list[Server]): list of candidate servers
+
+    Returns:
+        Tuple[bytes, int]: (host, port) pair for the chosen server
+    """
+    if not server_list:
+        raise RuntimeError("pick_server_from_list called with empty list")
+
+    # TODO: currently we only use the lowest-priority servers. We should maintain a
+    # cache of servers known to be "down" and filter them out
+
+    min_priority = min(s.priority for s in server_list)
+    eligible_servers = list(s for s in server_list if s.priority == min_priority)
+    total_weight = sum(s.weight for s in eligible_servers)
+    target_weight = random.randint(0, total_weight)
+
+    for s in eligible_servers:
+        target_weight -= s.weight
+
+        if target_weight <= 0:
+            return s.host, s.port
+
+    # this should be impossible.
+    raise RuntimeError(
+        "pick_server_from_list got to end of eligible server list.",
+    )
+
+
+class SrvResolver(object):
+    """Interface to the dns client to do SRV lookups, with result caching.
+
+    The default resolver in twisted.names doesn't do any caching (it has a CacheResolver,
+    but the cache never gets populated), so we add our own caching layer here.
+
+    Args:
+        dns_client (twisted.internet.interfaces.IResolver): twisted resolver impl
+        cache (dict): cache object
+        get_time (callable): clock implementation. Should return seconds since the epoch
+    """
+    def __init__(self, dns_client=client, cache=SERVER_CACHE, get_time=time.time):
+        self._dns_client = dns_client
+        self._cache = cache
+        self._get_time = get_time
+
+    @defer.inlineCallbacks
+    def resolve_service(self, service_name):
+        """Look up a SRV record
+
+        Args:
+            service_name (bytes): record to look up
+
+        Returns:
+            Deferred[list[Server]]:
+                a list of the SRV records, or an empty list if none found
+        """
+        now = int(self._get_time())
+
+        if not isinstance(service_name, bytes):
+            raise TypeError("%r is not a byte string" % (service_name,))
+
+        cache_entry = self._cache.get(service_name, None)
+        if cache_entry:
+            if all(s.expires > now for s in cache_entry):
+                servers = list(cache_entry)
+                defer.returnValue(servers)
+
+        try:
+            answers, _, _ = yield make_deferred_yieldable(
+                self._dns_client.lookupService(service_name),
+            )
+        except DNSNameError:
+            # TODO: cache this. We can get the SOA out of the exception, and use
+            # the negative-TTL value.
+            defer.returnValue([])
+        except DomainError as e:
+            # We failed to resolve the name (other than a NameError)
+            # Try something in the cache, else rereaise
+            cache_entry = self._cache.get(service_name, None)
+            if cache_entry:
+                logger.warn(
+                    "Failed to resolve %r, falling back to cache. %r",
+                    service_name, e
+                )
+                defer.returnValue(list(cache_entry))
+            else:
+                raise e
+
+        if (len(answers) == 1
+                and answers[0].type == dns.SRV
+                and answers[0].payload
+                and answers[0].payload.target == dns.Name(b'.')):
+            raise ConnectError("Service %s unavailable" % service_name)
+
+        servers = []
+
+        for answer in answers:
+            if answer.type != dns.SRV or not answer.payload:
+                continue
+
+            payload = answer.payload
+
+            servers.append(Server(
+                host=payload.target.name,
+                port=payload.port,
+                priority=payload.priority,
+                weight=payload.weight,
+                expires=now + answer.ttl,
+            ))
+
+        self._cache[service_name] = list(servers)
+        defer.returnValue(servers)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index ea2fc64b99..980e912348 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -32,7 +32,7 @@ from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.internet.task import _EPSILON, Cooperator
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
+from twisted.web.client import FileBodyProducer
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -44,7 +44,7 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
-from synapse.http.endpoint import matrix_federation_endpoint
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.metrics import Measure
@@ -66,20 +66,6 @@ else:
     MAXINT = sys.maxint
 
 
-class MatrixFederationEndpointFactory(object):
-    def __init__(self, hs):
-        self.reactor = hs.get_reactor()
-        self.tls_client_options_factory = hs.tls_client_options_factory
-
-    def endpointForURI(self, uri):
-        destination = uri.netloc.decode('ascii')
-
-        return matrix_federation_endpoint(
-            self.reactor, destination, timeout=10,
-            tls_client_options_factory=self.tls_client_options_factory
-        )
-
-
 _next_id = 1
 
 
@@ -187,12 +173,10 @@ class MatrixFederationHttpClient(object):
         self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
         reactor = hs.get_reactor()
-        pool = HTTPConnectionPool(reactor)
-        pool.retryAutomatically = False
-        pool.maxPersistentPerHost = 5
-        pool.cachedConnectionTimeout = 2 * 60
-        self.agent = Agent.usingEndpointFactory(
-            reactor, MatrixFederationEndpointFactory(hs), pool=pool
+
+        self.agent = MatrixFederationAgent(
+            hs.get_reactor(),
+            hs.tls_client_options_factory,
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
@@ -316,34 +300,33 @@ class MatrixFederationHttpClient(object):
                     headers_dict[b"Authorization"] = auth_headers
 
                     logger.info(
-                        "{%s} [%s] Sending request: %s %s",
+                        "{%s} [%s] Sending request: %s %s; timeout %fs",
                         request.txn_id, request.destination, request.method,
-                        url_str,
-                    )
-
-                    # we don't want all the fancy cookie and redirect handling that
-                    # treq.request gives: just use the raw Agent.
-                    request_deferred = self.agent.request(
-                        method_bytes,
-                        url_bytes,
-                        headers=Headers(headers_dict),
-                        bodyProducer=producer,
-                    )
-
-                    request_deferred = timeout_deferred(
-                        request_deferred,
-                        timeout=_sec_timeout,
-                        reactor=self.hs.get_reactor(),
+                        url_str, _sec_timeout,
                     )
 
                     try:
                         with Measure(self.clock, "outbound_request"):
-                            response = yield make_deferred_yieldable(
+                            # we don't want all the fancy cookie and redirect handling
+                            # that treq.request gives: just use the raw Agent.
+                            request_deferred = self.agent.request(
+                                method_bytes,
+                                url_bytes,
+                                headers=Headers(headers_dict),
+                                bodyProducer=producer,
+                            )
+
+                            request_deferred = timeout_deferred(
                                 request_deferred,
+                                timeout=_sec_timeout,
+                                reactor=self.hs.get_reactor(),
                             )
+
+                            response = yield request_deferred
                     except DNSLookupError as e:
                         raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
                     except Exception as e:
+                        logger.info("Failed to send request: %s", e)
                         raise_from(RequestSendFailed(e, can_retry=True), e)
 
                     logger.info(