summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/http/__init__.py4
-rw-r--r--synapse/http/client.py96
-rw-r--r--synapse/http/endpoint.py13
-rw-r--r--synapse/http/matrixfederationclient.py590
-rw-r--r--synapse/http/request_metrics.py22
-rw-r--r--synapse/http/server.py49
-rw-r--r--synapse/http/site.py40
7 files changed, 472 insertions, 342 deletions
diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 58ef8d3ce4..a3f9e4f67c 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout):
     return value
 
 
-ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
 
 
 def redact_uri(uri):
     """Strips access tokens from the uri replaces with <redacted>"""
     return ACCESS_TOKEN_RE.sub(
-        br'\1<redacted>\3',
+        r'\1<redacted>\3',
         uri
     )
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ab4fbf59b2..3d05f83b8c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -13,24 +13,25 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import logging
-import urllib
 
-from six import StringIO
+from six import text_type
+from six.moves import urllib
 
+import treq
 from canonicaljson import encode_canonical_json, json
 from prometheus_client import Counter
 
 from OpenSSL import SSL
 from OpenSSL.SSL import VERIFY_NONE
-from twisted.internet import defer, protocol, reactor, ssl, task
+from twisted.internet import defer, protocol, reactor, ssl
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
 from twisted.web._newclient import ResponseDone
 from twisted.web.client import (
     Agent,
     BrowserLikeRedirectAgent,
     ContentDecoderAgent,
-    FileBodyProducer as TwistedFileBodyProducer,
     GzipDecoder,
     HTTPConnectionPool,
     PartialDownloadError,
@@ -42,7 +43,7 @@ from twisted.web.http_headers import Headers
 from synapse.api.errors import Codes, HttpResponseException, SynapseError
 from synapse.http import cancelled_to_request_timed_out_error, redact_uri
 from synapse.http.endpoint import SpiderEndpoint
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.logcontext import make_deferred_yieldable
 
@@ -83,8 +84,10 @@ class SimpleHttpClient(object):
         if hs.config.user_agent_suffix:
             self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix,)
 
+        self.user_agent = self.user_agent.encode('ascii')
+
     @defer.inlineCallbacks
-    def request(self, method, uri, *args, **kwargs):
+    def request(self, method, uri, data=b'', headers=None):
         # A small wrapper around self.agent.request() so we can easily attach
         # counters to it
         outgoing_requests_counter.labels(method).inc()
@@ -93,10 +96,10 @@ class SimpleHttpClient(object):
         logger.info("Sending request %s %s", method, redact_uri(uri))
 
         try:
