diff --git a/synapse/http/server.py b/synapse/http/server.py
index ece71c78b2..0930f645fd 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -21,7 +21,6 @@ import types
import urllib
from http import HTTPStatus
from inspect import isawaitable
-from io import BytesIO
from typing import (
Any,
Awaitable,
@@ -45,7 +44,7 @@ from twisted.internet import defer, interfaces
from twisted.python import failure
from twisted.web import resource
from twisted.web.server import NOT_DONE_YET, Request
-from twisted.web.static import File, NoRangeStaticProducer
+from twisted.web.static import File
from twisted.web.util import redirectTo
from synapse.api.errors import (
@@ -60,6 +59,7 @@ from synapse.logging.context import defer_to_thread, preserve_fn, run_in_backgro
from synapse.logging.opentracing import trace_servlet
from synapse.util import json_encoder
from synapse.util.caches import intern_dict
+from synapse.util.iterutils import chunk_seq
logger = logging.getLogger(__name__)
@@ -707,15 +707,35 @@ def respond_with_json_bytes(
if send_cors:
set_cors_headers(request)
- # note that this is zero-copy (the bytesio shares a copy-on-write buffer with
- # the original `bytes`).
- bytes_io = BytesIO(json_bytes)
-
- producer = NoRangeStaticProducer(request, bytes_io)
- producer.start()
+ _write_json_bytes_to_request(request, json_bytes)
return NOT_DONE_YET
+def _write_json_bytes_to_request(request: Request, json_bytes: bytes) -> None:
+ """Writes the JSON bytes to the request using an appropriate producer.
+
+ Note: This should be used instead of `Request.write` to correctly handle
+ large response bodies.
+ """
+
+ # The problem with dumping all of the json response into the `Request`
+ # object at once (via `Request.write`) is that doing so starts the timeout
+ # for the next request to be received: so if it takes longer than 60s to
+ # stream back the response to the client, the client never gets it.
+ #
+ # The correct solution is to use a Producer; then the timeout is only
+ # started once all of the content is sent over the TCP connection.
+
+ # To make sure we don't write the whole of the json at once we split it up
+ # into chunks.
+ chunk_size = 4096
+ bytes_generator = chunk_seq(json_bytes, chunk_size)
+
+ # We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the
+ # unit tests can't cope with being given a pull producer.
+ _ByteProducer(request, bytes_generator)
+
+
def set_cors_headers(request: Request):
"""Set the CORS headers so that javascript running in a web browsers can
use this API
@@ -814,7 +834,7 @@ def finish_request(request: Request):
async def _async_write_json_to_request_in_thread(
request: SynapseRequest,
- json_encoder: Callable[[Any], str],
+ json_encoder: Callable[[Any], bytes],
json_object: Any,
):
"""Encodes the given JSON object on a thread and then writes it to the
@@ -830,8 +850,4 @@ async def _async_write_json_to_request_in_thread(
json_str = await defer_to_thread(request.reactor, json_encoder, json_object)
- try:
- request.write(json_str)
- request.finish()
- except RuntimeError as e:
- logger.info("Connection disconnected before response was written: %r", e)
+ _write_json_bytes_to_request(request, json_str)
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index 8ac3eab2f5..4938ddf703 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -21,13 +21,28 @@ from typing import (
Iterable,
Iterator,
Mapping,
- Sequence,
Set,
+ Sized,
Tuple,
TypeVar,
)
+from typing_extensions import Protocol
+
T = TypeVar("T")
+S = TypeVar("S", bound="_SelfSlice")
+
+
+class _SelfSlice(Sized, Protocol):
+ """A helper protocol that matches types where taking a slice results in the
+ same type being returned.
+
+ This is more specific than `Sequence`, which allows another `Sequence` to be
+ returned.
+ """
+
+ def __getitem__(self: S, i: slice) -> S:
+ ...
def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
@@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
return iter(lambda: tuple(islice(sourceiter, size)), ())
-def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]:
+def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
"""Split the given sequence into chunks of the given size
The last chunk may be shorter than the given size.
|