diff --git a/synapse/http/server.py b/synapse/http/server.py
index d4f9ad6e67..996a31a9ec 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -22,12 +22,13 @@ import types
import urllib
from http import HTTPStatus
from io import BytesIO
-from typing import Any, Callable, Dict, Tuple, Union
+from typing import Any, Callable, Dict, Iterator, List, Tuple, Union
import jinja2
-from canonicaljson import encode_canonical_json, encode_pretty_printed_json, json
+from canonicaljson import iterencode_canonical_json, iterencode_pretty_printed_json
+from zope.interface import implementer
-from twisted.internet import defer
+from twisted.internet import defer, interfaces
from twisted.python import failure
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request
@@ -46,6 +47,7 @@ from synapse.api.errors import (
from synapse.http.site import SynapseRequest
from synapse.logging.context import preserve_fn
from synapse.logging.opentracing import trace_servlet
+from synapse.util import json_encoder
from synapse.util.caches import intern_dict
logger = logging.getLogger(__name__)
@@ -172,7 +174,7 @@ def wrap_async_request_handler(h):
return preserve_fn(wrapped_async_request_handler)
-class HttpServer(object):
+class HttpServer:
""" Interface for registering callbacks on a HTTP server
"""
@@ -242,10 +244,12 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
no appropriate method exists. Can be overriden in sub classes for
different routing.
"""
+ # Treat HEAD requests as GET requests.
+ request_method = request.method.decode("ascii")
+ if request_method == "HEAD":
+ request_method = "GET"
- method_handler = getattr(
- self, "_async_render_%s" % (request.method.decode("ascii"),), None
- )
+ method_handler = getattr(self, "_async_render_%s" % (request_method,), None)
if method_handler:
raw_callback_return = method_handler(request)
@@ -362,11 +366,15 @@ class JsonResource(DirectServeJsonResource):
A tuple of the callback to use, the name of the servlet, and the
key word arguments to pass to the callback
"""
+ # Treat HEAD requests as GET requests.
request_path = request.path.decode("ascii")
+ request_method = request.method
+ if request_method == b"HEAD":
+ request_method = b"GET"
# Loop through all the registered callbacks to check if the method
# and path regex match
- for path_entry in self.path_regexs.get(request.method, []):
+ for path_entry in self.path_regexs.get(request_method, []):
m = path_entry.pattern.match(request_path)
if m:
# We found a match!
@@ -492,6 +500,90 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
pass
+@implementer(interfaces.IPushProducer)
+class _ByteProducer:
+ """
+ Iteratively write bytes to the request.
+ """
+
+ # The minimum number of bytes for each chunk. Note that the last chunk will
+ # usually be smaller than this.
+ min_chunk_size = 1024
+
+ def __init__(
+ self, request: Request, iterator: Iterator[bytes],
+ ):
+ self._request = request
+ self._iterator = iterator
+ self._paused = False
+
+ # Register the producer and start producing data.
+ self._request.registerProducer(self, True)
+ self.resumeProducing()
+
+ def _send_data(self, data: List[bytes]) -> None:
+ """
+ Send a list of bytes as a chunk of a response.
+ """
+ if not data:
+ return
+ self._request.write(b"".join(data))
+
+ def pauseProducing(self) -> None:
+ self._paused = True
+
+ def resumeProducing(self) -> None:
+ # We've stopped producing in the meantime (note that this might be
+ # re-entrant after calling write).
+ if not self._request:
+ return
+
+ self._paused = False
+
+ # Write until there's backpressure telling us to stop.
+ while not self._paused:
+ # Get the next chunk and write it to the request.
+ #
+ # The output of the JSON encoder is buffered and coalesced until
+ # min_chunk_size is reached. This is because JSON encoders produce
+ # very small output per iteration and the Request object converts
+ # each call to write() to a separate chunk. Without this there would
+ # be an explosion in bytes written (e.g. b"{" becoming "1\r\n{\r\n").
+ #
+ # Note that buffer stores a list of bytes (instead of appending to
+ # bytes) to hopefully avoid many allocations.
+ buffer = []
+ buffered_bytes = 0
+ while buffered_bytes < self.min_chunk_size:
+ try:
+ data = next(self._iterator)
+ buffer.append(data)
+ buffered_bytes += len(data)
+ except StopIteration:
+ # The entire JSON object has been serialized, write any
+ # remaining data, finalize the producer and the request, and
+ # clean-up any references.
+ self._send_data(buffer)
+ self._request.unregisterProducer()
+ self._request.finish()
+ self.stopProducing()
+ return
+
+ self._send_data(buffer)
+
+ def stopProducing(self) -> None:
+ # Clear a circular reference.
+ self._request = None
+
+
+def _encode_json_bytes(json_object: Any) -> Iterator[bytes]:
+ """
+ Encode an object into JSON. Returns an iterator of bytes.
+ """
+ for chunk in json_encoder.iterencode(json_object):
+ yield chunk.encode("utf-8")
+
+
def respond_with_json(
request: Request,
code: int,
@@ -526,15 +618,22 @@ def respond_with_json(
return None
if pretty_print:
- json_bytes = encode_pretty_printed_json(json_object) + b"\n"
+ encoder = iterencode_pretty_printed_json
else:
if canonical_json or synapse.events.USE_FROZEN_DICTS:
- # canonicaljson already encodes to bytes
- json_bytes = encode_canonical_json(json_object)
+ encoder = iterencode_canonical_json
else:
- json_bytes = json.dumps(json_object).encode("utf-8")
+ encoder = _encode_json_bytes
- return respond_with_json_bytes(request, code, json_bytes, send_cors=send_cors)
+ request.setResponseCode(code)
+ request.setHeader(b"Content-Type", b"application/json")
+ request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
+
+ if send_cors:
+ set_cors_headers(request)
+
+ _ByteProducer(request, encoder(json_object))
+ return NOT_DONE_YET
def respond_with_json_bytes(
@@ -579,7 +678,7 @@ def set_cors_headers(request: Request):
"""
request.setHeader(b"Access-Control-Allow-Origin", b"*")
request.setHeader(
- b"Access-Control-Allow-Methods", b"GET, POST, PUT, DELETE, OPTIONS"
+ b"Access-Control-Allow-Methods", b"GET, HEAD, POST, PUT, DELETE, OPTIONS"
)
request.setHeader(
b"Access-Control-Allow-Headers",
|