summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http')
-rw-r--r--synapse/http/additional_resource.py19
-rw-r--r--synapse/http/client.py118
-rw-r--r--synapse/http/federation/matrix_federation_agent.py28
-rw-r--r--synapse/http/federation/srv_resolver.py10
-rw-r--r--synapse/http/federation/well_known_resolver.py22
-rw-r--r--synapse/http/matrixfederationclient.py204
-rw-r--r--synapse/http/server.py645
-rw-r--r--synapse/http/servlet.py13
-rw-r--r--synapse/http/site.py30
9 files changed, 670 insertions, 419 deletions
diff --git a/synapse/http/additional_resource.py b/synapse/http/additional_resource.py
index 096619a8c2..479746c9c5 100644
--- a/synapse/http/additional_resource.py
+++ b/synapse/http/additional_resource.py
@@ -13,13 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.web.resource import Resource
-from twisted.web.server import NOT_DONE_YET
+from synapse.http.server import DirectServeJsonResource
 
-from synapse.http.server import wrap_json_request_handler
 
-
-class AdditionalResource(Resource):
+class AdditionalResource(DirectServeJsonResource):
     """Resource wrapper for additional_resources
 
     If the user has configured additional_resources, we need to wrap the
@@ -41,16 +38,10 @@ class AdditionalResource(Resource):
             handler ((twisted.web.server.Request) -> twisted.internet.defer.Deferred):
                 function to be called to handle the request.
         """
-        Resource.__init__(self)
+        super().__init__()
         self._handler = handler
 
-        # required by the request_handler wrapper
-        self.clock = hs.get_clock()
-
-    def render(self, request):
-        self._async_render(request)
-        return NOT_DONE_YET
-
-    @wrap_json_request_handler
     def _async_render(self, request):
+        # Cheekily pass the result straight through, so we don't need to worry
+        # if its an awaitable or not.
         return self._handler(request)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 3cef747a4d..dad01a8e56 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -15,13 +15,11 @@
 # limitations under the License.
 
 import logging
+import urllib
 from io import BytesIO
 
-from six import raise_from, text_type
-from six.moves import urllib
-
 import treq
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
 from netaddr import IPAddress
 from prometheus_client import Counter
 from zope.interface import implementer, provider
@@ -33,6 +31,7 @@ from twisted.internet.interfaces import (
     IReactorPluggableNameResolver,
     IResolutionReceiver,
 )
+from twisted.internet.task import Cooperator
 from twisted.python.failure import Failure
 from twisted.web._newclient import ResponseDone
 from twisted.web.client import Agent, HTTPConnectionPool, readBody