-            request_deferred = self.agent.request(
-                method, uri, *args, **kwargs
+            request_deferred = treq.request(
+                method, uri, agent=self.agent, data=data, headers=headers
             )
-            add_timeout_to_deferred(
+            request_deferred = timeout_deferred(
                 request_deferred, 60, self.hs.get_reactor(),
                 cancelled_to_request_timed_out_error,
             )
@@ -112,7 +115,7 @@ class SimpleHttpClient(object):
             incoming_responses_counter.labels(method, "ERR").inc()
             logger.info(
                 "Error sending request to  %s %s: %s %s",
-                method, redact_uri(uri), type(e).__name__, e.message
+                method, redact_uri(uri), type(e).__name__, e.args[0]
             )
             raise
 
@@ -137,7 +140,8 @@ class SimpleHttpClient(object):
         # TODO: Do we ever want to log message contents?
         logger.debug("post_urlencoded_get_json args: %s", args)
 
-        query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+        query_bytes = urllib.parse.urlencode(
+            encode_urlencode_args(args), True).encode("utf8")
 
         actual_headers = {
             b"Content-Type": [b"application/x-www-form-urlencoded"],
@@ -148,15 +152,14 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "POST",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
-            bodyProducer=FileBodyProducer(StringIO(query_bytes))
+            data=query_bytes
         )
 
-        body = yield make_deferred_yieldable(readBody(response))
-
         if 200 <= response.code < 300:
-            defer.returnValue(json.loads(body))
+            body = yield make_deferred_yieldable(treq.json_content(response))
+            defer.returnValue(body)
         else:
             raise HttpResponseException(response.code, response.phrase, body)
 
@@ -191,9 +194,9 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "POST",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
-            bodyProducer=FileBodyProducer(StringIO(json_str))
+            data=json_str
         )
 
         body = yield make_deferred_yieldable(readBody(response))
@@ -248,7 +251,7 @@ class SimpleHttpClient(object):
             ValueError: if the response was not JSON
         """
         if len(args):
-            query_bytes = urllib.urlencode(args, True)
+            query_bytes = urllib.parse.urlencode(args, True)
             uri = "%s?%s" % (uri, query_bytes)
 
         json_str = encode_canonical_json(json_body)
@@ -262,9 +265,9 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "PUT",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
-            bodyProducer=FileBodyProducer(StringIO(json_str))
+            data=json_str
         )
 
         body = yield make_deferred_yieldable(readBody(response))
@@ -293,7 +296,7 @@ class SimpleHttpClient(object):
             HttpResponseException on a non-2xx HTTP response.
         """
         if len(args):
-            query_bytes = urllib.urlencode(args, True)
+            query_bytes = urllib.parse.urlencode(args, True)
             uri = "%s?%s" % (uri, query_bytes)
 
         actual_headers = {
@@ -304,7 +307,7 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "GET",
-            uri.encode("ascii"),
+            uri,
             headers=Headers(actual_headers),
         )
 
@@ -339,13 +342,14 @@ class SimpleHttpClient(object):
 
         response = yield self.request(
             "GET",
-            url.encode("ascii"),
+            url,
             headers=Headers(actual_headers),
         )
 
         resp_headers = dict(response.headers.getAllRawHeaders())
 
-        if 'Content-Length' in resp_headers and resp_headers['Content-Length'] > max_size:
+        if (b'Content-Length' in resp_headers and
+                int(resp_headers[b'Content-Length']) > max_size):
             logger.warn("Requested URL is too large > %r bytes" % (self.max_size,))
             raise SynapseError(
                 502,
@@ -378,7 +382,12 @@ class SimpleHttpClient(object):
             )
 
         defer.returnValue(
-            (length, resp_headers, response.request.absoluteURI, response.code),
+            (
+                length,
+                resp_headers,
+                response.request.absoluteURI.decode('ascii'),
+                response.code,
+            ),
         )
 
 
@@ -434,12 +443,12 @@ class CaptchaServerHttpClient(SimpleHttpClient):
 
     @defer.inlineCallbacks
     def post_urlencoded_get_raw(self, url, args={}):
-        query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+        query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True)
 
         response = yield self.request(
             "POST",
-            url.encode("ascii"),
-            bodyProducer=FileBodyProducer(StringIO(query_bytes)),
+            url,
+            data=query_bytes,
             headers=Headers({
                 b"Content-Type": [b"application/x-www-form-urlencoded"],
                 b"User-Agent": [self.user_agent],
@@ -463,9 +472,9 @@ class SpiderEndpointFactory(object):
     def endpointForURI(self, uri):
         logger.info("Getting endpoint for %s", uri.toBytes())
 
-        if uri.scheme == "http":
+        if uri.scheme == b"http":
             endpoint_factory = HostnameEndpoint
-        elif uri.scheme == "https":
+        elif uri.scheme == b"https":
             tlsCreator = self.policyForHTTPS.creatorForNetloc(uri.host, uri.port)
 
             def endpoint_factory(reactor, host, port, **kw):
@@ -510,7 +519,7 @@ def encode_urlencode_args(args):
 
 
 def encode_urlencode_arg(arg):
-    if isinstance(arg, unicode):
+    if isinstance(arg, text_type):
         return arg.encode('utf-8')
     elif isinstance(arg, list):
         return [encode_urlencode_arg(i) for i in arg]
@@ -542,26 +551,3 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
 
     def creatorForNetloc(self, hostname, port):
         return self
-
-
-class FileBodyProducer(TwistedFileBodyProducer):
-    """Workaround for https://twistedmatrix.com/trac/ticket/8473
-
-    We override the pauseProducing and resumeProducing methods in twisted's
-    FileBodyProducer so that they do not raise exceptions if the task has
-    already completed.
-    """
-
-    def pauseProducing(self):
-        try:
-            super(FileBodyProducer, self).pauseProducing()
-        except task.TaskDone:
-            # task has already completed
-            pass
-
-    def resumeProducing(self):
-        try:
-            super(FileBodyProducer, self).resumeProducing()
-        except task.NotPaused:
-            # task was not paused (probably because it had already completed)
-            pass
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index b0c9369519..91025037a3 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
 
     Args:
         reactor: Twisted reactor.
-        destination (bytes): The name of the server to connect to.
+        destination (unicode): The name of the server to connect to.
         tls_client_options_factory
             (synapse.crypto.context_factory.ClientTLSOptionsFactory):
             Factory which generates TLS options for client connections.
@@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
         transport_endpoint = HostnameEndpoint
         default_port = 8008
     else:
+        # the SNI string should be the same as the Host header, minus the port.
+        # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777,
+        # the Host header and SNI should therefore be the server_name of the remote
+        # server.
+        tls_options = tls_client_options_factory.get_options(domain)
+
         def transport_endpoint(reactor, host, port, timeout):
             return wrapClientTLS(
-                tls_client_options_factory.get_options(host),
-                HostnameEndpoint(reactor, host, port, timeout=timeout))
+                tls_options,
+                HostnameEndpoint(reactor, host, port, timeout=timeout),
+            )
         default_port = 8448
 
     if port is None:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b34bb8e31a..14b12cd1c4 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,19 +17,22 @@ import cgi
 import logging
 import random
 import sys
-import urllib
+from io import BytesIO
 
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from six import PY3, string_types
+from six.moves import urllib
 
-from canonicaljson import encode_canonical_json, json
+import attr
+import treq
+from canonicaljson import encode_canonical_json
 from prometheus_client import Counter
 from signedjson.sign import sign_json
 
-from twisted.internet import defer, protocol, reactor
+from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
+from twisted.internet.task import _EPSILON, Cooperator
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -40,14 +43,12 @@ from synapse.api.errors import (
     HttpResponseException,
     SynapseError,
 )
-from synapse.http import cancelled_to_request_timed_out_error
 from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util import logcontext
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
 from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
-outbound_logger = logging.getLogger("synapse.http.outbound")
 
 outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
                                     "", ["method"])
@@ -58,20 +59,119 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon
 MAX_LONG_RETRIES = 10
 MAX_SHORT_RETRIES = 3
 
+if PY3:
+    MAXINT = sys.maxsize
+else:
+    MAXINT = sys.maxint
+
 
 class MatrixFederationEndpointFactory(object):
     def __init__(self, hs):
+        self.reactor = hs.get_reactor()
         self.tls_client_options_factory = hs.tls_client_options_factory
 
     def endpointForURI(self, uri):
-        destination = uri.netloc
+        destination = uri.netloc.decode('ascii')
 
         return matrix_federation_endpoint(
-            reactor, destination, timeout=10,
+            self.reactor, destination, timeout=10,
             tls_client_options_factory=self.tls_client_options_factory
         )
 
 
+_next_id = 1
+
+
+@attr.s
+class MatrixFederationRequest(object):
+    method = attr.ib()
+    """HTTP method
+    :type: str
+    """
+
+    path = attr.ib()
+    """HTTP path
+    :type: str
+    """
+
+    destination = attr.ib()
+    """The remote server to send the HTTP request to.
+    :type: str"""
+
+    json = attr.ib(default=None)
+    """JSON to send in the body.
+    :type: dict|None
+    """
+
+    json_callback = attr.ib(default=None)
+    """A callback to generate the JSON.
+    :type: func|None
+    """
+
+    query = attr.ib(default=None)
+    """Query arguments.
+    :type: dict|None
+    """
+
+    txn_id = attr.ib(default=None)
+    """Unique ID for this request (for logging)
+    :type: str|None
+    """
+
+    def __attrs_post_init__(self):
+        global _next_id
+        self.txn_id = "%s-O-%s" % (self.method, _next_id)
+        _next_id = (_next_id + 1) % (MAXINT - 1)
+
+    def get_json(self):
+        if self.json_callback:
+            return self.json_callback()
+        return self.json
+
+
+@defer.inlineCallbacks
+def _handle_json_response(reactor, timeout_sec, request, response):
+    """
+    Reads the JSON body of a response, with a timeout
+
+    Args:
+        reactor (IReactor): twisted reactor, for the timeout
+        timeout_sec (float): number of seconds to wait for response to complete
+        request (MatrixFederationRequest): the request that triggered the response
+        response (IResponse): response to the request
+
+    Returns:
+        dict: parsed JSON response
+    """
+    try:
+        check_content_type_is_json(response.headers)
+
+        d = treq.json_content(response)
+        d = timeout_deferred(
+            d,
+            timeout=timeout_sec,
+            reactor=reactor,
+        )
+
+        body = yield make_deferred_yieldable(d)
+    except Exception as e:
+        logger.warn(
+            "{%s} [%s] Error reading response: %s",
+            request.txn_id,
+            request.destination,
+            e,
+        )
+        raise
+    logger.info(
+        "{%s} [%s] Completed: %d %s",
+        request.txn_id,
+        request.destination,
+        response.code,
+        response.phrase.decode('ascii', errors='replace'),
+    )
+    defer.returnValue(body)
+
+
 class MatrixFederationHttpClient(object):
     """HTTP client used to talk to other homeservers over the federation
     protocol. Send client certificates and signs requests.
@@ -85,7 +185,9 @@ class MatrixFederationHttpClient(object):
         self.hs = hs
         self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
+        reactor = hs.get_reactor()
         pool = HTTPConnectionPool(reactor)
+        pool.retryAutomatically = False
         pool.maxPersistentPerHost = 5
         pool.cachedConnectionTimeout = 2 * 60
         self.agent = Agent.usingEndpointFactory(
@@ -93,28 +195,36 @@ class MatrixFederationHttpClient(object):
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
-        self.version_string = hs.version_string
-        self._next_id = 1
+        self.version_string = hs.version_string.encode('ascii')
+        self.default_timeout = 60
 
-    def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
-        return urlparse.urlunparse(
-            ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
-        )
+        def schedule(x):
+            reactor.callLater(_EPSILON, x)
+
+        self._cooperator = Cooperator(scheduler=schedule)
 
     @defer.inlineCallbacks
-    def _request(self, destination, method, path,
-                 body_callback, headers_dict={}, param_bytes=b"",
-                 query_bytes=b"", retry_on_dns_fail=True,
-                 timeout=None, long_retries=False,
-                 ignore_backoff=False,
-                 backoff_on_404=False):
-        """ Creates and sends a request to the given server
+    def _send_request(
+        self,
+        request,
+        retry_on_dns_fail=True,
+        timeout=None,
+        long_retries=False,
+        ignore_backoff=False,
+        backoff_on_404=False
+    ):
+        """
+        Sends a request to the given server.
+
         Args:
-            destination (str): The remote server to send the HTTP request to.
-            method (str): HTTP method
-            path (str): The HTTP path
+            request (MatrixFederationRequest): details of request to be sent
+
+            timeout (int|None): number of milliseconds to wait for the response headers
+                (including connecting to the server). 60s by default.
+
             ignore_backoff (bool): true to ignore the historical backoff data
                 and try the request anyway.
+
             backoff_on_404 (bool): Back off if we get a 404
 
         Returns:
@@ -132,38 +242,39 @@ class MatrixFederationHttpClient(object):
             (May also fail with plenty of other Exceptions for things like DNS
                 failures, connection failures, SSL failures.)
         """
+        if timeout:
+            _sec_timeout = timeout / 1000
+        else:
+            _sec_timeout = self.default_timeout
+
         if (
             self.hs.config.federation_domain_whitelist is not None and
-            destination not in self.hs.config.federation_domain_whitelist
+            request.destination not in self.hs.config.federation_domain_whitelist
         ):
-            raise FederationDeniedError(destination)
+            raise FederationDeniedError(request.destination)
 
         limiter = yield synapse.util.retryutils.get_retry_limiter(
-            destination,
+            request.destination,
             self.clock,
             self._store,
             backoff_on_404=backoff_on_404,
             ignore_backoff=ignore_backoff,
         )
 
-        destination = destination.encode("ascii")
-        path_bytes = path.encode("ascii")
-        with limiter:
-            headers_dict[b"User-Agent"] = [self.version_string]
-            headers_dict[b"Host"] = [destination]
-
-            url_bytes = self._create_url(
-                destination, path_bytes, param_bytes, query_bytes
-            )
-
-            txn_id = "%s-O-%s" % (method, self._next_id)
-            self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+        method = request.method
+        destination = request.destination
+        path_bytes = request.path.encode("ascii")
+        if request.query:
+            query_bytes = encode_query_args(request.query)
+        else:
+            query_bytes = b""
 
-            outbound_logger.info(
-                "{%s} [%s] Sending request: %s %s",
-                txn_id, destination, method, url_bytes
-            )
+        headers_dict = {
+            "User-Agent": [self.version_string],
+            "Host": [request.destination],
+        }
 
+        with limiter:
             # XXX: Would be much nicer to retry only at the transaction-layer
             # (once we have reliable transactions in place)
             if long_retries:
@@ -171,88 +282,119 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            http_url_bytes = urlparse.urlunparse(
-                ("", "", path_bytes, param_bytes, query_bytes, "")
-            )
-
-            log_result = None
-            try:
-                while True:
-                    producer = None
-                    if body_callback:
-                        producer = body_callback(method, http_url_bytes, headers_dict)
-
-                    try:
-                        request_deferred = self.agent.request(
-                            method,
-                            url_bytes,
-                            Headers(headers_dict),
-                            producer
+            url = urllib.parse.urlunparse((
+                b"matrix", destination.encode("ascii"),
+                path_bytes, None, query_bytes, b"",
+            )).decode('ascii')
+
+            http_url = urllib.parse.urlunparse((
+                b"", b"",
+                path_bytes, None, query_bytes, b"",
+            )).decode('ascii')
+
+            while True:
+                try:
+                    json = request.get_json()
+                    if json:
+                        data = encode_canonical_json(json)
+                        headers_dict["Content-Type"] = ["application/json"]
+                        self.sign_request(
+                            destination, method, http_url, headers_dict, json
                         )
-                        add_timeout_to_deferred(
-                            request_deferred,
-                            timeout / 1000. if timeout else 60,
-                            self.hs.get_reactor(),
-                            cancelled_to_request_timed_out_error,
+                    else:
+                        data = None
+                        self.sign_request(destination, method, http_url, headers_dict)
+
+                    logger.info(
+                        "{%s} [%s] Sending request: %s %s",
+                        request.txn_id, destination, method, url
+                    )
+
+                    if data:
+                        producer = FileBodyProducer(
+                            BytesIO(data),
+                            cooperator=self._cooperator
                         )
+                    else:
+                        producer = None
+
+                    request_deferred = treq.request(
+                        method,
+                        url,
+                        headers=Headers(headers_dict),
+                        data=producer,
+                        agent=self.agent,
+                        reactor=self.hs.get_reactor(),
+                        unbuffered=True
+                    )
+
+                    request_deferred = timeout_deferred(
+                        request_deferred,
+                        timeout=_sec_timeout,
+                        reactor=self.hs.get_reactor(),
+                    )
+
+                    with Measure(self.clock, "outbound_request"):
                         response = yield make_deferred_yieldable(
                             request_deferred,
                         )
 
-                        log_result = "%d %s" % (response.code, response.phrase,)
-                        break
-                    except Exception as e:
-                        if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                            logger.warn(
-                                "DNS Lookup failed to %s with %s",
-                                destination,
-                                e
-                            )
-                            log_result = "DNS Lookup failed to %s with %s" % (
-                                destination, e
-                            )
-                            raise
-
-                        logger.warn(
-                            "{%s} Sending request failed to %s: %s %s: %s",
-                            txn_id,
+                    break
+                except Exception as e:
+                    logger.warn(
+                        "{%s} [%s] Request failed: %s %s: %s",
+                        request.txn_id,
+                        destination,
+                        method,
+                        url,
+                        _flatten_response_never_received(e),
+                    )
+
+                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+                        raise
+
+                    if retries_left and not timeout:
+                        if long_retries:
+                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
+                            delay = min(delay, 60)
+                            delay *= random.uniform(0.8, 1.4)
+                        else:
+                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
+                            delay = min(delay, 2)
+                            delay *= random.uniform(0.8, 1.4)
+
+                        logger.debug(
+                            "{%s} [%s] Waiting %ss before re-sending...",
+                            request.txn_id,
                             destination,
-                            method,
-                            url_bytes,
-                            _flatten_response_never_received(e),
+                            delay,
                         )
 
-                        log_result = _flatten_response_never_received(e)
-
-                        if retries_left and not timeout:
-                            if long_retries:
-                                delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
-                                delay = min(delay, 60)
-                                delay *= random.uniform(0.8, 1.4)
-                            else:
-                                delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
-                                delay = min(delay, 2)
-                                delay *= random.uniform(0.8, 1.4)
-
-                            yield self.clock.sleep(delay)
-                            retries_left -= 1
-                        else:
-                            raise
-            finally:
-                outbound_logger.info(
-                    "{%s} [%s] Result: %s",
-                    txn_id,
-                    destination,
-                    log_result,
-                )
+                        yield self.clock.sleep(delay)
+                        retries_left -= 1
+                    else:
+                        raise
+
+            logger.info(
+                "{%s} [%s] Got response headers: %d %s",
+                request.txn_id,
+                destination,
+                response.code,
+                response.phrase.decode('ascii', errors='replace'),
+            )
 
             if 200 <= response.code < 300:
                 pass
             else:
                 # :'(
                 # Update transactions table?
-                with logcontext.PreserveLoggingContext():
-                    body = yield readBody(response)
+                d = treq.content(response)
+                d = timeout_deferred(
+                    d,
+                    timeout=_sec_timeout,
+                    reactor=self.hs.get_reactor(),
+                )
+                body = yield make_deferred_yieldable(d)
                 raise HttpResponseException(
                     response.code, response.phrase, body
                 )
@@ -297,11 +439,11 @@ class MatrixFederationHttpClient(object):
         auth_headers = []
 
         for key, sig in request["signatures"][self.server_name].items():
-            auth_headers.append(bytes(
+            auth_headers.append((
                 "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
                     self.server_name, key, sig,
-                )
-            ))
+                )).encode('ascii')
+            )
 
         headers_dict[b"Authorization"] = auth_headers
 
@@ -346,38 +488,27 @@ class MatrixFederationHttpClient(object):
             is not on our federation whitelist
         """
 
-        if not json_data_callback:
-            def json_data_callback():
-                return data
+        request = MatrixFederationRequest(
+            method="PUT",
+            destination=destination,
+            path=path,
+            query=args,
+            json_callback=json_data_callback,
+            json=data,
+        )
 
-        def body_callback(method, url_bytes, headers_dict):
-            json_data = json_data_callback()
-            self.sign_request(
-                destination, method, url_bytes, headers_dict, json_data
-            )
-            producer = _JsonProducer(json_data)
-            return producer
-
-        response = yield self._request(
-            destination,
-            "PUT",
-            path,
-            body_callback=body_callback,
-            headers_dict={"Content-Type": ["application/json"]},
-            query_bytes=encode_query_args(args),
+        response = yield self._send_request(
+            request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
             backoff_on_404=backoff_on_404,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
-        defer.returnValue(json.loads(body))
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), self.default_timeout, request, response,
+        )
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def post_json(self, destination, path, data={}, long_retries=False,
@@ -411,32 +542,30 @@ class MatrixFederationHttpClient(object):
             is not on our federation whitelist
         """
 
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(
-                destination, method, url_bytes, headers_dict, data
-            )
-            return _JsonProducer(data)
-
-        response = yield self._request(
-            destination,
-            "POST",
-            path,
-            query_bytes=encode_query_args(args),
-            body_callback=body_callback,
-            headers_dict={"Content-Type": ["application/json"]},
+        request = MatrixFederationRequest(
+            method="POST",
+            destination=destination,
+            path=path,
+            query=args,
+            json=data,
+        )
+
+        response = yield self._send_request(
+            request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
+        if timeout:
+            _sec_timeout = timeout / 1000
+        else:
+            _sec_timeout = self.default_timeout
 
-        defer.returnValue(json.loads(body))
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), _sec_timeout, request, response,
+        )
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
@@ -471,29 +600,24 @@ class MatrixFederationHttpClient(object):
 
         logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
 
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(destination, method, url_bytes, headers_dict)
-            return None
+        request = MatrixFederationRequest(
+            method="GET",
+            destination=destination,
+            path=path,
+            query=args,
+        )
 
-        response = yield self._request(
-            destination,
-            "GET",
-            path,
-            query_bytes=encode_query_args(args),
-            body_callback=body_callback,
+        response = yield self._send_request(
+            request,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
-
-        defer.returnValue(json.loads(body))
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), self.default_timeout, request, response,
+        )
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def delete_json(self, destination, path, long_retries=False,
@@ -523,26 +647,24 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
+        request = MatrixFederationRequest(
+            method="DELETE",
+            destination=destination,
+            path=path,
+            query=args,
+        )
 
-        response = yield self._request(
-            destination,
-            "DELETE",
-            path,
-            query_bytes=encode_query_args(args),
-            headers_dict={"Content-Type": ["application/json"]},
+        response = yield self._send_request(
+            request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        if 200 <= response.code < 300:
-            # We need to update the transactions table to say it was sent?
-            check_content_type_is_json(response.headers)
-
-        with logcontext.PreserveLoggingContext():
-            body = yield readBody(response)
-
-        defer.returnValue(json.loads(body))
+        body = yield _handle_json_response(
+            self.hs.get_reactor(), self.default_timeout, request, response,
+        )
+        defer.returnValue(body)
 
     @defer.inlineCallbacks
     def get_file(self, destination, path, output_stream, args={},
@@ -569,26 +691,15 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
             is not on our federation whitelist
         """
+        request = MatrixFederationRequest(
+            method="GET",
+            destination=destination,
+            path=path,
+            query=args,
+        )
 
-        encoded_args = {}
-        for k, vs in args.items():
-            if isinstance(vs, string_types):
-                vs = [vs]
-            encoded_args[k] = [v.encode("UTF-8") for v in vs]
-
-        query_bytes = urllib.urlencode(encoded_args, True)
-        logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
-
-        def body_callback(method, url_bytes, headers_dict):
-            self.sign_request(destination, method, url_bytes, headers_dict)
-            return None
-
-        response = yield self._request(
-            destination,
-            "GET",
-            path,
-            query_bytes=query_bytes,
-            body_callback=body_callback,
+        response = yield self._send_request(
+            request,
             retry_on_dns_fail=retry_on_dns_fail,
             ignore_backoff=ignore_backoff,
         )
@@ -596,14 +707,25 @@ class MatrixFederationHttpClient(object):
         headers = dict(response.headers.getAllRawHeaders())
 
         try:
-            with logcontext.PreserveLoggingContext():
-                length = yield _readBodyToFile(
-                    response, output_stream, max_size
-                )
-        except Exception:
-            logger.exception("Failed to download body")
+            d = _readBodyToFile(response, output_stream, max_size)
+            d.addTimeout(self.default_timeout, self.hs.get_reactor())
+            length = yield make_deferred_yieldable(d)
+        except Exception as e:
+            logger.warn(
+                "{%s} [%s] Error reading response: %s",
+                request.txn_id,
+                request.destination,
+                e,
+            )
             raise
-
+        logger.info(
+            "{%s} [%s] Completed: %d %s [%d bytes]",
+            request.txn_id,
+            request.destination,
+            response.code,
+            response.phrase.decode('ascii', errors='replace'),
+            length,
+        )
         defer.returnValue((length, headers))
 
 
@@ -639,30 +761,6 @@ def _readBodyToFile(response, stream, max_size):
     return d
 
 
-class _JsonProducer(object):
-    """ Used by the twisted http client to create the HTTP body from json
-    """
-    def __init__(self, jsn):
-        self.reset(jsn)
-
-    def reset(self, jsn):
-        self.body = encode_canonical_json(jsn)
-        self.length = len(self.body)
-
-    def startProducing(self, consumer):
-        consumer.write(self.body)
-        return defer.succeed(None)
-
-    def pauseProducing(self):
-        pass
-
-    def stopProducing(self):
-        pass
-
-    def resumeProducing(self):
-        pass
-
-
 def _flatten_response_never_received(e):
     if hasattr(e, "reasons"):
         reasons = ", ".join(
@@ -693,7 +791,7 @@ def check_content_type_is_json(headers):
             "No Content-Type header"
         )
 
-    c_type = c_type[0]  # only the first header
+    c_type = c_type[0].decode('ascii')  # only the first header
     val, options = cgi.parse_header(c_type)
     if val != "application/json":
         raise RuntimeError(
@@ -711,6 +809,6 @@ def encode_query_args(args):
             vs = [vs]
         encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
-    query_bytes = urllib.urlencode(encoded_args, True)
+    query_bytes = urllib.parse.urlencode(encoded_args, True)
 
-    return query_bytes
+    return query_bytes.encode('utf8')
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index 72c2654678..fedb4e6b18 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -162,7 +162,7 @@ class RequestMetrics(object):
         with _in_flight_requests_lock:
             _in_flight_requests.add(self)
 
-    def stop(self, time_sec, request):
+    def stop(self, time_sec, response_code, sent_bytes):
         with _in_flight_requests_lock:
             _in_flight_requests.discard(self)
 
@@ -179,35 +179,35 @@ class RequestMetrics(object):
                 )
                 return
 
-        response_code = str(request.code)
+        response_code = str(response_code)
 
-        outgoing_responses_counter.labels(request.method, response_code).inc()
+        outgoing_responses_counter.labels(self.method, response_code).inc()
 
-        response_count.labels(request.method, self.name, tag).inc()
+        response_count.labels(self.method, self.name, tag).inc()
 
-        response_timer.labels(request.method, self.name, tag, response_code).observe(
+        response_timer.labels(self.method, self.name, tag, response_code).observe(
             time_sec - self.start
         )
 
         resource_usage = context.get_resource_usage()
 
-        response_ru_utime.labels(request.method, self.name, tag).inc(
+        response_ru_utime.labels(self.method, self.name, tag).inc(
             resource_usage.ru_utime,
         )
-        response_ru_stime.labels(request.method, self.name, tag).inc(
+        response_ru_stime.labels(self.method, self.name, tag).inc(
             resource_usage.ru_stime,
         )
-        response_db_txn_count.labels(request.method, self.name, tag).inc(
+        response_db_txn_count.labels(self.method, self.name, tag).inc(
             resource_usage.db_txn_count
         )
-        response_db_txn_duration.labels(request.method, self.name, tag).inc(
+        response_db_txn_duration.labels(self.method, self.name, tag).inc(
             resource_usage.db_txn_duration_sec
         )
-        response_db_sched_duration.labels(request.method, self.name, tag).inc(
+        response_db_sched_duration.labels(self.method, self.name, tag).inc(
             resource_usage.db_sched_duration_sec
         )
 
-        response_size.labels(request.method, self.name, tag).inc(request.sentLength)
+        response_size.labels(self.method, self.name, tag).inc(sent_bytes)
 
         # We always call this at the end to ensure that we update the metrics
         # regardless of whether a call to /metrics while the request was in
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2d5c23e673..b4b25cab19 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -84,10 +84,21 @@ def wrap_json_request_handler(h):
             logger.info(
                 "%s SynapseError: %s - %s", request, code, e.msg
             )
-            respond_with_json(
-                request, code, e.error_dict(), send_cors=True,
-                pretty_print=_request_user_agent_is_curl(request),
-            )
+
+            # Only respond with an error response if we haven't already started
+            # writing, otherwise lets just kill the connection
+            if request.startedWriting:
+                if request.transport:
+                    try:
+                        request.transport.abortConnection()
+                    except Exception:
+                        # abortConnection throws if the connection is already closed
+                        pass
+            else:
+                respond_with_json(
+                    request, code, e.error_dict(), send_cors=True,
+                    pretty_print=_request_user_agent_is_curl(request),
+                )
 
         except Exception:
             # failure.Failure() fishes the original Failure out
@@ -100,16 +111,26 @@ def wrap_json_request_handler(h):
                 request,
                 f.getTraceback().rstrip(),
             )
-            respond_with_json(
-                request,
-                500,
-                {
-                    "error": "Internal server error",
-                    "errcode": Codes.UNKNOWN,
-                },
-                send_cors=True,
-                pretty_print=_request_user_agent_is_curl(request),
-            )
+            # Only respond with an error response if we haven't already started
+            # writing, otherwise lets just kill the connection
+            if request.startedWriting:
+                if request.transport:
+                    try:
+                        request.transport.abortConnection()
+                    except Exception:
+                        # abortConnection throws if the connection is already closed
+                        pass
+            else:
+                respond_with_json(
+                    request,
+                    500,
+                    {
+                        "error": "Internal server error",
+                        "errcode": Codes.UNKNOWN,
+                    },
+                    send_cors=True,
+                    pretty_print=_request_user_agent_is_curl(request),
+                )
 
     return wrap_async_request_handler(wrapped_request_handler)
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 88ed3714f9..e508c0bd4f 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -75,17 +75,35 @@ class SynapseRequest(Request):
         return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
             self.__class__.__name__,
             id(self),
-            self.method,
+            self.get_method(),
             self.get_redacted_uri(),
-            self.clientproto,
+            self.clientproto.decode('ascii', errors='replace'),
             self.site.site_tag,
         )
 
     def get_request_id(self):
-        return "%s-%i" % (self.method, self.request_seq)
+        return "%s-%i" % (self.get_method(), self.request_seq)
 
     def get_redacted_uri(self):
-        return redact_uri(self.uri)
+        uri = self.uri
+        if isinstance(uri, bytes):
+            uri = self.uri.decode('ascii')
+        return redact_uri(uri)
+
+    def get_method(self):
+        """Gets the method associated with the request (or placeholder if not
+        method has yet been received).
+
+        Note: This is necessary as the placeholder value in twisted is str
+        rather than bytes, so we need to sanitise `self.method`.
+
+        Returns:
+            str
+        """
+        method = self.method
+        if isinstance(method, bytes):
+            method = self.method.decode('ascii')
+        return method
 
     def get_user_agent(self):
         return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
@@ -116,7 +134,7 @@ class SynapseRequest(Request):
             # dispatching to the handler, so that the handler
             # can update the servlet name in the request
             # metrics
-            requests_counter.labels(self.method,
+            requests_counter.labels(self.get_method(),
                                     self.request_metrics.name).inc()
 
     @contextlib.contextmanager
@@ -204,14 +222,14 @@ class SynapseRequest(Request):
         self.start_time = time.time()
         self.request_metrics = RequestMetrics()
         self.request_metrics.start(
-            self.start_time, name=servlet_name, method=self.method,
+            self.start_time, name=servlet_name, method=self.get_method(),
         )
 
         self.site.access_logger.info(
             "%s - %s - Received request: %s %s",
             self.getClientIP(),
             self.site.site_tag,
-            self.method,
+            self.get_method(),
             self.get_redacted_uri()
         )
 
@@ -277,15 +295,15 @@ class SynapseRequest(Request):
             int(usage.db_txn_count),
             self.sentLength,
             code,
-            self.method,
+            self.get_method(),
             self.get_redacted_uri(),
-            self.clientproto,
+            self.clientproto.decode('ascii', errors='replace'),
             user_agent,
             usage.evt_db_fetch_count,
         )
 
         try:
-            self.request_metrics.stop(self.finish_time, self)
+            self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
         except Exception as e:
             logger.warn("Failed to stop metrics: %r", e)
 
@@ -305,7 +323,7 @@ class XForwardedForRequest(SynapseRequest):
             C{b"-"}.
         """
         return self.requestHeaders.getRawHeaders(
-            b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
+            b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii')
 
 
 class SynapseRequestFactory(object):