@@ -48,6 +47,7 @@ from synapse.http import (
 from synapse.http.proxyagent import ProxyAgent
 from synapse.logging.context import make_deferred_yieldable
 from synapse.logging.opentracing import set_tag, start_active_span, tags
+from synapse.util import json_decoder
 from synapse.util.async_helpers import timeout_deferred
 
 logger = logging.getLogger(__name__)
@@ -71,6 +71,21 @@ def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
     return False
 
 
+_EPSILON = 0.00000001
+
+
+def _make_scheduler(reactor):
+    """Makes a schedular suitable for a Cooperator using the given reactor.
+
+    (This is effectively just a copy from `twisted.internet.task`)
+    """
+
+    def _scheduler(x):
+        return reactor.callLater(_EPSILON, x)
+
+    return _scheduler
+
+
 class IPBlacklistingResolver(object):
     """
     A proxy for reactor.nameResolver which only produces non-blacklisted IP
@@ -214,6 +229,10 @@ class SimpleHttpClient(object):
         if hs.config.user_agent_suffix:
             self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
 
+        # We use this for our body producers to ensure that they use the correct
+        # reactor.
+        self._cooperator = Cooperator(scheduler=_make_scheduler(hs.get_reactor()))
+
         self.user_agent = self.user_agent.encode("ascii")
 
         if self._ip_blacklist:
@@ -266,8 +285,7 @@ class SimpleHttpClient(object):
                 ip_blacklist=self._ip_blacklist,
             )
 
-    @defer.inlineCallbacks
-    def request(self, method, uri, data=None, headers=None):
+    async def request(self, method, uri, data=None, headers=None):
         """
         Args:
             method (str): HTTP method to use.
@@ -280,7 +298,7 @@ class SimpleHttpClient(object):
         outgoing_requests_counter.labels(method).inc()
 
         # log request but strip `access_token` (AS requests for example include this)
-        logger.info("Sending request %s %s", method, redact_uri(uri))
+        logger.debug("Sending request %s %s", method, redact_uri(uri))
 
         with start_active_span(
             "outgoing-client-request",
@@ -294,7 +312,9 @@ class SimpleHttpClient(object):
             try:
                 body_producer = None
                 if data is not None:
-                    body_producer = QuieterFileBodyProducer(BytesIO(data))
+                    body_producer = QuieterFileBodyProducer(
+                        BytesIO(data), cooperator=self._cooperator,
+                    )
 
                 request_deferred = treq.request(
                     method,
@@ -310,7 +330,7 @@ class SimpleHttpClient(object):
                     self.hs.get_reactor(),
                     cancelled_to_request_timed_out_error,
                 )
-                response = yield make_deferred_yieldable(request_deferred)
+                response = await make_deferred_yieldable(request_deferred)
 
                 incoming_responses_counter.labels(method, response.code).inc()
                 logger.info(
@@ -333,8 +353,7 @@ class SimpleHttpClient(object):
                 set_tag("error_reason", e.args[0])
                 raise
 
-    @defer.inlineCallbacks
-    def post_urlencoded_get_json(self, uri, args={}, headers=None):
+    async def post_urlencoded_get_json(self, uri, args={}, headers=None):
         """
         Args:
             uri (str):
@@ -343,7 +362,7 @@ class SimpleHttpClient(object):
                header name to a list of values for that header
 
         Returns:
-            Deferred[object]: parsed json
+            object: parsed json
 
         Raises:
             HttpResponseException: On a non-2xx HTTP response.
@@ -366,19 +385,20 @@ class SimpleHttpClient(object):
         if headers:
             actual_headers.update(headers)
 
-        response = yield self.request(
+        response = await self.request(
             "POST", uri, headers=Headers(actual_headers), data=query_bytes
         )
 
-        body = yield make_deferred_yieldable(readBody(response))
+        body = await make_deferred_yieldable(readBody(response))
 
         if 200 <= response.code < 300:
-            return json.loads(body)
+            return json_decoder.decode(body.decode("utf-8"))
         else:
-            raise HttpResponseException(response.code, response.phrase, body)
+            raise HttpResponseException(
+                response.code, response.phrase.decode("ascii", errors="replace"), body
+            )
 
-    @defer.inlineCallbacks
-    def post_json_get_json(self, uri, post_json, headers=None):
+    async def post_json_get_json(self, uri, post_json, headers=None):
         """
 
         Args:
@@ -388,7 +408,7 @@ class SimpleHttpClient(object):
                header name to a list of values for that header
 
         Returns:
-            Deferred[object]: parsed json
+            object: parsed json
 
         Raises:
             HttpResponseException: On a non-2xx HTTP response.
@@ -407,19 +427,20 @@ class SimpleHttpClient(object):
         if headers:
             actual_headers.update(headers)
 
-        response = yield self.request(
+        response = await self.request(
             "POST", uri, headers=Headers(actual_headers), data=json_str
         )
 
-        body = yield make_deferred_yieldable(readBody(response))
+        body = await make_deferred_yieldable(readBody(response))
 
         if 200 <= response.code < 300:
-            return json.loads(body)
+            return json_decoder.decode(body.decode("utf-8"))
         else:
-            raise HttpResponseException(response.code, response.phrase, body)
+            raise HttpResponseException(
+                response.code, response.phrase.decode("ascii", errors="replace"), body
+            )
 
-    @defer.inlineCallbacks
-    def get_json(self, uri, args={}, headers=None):
+    async def get_json(self, uri, args={}, headers=None):
         """ Gets some json from the given URI.
 
         Args:
@@ -431,7 +452,7 @@ class SimpleHttpClient(object):
             headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
                header name to a list of values for that header
         Returns:
-            Deferred: Succeeds when we get *any* 2xx HTTP response, with the
+            Succeeds when we get *any* 2xx HTTP response, with the
             HTTP body as JSON.
         Raises:
             HttpResponseException On a non-2xx HTTP response.
@@ -442,11 +463,10 @@ class SimpleHttpClient(object):
         if headers:
             actual_headers.update(headers)
 
-        body = yield self.get_raw(uri, args, headers=headers)
-        return json.loads(body)
+        body = await self.get_raw(uri, args, headers=headers)
+        return json_decoder.decode(body.decode("utf-8"))
 
-    @defer.inlineCallbacks
-    def put_json(self, uri, json_body, args={}, headers=None):
+    async def put_json(self, uri, json_body, args={}, headers=None):
         """ Puts some json to the given URI.
 
         Args:
@@ -459,7 +479,7 @@ class SimpleHttpClient(object):
             headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
                header name to a list of values for that header
         Returns:
-            Deferred: Succeeds when we get *any* 2xx HTTP response, with the
+            Succeeds when we get *any* 2xx HTTP response, with the
             HTTP body as JSON.
         Raises:
             HttpResponseException On a non-2xx HTTP response.
@@ -480,19 +500,20 @@ class SimpleHttpClient(object):
         if headers:
             actual_headers.update(headers)
 
-        response = yield self.request(
+        response = await self.request(
             "PUT", uri, headers=Headers(actual_headers), data=json_str
         )
 
-        body = yield make_deferred_yieldable(readBody(response))
+        body = await make_deferred_yieldable(readBody(response))
 
         if 200 <= response.code < 300:
-            return json.loads(body)
+            return json_decoder.decode(body.decode("utf-8"))
         else:
-            raise HttpResponseException(response.code, response.phrase, body)
+            raise HttpResponseException(
+                response.code, response.phrase.decode("ascii", errors="replace"), body
+            )
 
-    @defer.inlineCallbacks
-    def get_raw(self, uri, args={}, headers=None):
+    async def get_raw(self, uri, args={}, headers=None):
         """ Gets raw text from the given URI.
 
         Args:
@@ -504,8 +525,8 @@ class SimpleHttpClient(object):
             headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
                header name to a list of values for that header
         Returns:
-            Deferred: Succeeds when we get *any* 2xx HTTP response, with the
-            HTTP body at text.
+            Succeeds when we get *any* 2xx HTTP response, with the
+            HTTP body as bytes.
         Raises:
             HttpResponseException on a non-2xx HTTP response.
         """
@@ -517,20 +538,21 @@ class SimpleHttpClient(object):
         if headers:
             actual_headers.update(headers)
 
-        response = yield self.request("GET", uri, headers=Headers(actual_headers))
+        response = await self.request("GET", uri, headers=Headers(actual_headers))
 
-        body = yield make_deferred_yieldable(readBody(response))
+        body = await make_deferred_yieldable(readBody(response))
 
         if 200 <= response.code < 300:
             return body
         else:
-            raise HttpResponseException(response.code, response.phrase, body)
+            raise HttpResponseException(
+                response.code, response.phrase.decode("ascii", errors="replace"), body
+            )
 
     # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
     # The two should be factored out.
 
-    @defer.inlineCallbacks
-    def get_file(self, url, output_stream, max_size=None, headers=None):
+    async def get_file(self, url, output_stream, max_size=None, headers=None):
         """GETs a file from a given URL
         Args:
             url (str): The URL to GET
@@ -546,7 +568,7 @@ class SimpleHttpClient(object):
         if headers:
             actual_headers.update(headers)
 
-        response = yield self.request("GET", url, headers=Headers(actual_headers))
+        response = await self.request("GET", url, headers=Headers(actual_headers))
 
         resp_headers = dict(response.headers.getAllRawHeaders())
 
@@ -570,14 +592,14 @@ class SimpleHttpClient(object):
         # straight back in again
 
         try:
-            length = yield make_deferred_yieldable(
+            length = await make_deferred_yieldable(
                 _readBodyToFile(response, output_stream, max_size)
             )
         except SynapseError:
             # This can happen e.g. because the body is too large.
             raise
         except Exception as e:
-            raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e)
+            raise SynapseError(502, ("Failed to download remote body: %s" % e)) from e
 
         return (
             length,
@@ -638,7 +660,7 @@ def encode_urlencode_args(args):
 
 
 def encode_urlencode_arg(arg):
-    if isinstance(arg, text_type):
+    if isinstance(arg, str):
         return arg.encode("utf-8")
     elif isinstance(arg, list):
         return [encode_urlencode_arg(i) for i in arg]
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index f5f917f5ae..369bf9c2fc 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -15,6 +15,7 @@
 
 import logging
 import urllib
+from typing import List
 
 from netaddr import AddrFormatError, IPAddress
 from zope.interface import implementer
@@ -48,6 +49,9 @@ class MatrixFederationAgent(object):
         tls_client_options_factory (FederationPolicyForHTTPS|None):
             factory to use for fetching client tls options, or none to disable TLS.
 
+        user_agent (bytes):
+            The user agent header to use for federation requests.
+
         _srv_resolver (SrvResolver|None):
             SRVResolver impl to use for looking up SRV records. None to use a default
             implementation.
@@ -61,6 +65,7 @@ class MatrixFederationAgent(object):
         self,
         reactor,
         tls_client_options_factory,
+        user_agent,
         _srv_resolver=None,
         _well_known_resolver=None,
     ):
@@ -78,6 +83,7 @@ class MatrixFederationAgent(object):
             ),
             pool=self._pool,
         )
+        self.user_agent = user_agent
 
         if _well_known_resolver is None:
             _well_known_resolver = WellKnownResolver(
@@ -87,6 +93,7 @@ class MatrixFederationAgent(object):
                     pool=self._pool,
                     contextFactory=tls_client_options_factory,
                 ),
+                user_agent=self.user_agent,
             )
 
         self._well_known_resolver = _well_known_resolver
@@ -149,7 +156,7 @@ class MatrixFederationAgent(object):
             parsed_uri = urllib.parse.urlparse(uri)
 
         # We need to make sure the host header is set to the netloc of the
-        # server.
+        # server and that a user-agent is provided.
         if headers is None:
             headers = Headers()
         else:
@@ -157,6 +164,8 @@ class MatrixFederationAgent(object):
 
         if not headers.hasHeader(b"host"):
             headers.addRawHeader(b"host", parsed_uri.netloc)
+        if not headers.hasHeader(b"user-agent"):
+            headers.addRawHeader(b"user-agent", self.user_agent)
 
         res = yield make_deferred_yieldable(
             self._agent.request(method, uri, headers, bodyProducer)
@@ -228,22 +237,21 @@ class MatrixHostnameEndpoint(object):
 
         return run_in_background(self._do_connect, protocol_factory)
 
-    @defer.inlineCallbacks
-    def _do_connect(self, protocol_factory):
+    async def _do_connect(self, protocol_factory):
         first_exception = None
 
-        server_list = yield self._resolve_server()
+        server_list = await self._resolve_server()
 
         for server in server_list:
             host = server.host
             port = server.port
 
             try:
-                logger.info("Connecting to %s:%i", host.decode("ascii"), port)
+                logger.debug("Connecting to %s:%i", host.decode("ascii"), port)
                 endpoint = HostnameEndpoint(self._reactor, host, port)
                 if self._tls_options:
                     endpoint = wrapClientTLS(self._tls_options, endpoint)
-                result = yield make_deferred_yieldable(
+                result = await make_deferred_yieldable(
                     endpoint.connect(protocol_factory)
                 )
 
@@ -263,13 +271,9 @@ class MatrixHostnameEndpoint(object):
         # to try and if that doesn't work then we'll have an exception.
         raise Exception("Failed to resolve server %r" % (self._parsed_uri.netloc,))
 
-    @defer.inlineCallbacks
-    def _resolve_server(self):
+    async def _resolve_server(self) -> List[Server]:
         """Resolves the server name to a list of hosts and ports to attempt to
         connect to.
-
-        Returns:
-            Deferred[list[Server]]
         """
 
         if self._parsed_uri.scheme != b"matrix":
@@ -290,7 +294,7 @@ class MatrixHostnameEndpoint(object):
         if port or _is_ip_literal(host):
             return [Server(host, port or 8448)]
 
-        server_list = yield self._srv_resolver.resolve_service(b"_matrix._tcp." + host)
+        server_list = await self._srv_resolver.resolve_service(b"_matrix._tcp." + host)
 
         if server_list:
             return server_list
diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py
index 021b233a7d..2ede90a9b1 100644
--- a/synapse/http/federation/srv_resolver.py
+++ b/synapse/http/federation/srv_resolver.py
@@ -17,10 +17,10 @@
 import logging
 import random
 import time
+from typing import List
 
 import attr
 
-from twisted.internet import defer
 from twisted.internet.error import ConnectError
 from twisted.names import client, dns
 from twisted.names.error import DNSNameError, DomainError
@@ -113,16 +113,14 @@ class SrvResolver(object):
         self._cache = cache
         self._get_time = get_time
 
-    @defer.inlineCallbacks
-    def resolve_service(self, service_name):
+    async def resolve_service(self, service_name: bytes) -> List[Server]:
         """Look up a SRV record
 
         Args:
             service_name (bytes): record to look up
 
         Returns:
-            Deferred[list[Server]]:
-                a list of the SRV records, or an empty list if none found
+            a list of the SRV records, or an empty list if none found
         """
         now = int(self._get_time())
 
@@ -136,7 +134,7 @@ class SrvResolver(object):
                 return _sort_server_list(servers)
 
         try:
-            answers, _, _ = yield make_deferred_yieldable(
+            answers, _, _ = await make_deferred_yieldable(
                 self._dns_client.lookupService(service_name)
             )
         except DNSNameError:
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 7ddfad286d..f794315deb 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -13,7 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import json
 import logging
 import random
 import time
@@ -23,9 +22,10 @@ import attr
 from twisted.internet import defer
 from twisted.web.client import RedirectAgent, readBody
 from twisted.web.http import stringToDatetime
+from twisted.web.http_headers import Headers
 
 from synapse.logging.context import make_deferred_yieldable
-from synapse.util import Clock
+from synapse.util import Clock, json_decoder
 from synapse.util.caches.ttlcache import TTLCache
 from synapse.util.metrics import Measure
 
@@ -78,7 +78,12 @@ class WellKnownResolver(object):
     """
 
     def __init__(
-        self, reactor, agent, well_known_cache=None, had_well_known_cache=None
+        self,
+        reactor,
+        agent,
+        user_agent,
+        well_known_cache=None,
+        had_well_known_cache=None,
     ):
         self._reactor = reactor
         self._clock = Clock(reactor)
@@ -92,6 +97,7 @@ class WellKnownResolver(object):
         self._well_known_cache = well_known_cache
         self._had_valid_well_known_cache = had_well_known_cache
         self._well_known_agent = RedirectAgent(agent)
+        self.user_agent = user_agent
 
     @defer.inlineCallbacks
     def get_well_known(self, server_name):
@@ -174,7 +180,7 @@ class WellKnownResolver(object):
             if response.code != 200:
                 raise Exception("Non-200 response %s" % (response.code,))
 
-            parsed_body = json.loads(body.decode("utf-8"))
+            parsed_body = json_decoder.decode(body.decode("utf-8"))
             logger.info("Response from .well-known: %s", parsed_body)
 
             result = parsed_body["m.server"].encode("ascii")
@@ -227,6 +233,10 @@ class WellKnownResolver(object):
         uri = b"https://%s/.well-known/matrix/server" % (server_name,)
         uri_str = uri.decode("ascii")
 
+        headers = {
+            b"User-Agent": [self.user_agent],
+        }
+
         i = 0
         while True:
             i += 1
@@ -234,7 +244,9 @@ class WellKnownResolver(object):
             logger.info("Fetching %s", uri_str)
             try:
                 response = yield make_deferred_yieldable(
-                    self._well_known_agent.request(b"GET", uri)
+                    self._well_known_agent.request(
+                        b"GET", uri, headers=Headers(headers)
+                    )
                 )
                 body = yield make_deferred_yieldable(readBody(response))
 
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 2d47b9ea00..738be43f46 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,11 +17,9 @@ import cgi
 import logging
 import random
 import sys
+import urllib
 from io import BytesIO
 
-from six import raise_from, string_types
-from six.moves import urllib
-
 import attr
 import treq
 from canonicaljson import encode_canonical_json
@@ -31,10 +29,11 @@ from zope.interface import implementer
 
 from twisted.internet import defer, protocol
 from twisted.internet.error import DNSLookupError
-from twisted.internet.interfaces import IReactorPluggableNameResolver
+from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime
 from twisted.internet.task import _EPSILON, Cooperator
 from twisted.web._newclient import ResponseDone
 from twisted.web.http_headers import Headers
+from twisted.web.iweb import IResponse
 
 import synapse.metrics
 import synapse.util.retryutils
@@ -76,7 +75,7 @@ MAXINT = sys.maxsize
 _next_id = 1
 
 
-@attr.s
+@attr.s(frozen=True)
 class MatrixFederationRequest(object):
     method = attr.ib()
     """HTTP method
@@ -112,27 +111,52 @@ class MatrixFederationRequest(object):
     :type: str|None
     """
 
+    uri = attr.ib(init=False, type=bytes)
+    """The URI of this request
+    """
+
     def __attrs_post_init__(self):
         global _next_id
-        self.txn_id = "%s-O-%s" % (self.method, _next_id)
+        txn_id = "%s-O-%s" % (self.method, _next_id)
         _next_id = (_next_id + 1) % (MAXINT - 1)
 
+        object.__setattr__(self, "txn_id", txn_id)
+
+        destination_bytes = self.destination.encode("ascii")
+        path_bytes = self.path.encode("ascii")
+        if self.query:
+            query_bytes = encode_query_args(self.query)
+        else:
+            query_bytes = b""
+
+        # The object is frozen so we can pre-compute this.
+        uri = urllib.parse.urlunparse(
+            (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
+        )
+        object.__setattr__(self, "uri", uri)
+
     def get_json(self):
         if self.json_callback:
             return self.json_callback()
         return self.json
 
 
-@defer.inlineCallbacks
-def _handle_json_response(reactor, timeout_sec, request, response):
+async def _handle_json_response(
+    reactor: IReactorTime,
+    timeout_sec: float,
+    request: MatrixFederationRequest,
+    response: IResponse,
+    start_ms: int,
+):
     """
     Reads the JSON body of a response, with a timeout
 
     Args:
-        reactor (IReactor): twisted reactor, for the timeout
-        timeout_sec (float): number of seconds to wait for response to complete
-        request (MatrixFederationRequest): the request that triggered the response
-        response (IResponse): response to the request
+        reactor: twisted reactor, for the timeout
+        timeout_sec: number of seconds to wait for response to complete
+        request: the request that triggered the response
+        response: response to the request
+        start_ms: Timestamp when request was made
 
     Returns:
         dict: parsed JSON response
@@ -143,26 +167,38 @@ def _handle_json_response(reactor, timeout_sec, request, response):
         d = treq.json_content(response)
         d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
 
-        body = yield make_deferred_yieldable(d)
+        body = await make_deferred_yieldable(d)
     except TimeoutError as e:
         logger.warning(
-            "{%s} [%s] Timed out reading response", request.txn_id, request.destination,
+            "{%s} [%s] Timed out reading response - %s %s",
+            request.txn_id,
+            request.destination,
+            request.method,
+            request.uri.decode("ascii"),
         )
         raise RequestSendFailed(e, can_retry=True) from e
     except Exception as e:
         logger.warning(
-            "{%s} [%s] Error reading response: %s",
+            "{%s} [%s] Error reading response %s %s: %s",
             request.txn_id,
             request.destination,
+            request.method,
+            request.uri.decode("ascii"),
             e,
         )
         raise
+
+    time_taken_secs = reactor.seconds() - start_ms / 1000
+
     logger.info(
-        "{%s} [%s] Completed: %d %s",
+        "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s",
         request.txn_id,
         request.destination,
         response.code,
         response.phrase.decode("ascii", errors="replace"),
+        time_taken_secs,
+        request.method,
+        request.uri.decode("ascii"),
     )
     return body
 
@@ -178,7 +214,7 @@ class MatrixFederationHttpClient(object):
 
     def __init__(self, hs, tls_client_options_factory):
         self.hs = hs
-        self.signing_key = hs.config.signing_key[0]
+        self.signing_key = hs.signing_key
         self.server_name = hs.hostname
 
         real_reactor = hs.get_reactor()
@@ -199,7 +235,14 @@ class MatrixFederationHttpClient(object):
 
         self.reactor = Reactor()
 
-        self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
+        user_agent = hs.version_string
+        if hs.config.user_agent_suffix:
+            user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix)
+        user_agent = user_agent.encode("ascii")
+
+        self.agent = MatrixFederationAgent(
+            self.reactor, tls_client_options_factory, user_agent
+        )
 
         # Use a BlacklistingAgentWrapper to prevent circumventing the IP
         # blacklist via IP literals in server names
@@ -219,8 +262,7 @@ class MatrixFederationHttpClient(object):
 
         self._cooperator = Cooperator(scheduler=schedule)
 
-    @defer.inlineCallbacks
-    def _send_request_with_optional_trailing_slash(
+    async def _send_request_with_optional_trailing_slash(
         self, request, try_trailing_slash_on_400=False, **send_request_args
     ):
         """Wrapper for _send_request which can optionally retry the request
@@ -241,10 +283,10 @@ class MatrixFederationHttpClient(object):
                 (except 429).
 
         Returns:
-            Deferred[Dict]: Parsed JSON response body.
+            Dict: Parsed JSON response body.
         """
         try:
-            response = yield self._send_request(request, **send_request_args)
+            response = await self._send_request(request, **send_request_args)
         except HttpResponseException as e:
             # Received an HTTP error > 300. Check if it meets the requirements
             # to retry with a trailing slash
@@ -258,14 +300,15 @@ class MatrixFederationHttpClient(object):
             # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
             # trailing slash on Synapse <= v0.99.3.
             logger.info("Retrying request with trailing slash")
-            request.path += "/"
 
-            response = yield self._send_request(request, **send_request_args)
+            # Request is frozen so we create a new instance
+            request = attr.evolve(request, path=request.path + "/")
+
+            response = await self._send_request(request, **send_request_args)
 
         return response
 
-    @defer.inlineCallbacks
-    def _send_request(
+    async def _send_request(
         self,
         request,
         retry_on_dns_fail=True,
@@ -306,7 +349,7 @@ class MatrixFederationHttpClient(object):
             backoff_on_404 (bool): Back off if we get a 404
 
         Returns:
-            Deferred[twisted.web.client.Response]: resolves with the HTTP
+            twisted.web.client.Response: resolves with the HTTP
             response object on success.
 
         Raises:
@@ -330,7 +373,7 @@ class MatrixFederationHttpClient(object):
         ):
             raise FederationDeniedError(request.destination)
 
-        limiter = yield synapse.util.retryutils.get_retry_limiter(
+        limiter = await synapse.util.retryutils.get_retry_limiter(
             request.destination,
             self.clock,
             self._store,
@@ -371,9 +414,7 @@ class MatrixFederationHttpClient(object):
             else:
                 retries_left = MAX_SHORT_RETRIES
 
-            url_bytes = urllib.parse.urlunparse(
-                (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
-            )
+            url_bytes = request.uri
             url_str = url_bytes.decode("ascii")
 
             url_to_sign_bytes = urllib.parse.urlunparse(
@@ -400,7 +441,7 @@ class MatrixFederationHttpClient(object):
 
                     headers_dict[b"Authorization"] = auth_headers
 
-                    logger.info(
+                    logger.debug(
                         "{%s} [%s] Sending request: %s %s; timeout %fs",
                         request.txn_id,
                         request.destination,
@@ -428,20 +469,20 @@ class MatrixFederationHttpClient(object):
                                 reactor=self.reactor,
                             )
 
-                            response = yield request_deferred
+                            response = await request_deferred
                     except TimeoutError as e:
                         raise RequestSendFailed(e, can_retry=True) from e
                     except DNSLookupError as e:
-                        raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
+                        raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                     except Exception as e:
-                        logger.info("Failed to send request: %s", e)
-                        raise_from(RequestSendFailed(e, can_retry=True), e)
+                        raise RequestSendFailed(e, can_retry=True) from e
 
                     incoming_responses_counter.labels(
                         request.method, response.code
                     ).inc()
 
                     set_tag(tags.HTTP_STATUS_CODE, response.code)
+                    response_phrase = response.phrase.decode("ascii", errors="replace")
 
                     if 200 <= response.code < 300:
                         logger.debug(
@@ -449,7 +490,7 @@ class MatrixFederationHttpClient(object):
                             request.txn_id,
                             request.destination,
                             response.code,
-                            response.phrase.decode("ascii", errors="replace"),
+                            response_phrase,
                         )
                         pass
                     else:
@@ -458,7 +499,7 @@ class MatrixFederationHttpClient(object):
                             request.txn_id,
                             request.destination,
                             response.code,
-                            response.phrase.decode("ascii", errors="replace"),
+                            response_phrase,
                         )
                         # :'(
                         # Update transactions table?
@@ -468,7 +509,7 @@ class MatrixFederationHttpClient(object):
                         )
 
                         try:
-                            body = yield make_deferred_yieldable(d)
+                            body = await make_deferred_yieldable(d)
                         except Exception as e:
                             # Eh, we're already going to raise an exception so lets
                             # ignore if this fails.
@@ -482,18 +523,18 @@ class MatrixFederationHttpClient(object):
                             )
                             body = None
 
-                        e = HttpResponseException(response.code, response.phrase, body)
+                        e = HttpResponseException(response.code, response_phrase, body)
 
                         # Retry if the error is a 429 (Too Many Requests),
                         # otherwise just raise a standard HttpResponseException
                         if response.code == 429:
-                            raise_from(RequestSendFailed(e, can_retry=True), e)
+                            raise RequestSendFailed(e, can_retry=True) from e
                         else:
                             raise e
 
                     break
                 except RequestSendFailed as e:
-                    logger.warning(
+                    logger.info(
                         "{%s} [%s] Request failed: %s %s: %s",
                         request.txn_id,
                         request.destination,
@@ -522,7 +563,7 @@ class MatrixFederationHttpClient(object):
                             delay,
                         )
 
-                        yield self.clock.sleep(delay)
+                        await self.clock.sleep(delay)
                         retries_left -= 1
                     else:
                         raise
@@ -557,13 +598,17 @@ class MatrixFederationHttpClient(object):
         Returns:
             list[bytes]: a list of headers to be added as "Authorization:" headers
         """
-        request = {"method": method, "uri": url_bytes, "origin": self.server_name}
+        request = {
+            "method": method.decode("ascii"),
+            "uri": url_bytes.decode("ascii"),
+            "origin": self.server_name,
+        }
 
         if destination is not None:
-            request["destination"] = destination
+            request["destination"] = destination.decode("ascii")
 
         if destination_is is not None:
-            request["destination_is"] = destination_is
+            request["destination_is"] = destination_is.decode("ascii")
 
         if content is not None:
             request["content"] = content
@@ -581,8 +626,7 @@ class MatrixFederationHttpClient(object):
             )
         return auth_headers
 
-    @defer.inlineCallbacks
-    def put_json(
+    async def put_json(
         self,
         destination,
         path,
@@ -626,7 +670,7 @@ class MatrixFederationHttpClient(object):
                 enabled.
 
         Returns:
-            Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+            dict|list: Succeeds when we get a 2xx HTTP response. The
             result will be the decoded JSON body.
 
         Raises:
@@ -648,7 +692,9 @@ class MatrixFederationHttpClient(object):
             json=data,
         )
 
-        response = yield self._send_request_with_optional_trailing_slash(
+        start_ms = self.clock.time_msec()
+
+        response = await self._send_request_with_optional_trailing_slash(
             request,
             try_trailing_slash_on_400,
             backoff_on_404=backoff_on_404,
@@ -657,14 +703,13 @@ class MatrixFederationHttpClient(object):
             timeout=timeout,
         )
 
-        body = yield _handle_json_response(
-            self.reactor, self.default_timeout, request, response
+        body = await _handle_json_response(
+            self.reactor, self.default_timeout, request, response, start_ms
         )
 
         return body
 
-    @defer.inlineCallbacks
-    def post_json(
+    async def post_json(
         self,
         destination,
         path,
@@ -697,7 +742,7 @@ class MatrixFederationHttpClient(object):
 
             args (dict): query params
         Returns:
-            Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+            dict|list: Succeeds when we get a 2xx HTTP response. The
             result will be the decoded JSON body.
 
         Raises:
@@ -715,7 +760,9 @@ class MatrixFederationHttpClient(object):
             method="POST", destination=destination, path=path, query=args, json=data
         )
 
-        response = yield self._send_request(
+        start_ms = self.clock.time_msec()
+
+        response = await self._send_request(
             request,
             long_retries=long_retries,
             timeout=timeout,
@@ -727,13 +774,12 @@ class MatrixFederationHttpClient(object):
         else:
             _sec_timeout = self.default_timeout
 
-        body = yield _handle_json_response(
-            self.reactor, _sec_timeout, request, response
+        body = await _handle_json_response(
+            self.reactor, _sec_timeout, request, response, start_ms,
         )
         return body
 
-    @defer.inlineCallbacks
-    def get_json(
+    async def get_json(
         self,
         destination,
         path,
@@ -765,7 +811,7 @@ class MatrixFederationHttpClient(object):
                 response we should try appending a trailing slash to the end of
                 the request. Workaround for #3622 in Synapse <= v0.99.3.
         Returns:
-            Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+            dict|list: Succeeds when we get a 2xx HTTP response. The
             result will be the decoded JSON body.
 
         Raises:
@@ -782,7 +828,9 @@ class MatrixFederationHttpClient(object):
             method="GET", destination=destination, path=path, query=args
         )
 
-        response = yield self._send_request_with_optional_trailing_slash(
+        start_ms = self.clock.time_msec()
+
+        response = await self._send_request_with_optional_trailing_slash(
             request,
             try_trailing_slash_on_400,
             backoff_on_404=False,
@@ -791,14 +839,13 @@ class MatrixFederationHttpClient(object):
             timeout=timeout,
         )
 
-        body = yield _handle_json_response(
-            self.reactor, self.default_timeout, request, response
+        body = await _handle_json_response(
+            self.reactor, self.default_timeout, request, response, start_ms
         )
 
         return body
 
-    @defer.inlineCallbacks
-    def delete_json(
+    async def delete_json(
         self,
         destination,
         path,
@@ -826,7 +873,7 @@ class MatrixFederationHttpClient(object):
 
             args (dict): query params
         Returns:
-            Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
+            dict|list: Succeeds when we get a 2xx HTTP response. The
             result will be the decoded JSON body.
 
         Raises:
@@ -843,20 +890,21 @@ class MatrixFederationHttpClient(object):
             method="DELETE", destination=destination, path=path, query=args
         )
 
-        response = yield self._send_request(
+        start_ms = self.clock.time_msec()
+
+        response = await self._send_request(
             request,
             long_retries=long_retries,
             timeout=timeout,
             ignore_backoff=ignore_backoff,
         )
 
-        body = yield _handle_json_response(
-            self.reactor, self.default_timeout, request, response
+        body = await _handle_json_response(
+            self.reactor, self.default_timeout, request, response, start_ms
         )
         return body
 
-    @defer.inlineCallbacks
-    def get_file(
+    async def get_file(
         self,
         destination,
         path,
@@ -876,7 +924,7 @@ class MatrixFederationHttpClient(object):
                 and try the request anyway.
 
         Returns:
-            Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
+            tuple[int, dict]: Resolves with an (int,dict) tuple of
             the file length and a dict of the response headers.
 
         Raises:
@@ -893,7 +941,7 @@ class MatrixFederationHttpClient(object):
             method="GET", destination=destination, path=path, query=args
         )
 
-        response = yield self._send_request(
+        response = await self._send_request(
             request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
         )
 
@@ -902,7 +950,7 @@ class MatrixFederationHttpClient(object):
         try:
             d = _readBodyToFile(response, output_stream, max_size)
             d.addTimeout(self.default_timeout, self.reactor)
-            length = yield make_deferred_yieldable(d)
+            length = await make_deferred_yieldable(d)
         except Exception as e:
             logger.warning(
                 "{%s} [%s] Error reading response: %s",
@@ -912,12 +960,14 @@ class MatrixFederationHttpClient(object):
             )
             raise
         logger.info(
-            "{%s} [%s] Completed: %d %s [%d bytes]",
+            "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
             request.txn_id,
             request.destination,
             response.code,
             response.phrase.decode("ascii", errors="replace"),
             length,
+            request.method,
+            request.uri.decode("ascii"),
         )
         return (length, headers)
 
@@ -998,7 +1048,7 @@ def encode_query_args(args):
 
     encoded_args = {}
     for k, vs in args.items():
-        if isinstance(vs, string_types):
+        if isinstance(vs, str):
             vs = [vs]
         encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2487a72171..8d791bd2ca 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -14,23 +14,25 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import abc
 import collections
 import html
-import http.client
 import logging
 import types
 import urllib
+from http import HTTPStatus
 from io import BytesIO
-from typing import Awaitable, Callable, TypeVar, Union
+from typing import Any, Callable, Dict, Iterator, List, Tuple, Union
 
 import jinja2
-from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json
+from canonicaljson import iterencode_canonical_json, iterencode_pretty_printed_json
+from zope.interface import implementer
 
-from twisted.internet import defer
+from twisted.internet import defer, interfaces
 from twisted.python import failure
 from twisted.web import resource
 from twisted.web.server import NOT_DONE_YET, Request
-from twisted.web.static import NoRangeStaticProducer
+from twisted.web.static import File, NoRangeStaticProducer
 from twisted.web.util import redirectTo
 
 import synapse.events
@@ -45,6 +47,7 @@ from synapse.api.errors import (
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import preserve_fn
 from synapse.logging.opentracing import trace_servlet
+from synapse.util import json_encoder
 from synapse.util.caches import intern_dict
 
 logger = logging.getLogger(__name__)
@@ -62,99 +65,43 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html>
 """
 
 
-def wrap_json_request_handler(h):
-    """Wraps a request handler method with exception handling.
-
-    Also does the wrapping with request.processing as per wrap_async_request_handler.
-
-    The handler method must have a signature of "handle_foo(self, request)",
-    where "request" must be a SynapseRequest.
-
-    The handler must return a deferred or a coroutine. If the deferred succeeds
-    we assume that a response has been sent. If the deferred fails with a SynapseError we use
-    it to send a JSON response with the appropriate HTTP reponse code. If the
-    deferred fails with any other type of error we send a 500 reponse.
+def return_json_error(f: failure.Failure, request: SynapseRequest) -> None:
+    """Sends a JSON error response to clients.
     """
 
-    async def wrapped_request_handler(self, request):
-        try:
-            await h(self, request)
-        except SynapseError as e:
-            code = e.code
-            logger.info("%s SynapseError: %s - %s", request, code, e.msg)
-
-            # Only respond with an error response if we haven't already started
-            # writing, otherwise lets just kill the connection
-            if request.startedWriting:
-                if request.transport:
-                    try:
-                        request.transport.abortConnection()
-                    except Exception:
-                        # abortConnection throws if the connection is already closed
-                        pass
-            else:
-                respond_with_json(
-                    request,
-                    code,
-                    e.error_dict(),
-                    send_cors=True,
-                    pretty_print=_request_user_agent_is_curl(request),
-                )
-
-        except Exception:
-            # failure.Failure() fishes the original Failure out
-            # of our stack, and thus gives us a sensible stack
-            # trace.
-            f = failure.Failure()
-            logger.error(
-                "Failed handle request via %r: %r",
-                request.request_metrics.name,
-                request,
-                exc_info=(f.type, f.value, f.getTracebackObject()),
-            )
-            # Only respond with an error response if we haven't already started
-            # writing, otherwise lets just kill the connection
-            if request.startedWriting:
-                if request.transport:
-                    try:
-                        request.transport.abortConnection()
-                    except Exception:
-                        # abortConnection throws if the connection is already closed
-                        pass
-            else:
-                respond_with_json(
-                    request,
-                    500,
-                    {"error": "Internal server error", "errcode": Codes.UNKNOWN},
-                    send_cors=True,
-                    pretty_print=_request_user_agent_is_curl(request),
-                )
-
-    return wrap_async_request_handler(wrapped_request_handler)
-
+    if f.check(SynapseError):
+        error_code = f.value.code
+        error_dict = f.value.error_dict()
 
-TV = TypeVar("TV")
-
-
-def wrap_html_request_handler(
-    h: Callable[[TV, SynapseRequest], Awaitable]
-) -> Callable[[TV, SynapseRequest], Awaitable[None]]:
-    """Wraps a request handler method with exception handling.
-
-    Also does the wrapping with request.processing as per wrap_async_request_handler.
-
-    The handler method must have a signature of "handle_foo(self, request)",
-    where "request" must be a SynapseRequest.
-    """
+        logger.info("%s SynapseError: %s - %s", request, error_code, f.value.msg)
+    else:
+        error_code = 500
+        error_dict = {"error": "Internal server error", "errcode": Codes.UNKNOWN}
 
-    async def wrapped_request_handler(self, request):
-        try:
-            await h(self, request)
-        except Exception:
-            f = failure.Failure()
-            return_html_error(f, request, HTML_ERROR_TEMPLATE)
+        logger.error(
+            "Failed handle request via %r: %r",
+            request.request_metrics.name,
+            request,
+            exc_info=(f.type, f.value, f.getTracebackObject()),
+        )
 
-    return wrap_async_request_handler(wrapped_request_handler)
+    # Only respond with an error response if we haven't already started writing,
+    # otherwise lets just kill the connection
+    if request.startedWriting:
+        if request.transport:
+            try:
+                request.transport.abortConnection()
+            except Exception:
+                # abortConnection throws if the connection is already closed
+                pass
+    else:
+        respond_with_json(
+            request,
+            error_code,
+            error_dict,
+            send_cors=True,
+            pretty_print=_request_user_agent_is_curl(request),
+        )
 
 
 def return_html_error(
@@ -188,7 +135,7 @@ def return_html_error(
                 exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     else:
-        code = http.HTTPStatus.INTERNAL_SERVER_ERROR
+        code = HTTPStatus.INTERNAL_SERVER_ERROR
         msg = "Internal server error"
 
         logger.error(
@@ -202,12 +149,7 @@ def return_html_error(
     else:
         body = error_template.render(code=code, msg=msg)
 
-    body_bytes = body.encode("utf-8")
-    request.setResponseCode(code)
-    request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
-    request.setHeader(b"Content-Length", b"%i" % (len(body_bytes),))
-    request.write(body_bytes)
-    finish_request(request)
+    respond_with_html(request, code, body)
 
 
 def wrap_async_request_handler(h):
@@ -254,7 +196,115 @@ class HttpServer(object):
         pass
 
 
-class JsonResource(HttpServer, resource.Resource):
+class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
+    """Base class for resources that have async handlers.
+
+    Sub classes can either implement `_async_render_<METHOD>` to handle
+    requests by method, or override `_async_render` to handle all requests.
+
+    Args:
+        extract_context: Whether to attempt to extract the opentracing
+            context from the request the servlet is handling.
+    """
+
+    def __init__(self, extract_context=False):
+        super().__init__()
+
+        self._extract_context = extract_context
+
+    def render(self, request):
+        """ This gets called by twisted every time someone sends us a request.
+        """
+        defer.ensureDeferred(self._async_render_wrapper(request))
+        return NOT_DONE_YET
+
+    @wrap_async_request_handler
+    async def _async_render_wrapper(self, request: SynapseRequest):
+        """This is a wrapper that delegates to `_async_render` and handles
+        exceptions, return values, metrics, etc.
+        """
+        try:
+            request.request_metrics.name = self.__class__.__name__
+
+            with trace_servlet(request, self._extract_context):
+                callback_return = await self._async_render(request)
+
+                if callback_return is not None:
+                    code, response = callback_return
+                    self._send_response(request, code, response)
+        except Exception:
+            # failure.Failure() fishes the original Failure out
+            # of our stack, and thus gives us a sensible stack
+            # trace.
+            f = failure.Failure()
+            self._send_error_response(f, request)
+
+    async def _async_render(self, request: Request):
+        """Delegates to `_async_render_<METHOD>` methods, or returns a 400 if
+        no appropriate method exists. Can be overriden in sub classes for
+        different routing.
+        """
+        # Treat HEAD requests as GET requests.
+        request_method = request.method.decode("ascii")
+        if request_method == "HEAD":
+            request_method = "GET"
+
+        method_handler = getattr(self, "_async_render_%s" % (request_method,), None)
+        if method_handler:
+            raw_callback_return = method_handler(request)
+
+            # Is it synchronous? We'll allow this for now.
+            if isinstance(raw_callback_return, (defer.Deferred, types.CoroutineType)):
+                callback_return = await raw_callback_return
+            else:
+                callback_return = raw_callback_return
+
+            return callback_return
+
+        _unrecognised_request_handler(request)
+
+    @abc.abstractmethod
+    def _send_response(
+        self, request: SynapseRequest, code: int, response_object: Any,
+    ) -> None:
+        raise NotImplementedError()
+
+    @abc.abstractmethod
+    def _send_error_response(
+        self, f: failure.Failure, request: SynapseRequest,
+    ) -> None:
+        raise NotImplementedError()
+
+
+class DirectServeJsonResource(_AsyncResource):
+    """A resource that will call `self._async_on_<METHOD>` on new requests,
+    formatting responses and errors as JSON.
+    """
+
+    def _send_response(
+        self, request: Request, code: int, response_object: Any,
+    ):
+        """Implements _AsyncResource._send_response
+        """
+        # TODO: Only enable CORS for the requests that need it.
+        respond_with_json(
+            request,
+            code,
+            response_object,
+            send_cors=True,
+            pretty_print=_request_user_agent_is_curl(request),
+            canonical_json=self.canonical_json,
+        )
+
+    def _send_error_response(
+        self, f: failure.Failure, request: SynapseRequest,
+    ) -> None:
+        """Implements _AsyncResource._send_error_response
+        """
+        return_json_error(f, request)
+
+
+class JsonResource(DirectServeJsonResource):
     """ This implements the HttpServer interface and provides JSON support for
     Resources.
 
@@ -274,17 +324,15 @@ class JsonResource(HttpServer, resource.Resource):
         "_PathEntry", ["pattern", "callback", "servlet_classname"]
     )
 
-    def __init__(self, hs, canonical_json=True):
-        resource.Resource.__init__(self)
+    def __init__(self, hs, canonical_json=True, extract_context=False):
+        super().__init__(extract_context)
 
         self.canonical_json = canonical_json
         self.clock = hs.get_clock()
         self.path_regexs = {}
         self.hs = hs
 
-    def register_paths(
-        self, method, path_patterns, callback, servlet_classname, trace=True
-    ):
+    def register_paths(self, method, path_patterns, callback, servlet_classname):
         """
         Registers a request handler against a regular expression. Later request URLs are
         checked against these regular expressions in order to identify an appropriate
@@ -300,37 +348,46 @@ class JsonResource(HttpServer, resource.Resource):
 
             servlet_classname (str): The name of the handler to be used in prometheus
                 and opentracing logs.
-
-            trace (bool): Whether we should start a span to trace the servlet.
         """
         method = method.encode("utf-8")  # method is bytes on py3
 
-        if trace:
-            # We don't extract the context from the servlet because we can't
-            # trust the sender
-            callback = trace_servlet(servlet_classname)(callback)
-
         for path_pattern in path_patterns:
             logger.debug("Registering for %s %s", method, path_pattern.pattern)
             self.path_regexs.setdefault(method, []).append(
                 self._PathEntry(path_pattern, callback, servlet_classname)
             )
 
-    def render(self, request):
-        """ This gets called by twisted every time someone sends us a request.
+    def _get_handler_for_request(
+        self, request: SynapseRequest
+    ) -> Tuple[Callable, str, Dict[str, str]]:
+        """Finds a callback method to handle the given request.
+
+        Returns:
+            A tuple of the callback to use, the name of the servlet, and the
+            key word arguments to pass to the callback
         """
-        defer.ensureDeferred(self._async_render(request))
-        return NOT_DONE_YET
+        # Treat HEAD requests as GET requests.
+        request_path = request.path.decode("ascii")
+        request_method = request.method
+        if request_method == b"HEAD":
+            request_method = b"GET"
+
+        # Loop through all the registered callbacks to check if the method
+        # and path regex match
+        for path_entry in self.path_regexs.get(request_method, []):
+            m = path_entry.pattern.match(request_path)
+            if m:
+                # We found a match!
+                return path_entry.callback, path_entry.servlet_classname, m.groupdict()
+
+        # Huh. No one wanted to handle that? Fiiiiiine. Send 400.
+        return _unrecognised_request_handler, "unrecognised_request_handler", {}
 
-    @wrap_json_request_handler
     async def _async_render(self, request):
-        """ This gets called from render() every time someone sends us a request.
-            This checks if anyone has registered a callback for that method and
-            path.
-        """
         callback, servlet_classname, group_dict = self._get_handler_for_request(request)
 
-        # Make sure we have a name for this handler in prometheus.
+        # Make sure we have an appopriate name for this handler in prometheus
+        # (rather than the default of JsonResource).
         request.request_metrics.name = servlet_classname
 
         # Now trigger the callback. If it returns a response, we send it
@@ -343,96 +400,54 @@ class JsonResource(HttpServer, resource.Resource):
             }
         )
 
-        callback_return = callback(request, **kwargs)
+        raw_callback_return = callback(request, **kwargs)
 
         # Is it synchronous? We'll allow this for now.
-        if isinstance(callback_return, (defer.Deferred, types.CoroutineType)):
-            callback_return = await callback_return
-
-        if callback_return is not None:
-            code, response = callback_return
-            self._send_response(request, code, response)
-
-    def _get_handler_for_request(self, request):
-        """Finds a callback method to handle the given request
+        if isinstance(raw_callback_return, (defer.Deferred, types.CoroutineType)):
+            callback_return = await raw_callback_return
+        else:
+            callback_return = raw_callback_return
 
-        Args:
-            request (twisted.web.http.Request):
+        return callback_return
 
-        Returns:
-            Tuple[Callable, str, dict[unicode, unicode]]: callback method, the
-                label to use for that method in prometheus metrics, and the
-                dict mapping keys to path components as specified in the
-                handler's path match regexp.
-
-                The callback will normally be a method registered via
-                register_paths, so will return (possibly via Deferred) either
-                None, or a tuple of (http code, response body).
-        """
-        request_path = request.path.decode("ascii")
 
-        # Loop through all the registered callbacks to check if the method
-        # and path regex match
-        for path_entry in self.path_regexs.get(request.method, []):
-            m = path_entry.pattern.match(request_path)
-            if m:
-                # We found a match!
-                return path_entry.callback, path_entry.servlet_classname, m.groupdict()
+class DirectServeHtmlResource(_AsyncResource):
+    """A resource that will call `self._async_on_<METHOD>` on new requests,
+    formatting responses and errors as HTML.
+    """
 
-        # Huh. No one wanted to handle that? Fiiiiiine. Send 400.
-        return _unrecognised_request_handler, "unrecognised_request_handler", {}
+    # The error template to use for this resource
+    ERROR_TEMPLATE = HTML_ERROR_TEMPLATE
 
     def _send_response(
-        self, request, code, response_json_object, response_code_message=None
+        self, request: SynapseRequest, code: int, response_object: Any,
     ):
-        # TODO: Only enable CORS for the requests that need it.
-        respond_with_json(
-            request,
-            code,
-            response_json_object,
-            send_cors=True,
-            response_code_message=response_code_message,
-            pretty_print=_request_user_agent_is_curl(request),
-            canonical_json=self.canonical_json,
-        )
-
-
-class DirectServeResource(resource.Resource):
-    def render(self, request):
+        """Implements _AsyncResource._send_response
         """
-        Render the request, using an asynchronous render handler if it exists.
-        """
-        async_render_callback_name = "_async_render_" + request.method.decode("ascii")
-
-        # Try and get the async renderer
-        callback = getattr(self, async_render_callback_name, None)
-
-        # No async renderer for this request method.
-        if not callback:
-            return super().render(request)
-
-        resp = trace_servlet(self.__class__.__name__)(callback)(request)
-
-        # If it's a coroutine, turn it into a Deferred
-        if isinstance(resp, types.CoroutineType):
-            defer.ensureDeferred(resp)
-
-        return NOT_DONE_YET
+        # We expect to get bytes for us to write
+        assert isinstance(response_object, bytes)
+        html_bytes = response_object
 
+        respond_with_html_bytes(request, 200, html_bytes)
 
-def _options_handler(request):
-    """Request handler for OPTIONS requests
+    def _send_error_response(
+        self, f: failure.Failure, request: SynapseRequest,
+    ) -> None:
+        """Implements _AsyncResource._send_error_response
+        """
+        return_html_error(f, request, self.ERROR_TEMPLATE)
 
-    This is a request handler suitable for return from
-    _get_handler_for_request. It returns a 200 and an empty body.
 
-    Args:
-        request (twisted.web.http.Request):
+class StaticResource(File):
+    """
+    A resource that represents a plain non-interpreted file or directory.
 
-    Returns:
-        Tuple[int, dict]: http code, response body.
+    Differs from the File resource by adding clickjacking protection.
     """
-    return 200, {}
+
+    def render_GET(self, request: Request):
+        set_clickjacking_protection_headers(request)
+        return super().render_GET(request)
 
 
 def _unrecognised_request_handler(request):
@@ -468,11 +483,12 @@ class OptionsResource(resource.Resource):
     """Responds to OPTION requests for itself and all children."""
 
     def render_OPTIONS(self, request):
-        code, response_json_object = _options_handler(request)
+        request.setResponseCode(204)
+        request.setHeader(b"Content-Length", b"0")
 
-        return respond_with_json(
-            request, code, response_json_object, send_cors=True, canonical_json=False,
-        )
+        set_cors_headers(request)
+
+        return b""
 
     def getChildWithDefault(self, path, request):
         if request.method == b"OPTIONS":
@@ -484,15 +500,114 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
     pass
 
 
+@implementer(interfaces.IPushProducer)
+class _ByteProducer:
+    """
+    Iteratively write bytes to the request.
+    """
+
+    # The minimum number of bytes for each chunk. Note that the last chunk will
+    # usually be smaller than this.
+    min_chunk_size = 1024
+
+    def __init__(
+        self, request: Request, iterator: Iterator[bytes],
+    ):
+        self._request = request
+        self._iterator = iterator
+        self._paused = False
+
+        # Register the producer and start producing data.
+        self._request.registerProducer(self, True)
+        self.resumeProducing()
+
+    def _send_data(self, data: List[bytes]) -> None:
+        """
+        Send a list of bytes as a chunk of a response.
+        """
+        if not data:
+            return
+        self._request.write(b"".join(data))
+
+    def pauseProducing(self) -> None:
+        self._paused = True
+
+    def resumeProducing(self) -> None:
+        # We've stopped producing in the meantime (note that this might be
+        # re-entrant after calling write).
+        if not self._request:
+            return
+
+        self._paused = False
+
+        # Write until there's backpressure telling us to stop.
+        while not self._paused:
+            # Get the next chunk and write it to the request.
+            #
+            # The output of the JSON encoder is buffered and coalesced until
+            # min_chunk_size is reached. This is because JSON encoders produce
+            # very small output per iteration and the Request object converts
+            # each call to write() to a separate chunk. Without this there would
+            # be an explosion in bytes written (e.g. b"{" becoming "1\r\n{\r\n").
+            #
+            # Note that buffer stores a list of bytes (instead of appending to
+            # bytes) to hopefully avoid many allocations.
+            buffer = []
+            buffered_bytes = 0
+            while buffered_bytes < self.min_chunk_size:
+                try:
+                    data = next(self._iterator)
+                    buffer.append(data)
+                    buffered_bytes += len(data)
+                except StopIteration:
+                    # The entire JSON object has been serialized, write any
+                    # remaining data, finalize the producer and the request, and
+                    # clean-up any references.
+                    self._send_data(buffer)
+                    self._request.unregisterProducer()
+                    self._request.finish()
+                    self.stopProducing()
+                    return
+
+            self._send_data(buffer)
+
+    def stopProducing(self) -> None:
+        # Clear a circular reference.
+        self._request = None
+
+
+def _encode_json_bytes(json_object: Any) -> Iterator[bytes]:
+    """
+    Encode an object into JSON. Returns an iterator of bytes.
+    """
+    for chunk in json_encoder.iterencode(json_object):
+        yield chunk.encode("utf-8")
+
+
 def respond_with_json(
-    request,
-    code,
-    json_object,
-    send_cors=False,
-    response_code_message=None,
-    pretty_print=False,
-    canonical_json=True,
+    request: Request,
+    code: int,
+    json_object: Any,
+    send_cors: bool = False,
+    pretty_print: bool = False,
+    canonical_json: bool = True,
 ):
+    """Sends encoded JSON in response to the given request.
+
+    Args:
+        request: The http request to respond to.
+        code: The HTTP response code.
+        json_object: The object to serialize to JSON.
+        send_cors: Whether to send Cross-Origin Resource Sharing headers
+            https://fetch.spec.whatwg.org/#http-cors-protocol
+        pretty_print: Whether to include indentation and line-breaks in the
+            resulting JSON bytes.
+        canonical_json: Whether to use the canonicaljson algorithm when encoding
+            the JSON bytes.
+
+    Returns:
+        twisted.web.server.NOT_DONE_YET if the request is still active.
+    """
     # could alternatively use request.notifyFinish() and flip a flag when
     # the Deferred fires, but since the flag is RIGHT THERE it seems like
     # a waste.
@@ -500,41 +615,44 @@ def respond_with_json(
         logger.warning(
             "Not sending response to request %s, already disconnected.", request
         )
-        return
+        return None
 
     if pretty_print:
-        json_bytes = encode_pretty_printed_json(json_object) + b"\n"
+        encoder = iterencode_pretty_printed_json
     else:
         if canonical_json or synapse.events.USE_FROZEN_DICTS:
-            # canonicaljson already encodes to bytes
-            json_bytes = encode_canonical_json(json_object)
+            encoder = iterencode_canonical_json
         else:
-            json_bytes = json.dumps(json_object).encode("utf-8")
-
-    return respond_with_json_bytes(
-        request,
-        code,
-        json_bytes,
-        send_cors=send_cors,
-        response_code_message=response_code_message,
-    )
+            encoder = _encode_json_bytes
+
+    request.setResponseCode(code)
+    request.setHeader(b"Content-Type", b"application/json")
+    request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
+
+    if send_cors:
+        set_cors_headers(request)
+
+    _ByteProducer(request, encoder(json_object))
+    return NOT_DONE_YET
 
 
 def respond_with_json_bytes(
-    request, code, json_bytes, send_cors=False, response_code_message=None
+    request: Request, code: int, json_bytes: bytes, send_cors: bool = False,
 ):
     """Sends encoded JSON in response to the given request.
 
     Args:
-        request (twisted.web.http.Request): The http request to respond to.
-        code (int): The HTTP response code.
-        json_bytes (bytes): The json bytes to use as the response body.
-        send_cors (bool): Whether to send Cross-Origin Resource Sharing headers
-            http://www.w3.org/TR/cors/
+        request: The http request to respond to.
+        code: The HTTP response code.
+        json_bytes: The json bytes to use as the response body.
+        send_cors: Whether to send Cross-Origin Resource Sharing headers
+            https://fetch.spec.whatwg.org/#http-cors-protocol
+
     Returns:
-        twisted.web.server.NOT_DONE_YET"""
+        twisted.web.server.NOT_DONE_YET if the request is still active.
+    """
 
-    request.setResponseCode(code, message=response_code_message)
+    request.setResponseCode(code)
     request.setHeader(b"Content-Type", b"application/json")
     request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),))
     request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
@@ -542,8 +660,8 @@ def respond_with_json_bytes(
     if send_cors:
         set_cors_headers(request)
 
-    # todo: we can almost certainly avoid this copy and encode the json straight into
-    # the bytesIO, but it would involve faffing around with string->bytes wrappers.
+    # note that this is zero-copy (the bytesio shares a copy-on-write buffer with
+    # the original `bytes`).
     bytes_io = BytesIO(json_bytes)
 
     producer = NoRangeStaticProducer(request, bytes_io)
@@ -551,16 +669,16 @@ def respond_with_json_bytes(
     return NOT_DONE_YET
 
 
-def set_cors_headers(request):
-    """Set the CORs headers so that javascript running in a web browsers can
+def set_cors_headers(request: Request):
+    """Set the CORS headers so that javascript running in a web browsers can
     use this API
 
     Args:
-        request (twisted.web.http.Request): The http request to add CORs to.
+        request: The http request to add CORS to.
     """
     request.setHeader(b"Access-Control-Allow-Origin", b"*")
     request.setHeader(
-        b"Access-Control-Allow-Methods", b"GET, POST, PUT, DELETE, OPTIONS"
+        b"Access-Control-Allow-Methods", b"GET, HEAD, POST, PUT, DELETE, OPTIONS"
     )
     request.setHeader(
         b"Access-Control-Allow-Headers",
@@ -568,7 +686,60 @@ def set_cors_headers(request):
     )
 
 
-def finish_request(request):
+def respond_with_html(request: Request, code: int, html: str):
+    """
+    Wraps `respond_with_html_bytes` by first encoding HTML from a str to UTF-8 bytes.
+    """
+    respond_with_html_bytes(request, code, html.encode("utf-8"))
+
+
+def respond_with_html_bytes(request: Request, code: int, html_bytes: bytes):
+    """
+    Sends HTML (encoded as UTF-8 bytes) as the response to the given request.
+
+    Note that this adds clickjacking protection headers and finishes the request.
+
+    Args:
+        request: The http request to respond to.
+        code: The HTTP response code.
+        html_bytes: The HTML bytes to use as the response body.
+    """
+    # could alternatively use request.notifyFinish() and flip a flag when
+    # the Deferred fires, but since the flag is RIGHT THERE it seems like
+    # a waste.
+    if request._disconnected:
+        logger.warning(
+            "Not sending response to request %s, already disconnected.", request
+        )
+        return
+
+    request.setResponseCode(code)
+    request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+    request.setHeader(b"Content-Length", b"%d" % (len(html_bytes),))
+
+    # Ensure this content cannot be embedded.
+    set_clickjacking_protection_headers(request)
+
+    request.write(html_bytes)
+    finish_request(request)
+
+
+def set_clickjacking_protection_headers(request: Request):
+    """
+    Set headers to guard against clickjacking of embedded content.
+
+    This sets the X-Frame-Options and Content-Security-Policy headers which instructs
+    browsers to not allow the HTML of the response to be embedded onto another
+    page.
+
+    Args:
+        request: The http request to add the headers to.
+    """
+    request.setHeader(b"X-Frame-Options", b"DENY")
+    request.setHeader(b"Content-Security-Policy", b"frame-ancestors 'none';")
+
+
+def finish_request(request: Request):
     """ Finish writing the response to the request.
 
     Twisted throws a RuntimeException if the connection closed before the
@@ -587,7 +758,7 @@ def finish_request(request):
         logger.info("Connection disconnected before response was written: %r", e)
 
 
-def _request_user_agent_is_curl(request):
+def _request_user_agent_is_curl(request: Request) -> bool:
     user_agents = request.requestHeaders.getRawHeaders(b"User-Agent", default=[])
     for user_agent in user_agents:
         if b"curl" in user_agent:
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index 13fcb408a6..53acba56cb 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -17,9 +17,8 @@
 
 import logging
 
-from canonicaljson import json
-
 from synapse.api.errors import Codes, SynapseError
+from synapse.util import json_decoder
 
 logger = logging.getLogger(__name__)
 
@@ -214,16 +213,8 @@ def parse_json_value_from_request(request, allow_empty_body=False):
     if not content_bytes and allow_empty_body:
         return None
 
-    # Decode to Unicode so that simplejson will return Unicode strings on
-    # Python 2
-    try:
-        content_unicode = content_bytes.decode("utf8")
-    except UnicodeDecodeError:
-        logger.warning("Unable to decode UTF-8")
-        raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
-
     try:
-        content = json.loads(content_unicode)
+        content = json_decoder.decode(content_bytes.decode("utf-8"))
     except Exception as e:
         logger.warning("Unable to parse JSON: %s", e)
         raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 167293c46d..6e79b47828 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -19,6 +19,7 @@ from typing import Optional
 from twisted.python.failure import Failure
 from twisted.web.server import Request, Site
 
+from synapse.config.server import ListenerConfig
 from synapse.http import redact_uri
 from synapse.http.request_metrics import RequestMetrics, requests_counter
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
@@ -145,10 +146,9 @@ class SynapseRequest(Request):
 
         Returns a context manager; the correct way to use this is:
 
-        @defer.inlineCallbacks
-        def handle_request(request):
+        async def handle_request(request):
             with request.processing("FooServlet"):
-                yield really_handle_the_request()
+                await really_handle_the_request()
 
         Once the context manager is closed, the completion of the request will be logged,
         and the various metrics will be updated.
@@ -214,9 +214,7 @@ class SynapseRequest(Request):
         # It's useful to log it here so that we can get an idea of when
         # the client disconnects.
         with PreserveLoggingContext(self.logcontext):
-            logger.warning(
-                "Error processing request %r: %s %s", self, reason.type, reason.value
-            )
+            logger.info("Connection from client lost before response was sent")
 
             if not self._is_processing:
                 self._finished_processing()
@@ -288,7 +286,9 @@ class SynapseRequest(Request):
             # the connection dropped)
             code += "!"
 
-        self.site.access_logger.info(
+        log_level = logging.INFO if self._should_log_request() else logging.DEBUG
+        self.site.access_logger.log(
+            log_level,
             "%s - %s - {%s}"
             " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
             ' %sB %s "%s %s %s" "%s" [%d dbevts]',
@@ -316,6 +316,17 @@ class SynapseRequest(Request):
         except Exception as e:
             logger.warning("Failed to stop metrics: %r", e)
 
+    def _should_log_request(self) -> bool:
+        """Whether we should log at INFO that we processed the request.
+        """
+        if self.path == b"/health":
+            return False
+
+        if self.method == b"OPTIONS":
+            return False
+
+        return True
+
 
 class XForwardedForRequest(SynapseRequest):
     def __init__(self, *args, **kw):
@@ -350,7 +361,7 @@ class SynapseSite(Site):
         self,
         logger_name,
         site_tag,
-        config,
+        config: ListenerConfig,
         resource,
         server_version_string,
         *args,
@@ -360,7 +371,8 @@ class SynapseSite(Site):
 
         self.site_tag = site_tag
 
-        proxied = config.get("x_forwarded", False)
+        assert config.http_options is not None
+        proxied = config.http_options.x_forwarded
         self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
         self.access_logger = logging.getLogger(logger_name)
         self.server_version_string = server_version_string.encode("ascii")