diff options
Diffstat (limited to '')
-rw-r--r-- | changelog.d/10905.feature | 1 | ||||
-rw-r--r-- | synapse/http/server.py | 72 | ||||
-rw-r--r-- | synapse/push/emailpusher.py | 2 | ||||
-rw-r--r-- | synapse/util/iterutils.py | 19 |
4 files changed, 76 insertions, 18 deletions
diff --git a/changelog.d/10905.feature b/changelog.d/10905.feature new file mode 100644 index 0000000000..07e7b2c6a7 --- /dev/null +++ b/changelog.d/10905.feature @@ -0,0 +1 @@ +Speed up responding with large JSON objects to requests. diff --git a/synapse/http/server.py b/synapse/http/server.py index e28b56abb9..1a50305dcf 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -21,7 +21,6 @@ import types import urllib from http import HTTPStatus from inspect import isawaitable -from io import BytesIO from typing import ( Any, Awaitable, @@ -37,7 +36,7 @@ from typing import ( ) import jinja2 -from canonicaljson import iterencode_canonical_json +from canonicaljson import encode_canonical_json from typing_extensions import Protocol from zope.interface import implementer @@ -45,7 +44,7 @@ from twisted.internet import defer, interfaces from twisted.python import failure from twisted.web import resource from twisted.web.server import NOT_DONE_YET, Request -from twisted.web.static import File, NoRangeStaticProducer +from twisted.web.static import File from twisted.web.util import redirectTo from synapse.api.errors import ( @@ -56,10 +55,11 @@ from synapse.api.errors import ( UnrecognizedRequestError, ) from synapse.http.site import SynapseRequest -from synapse.logging.context import preserve_fn +from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background from synapse.logging.opentracing import trace_servlet from synapse.util import json_encoder from synapse.util.caches import intern_dict +from synapse.util.iterutils import chunk_seq logger = logging.getLogger(__name__) @@ -620,12 +620,11 @@ class _ByteProducer: self._request = None -def _encode_json_bytes(json_object: Any) -> Iterator[bytes]: +def _encode_json_bytes(json_object: Any) -> bytes: """ Encode an object into JSON. Returns an iterator of bytes. """ - for chunk in json_encoder.iterencode(json_object): - yield chunk.encode("utf-8") + return json_encoder.encode(json_object).encode("utf-8") def respond_with_json( @@ -659,7 +658,7 @@ def respond_with_json( return None if canonical_json: - encoder = iterencode_canonical_json + encoder = encode_canonical_json else: encoder = _encode_json_bytes @@ -670,7 +669,9 @@ def respond_with_json( if send_cors: set_cors_headers(request) - _ByteProducer(request, encoder(json_object)) + run_in_background( + _async_write_json_to_request_in_thread, request, encoder, json_object + ) return NOT_DONE_YET @@ -706,15 +707,56 @@ def respond_with_json_bytes( if send_cors: set_cors_headers(request) - # note that this is zero-copy (the bytesio shares a copy-on-write buffer with - # the original `bytes`). - bytes_io = BytesIO(json_bytes) - - producer = NoRangeStaticProducer(request, bytes_io) - producer.start() + _write_bytes_to_request(request, json_bytes) return NOT_DONE_YET +async def _async_write_json_to_request_in_thread( + request: SynapseRequest, + json_encoder: Callable[[Any], bytes], + json_object: Any, +): + """Encodes the given JSON object on a thread and then writes it to the + request. + + This is done so that encoding large JSON objects doesn't block the reactor + thread. + + Note: We don't use JsonEncoder.iterencode here as that falls back to the + Python implementation (rather than the C backend), which is *much* more + expensive. + """ + + json_str = await defer_to_thread(request.reactor, json_encoder, json_object) + + _write_bytes_to_request(request, json_str) + + +def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None: + """Writes the bytes to the request using an appropriate producer. + + Note: This should be used instead of `Request.write` to correctly handle + large response bodies. + """ + + # The problem with dumping all of the response into the `Request` object at + # once (via `Request.write`) is that doing so starts the timeout for the + # next request to be received: so if it takes longer than 60s to stream back + # the response to the client, the client never gets it. + # + # The correct solution is to use a Producer; then the timeout is only + # started once all of the content is sent over the TCP connection. + + # To make sure we don't write all of the bytes at once we split it up into + # chunks. + chunk_size = 4096 + bytes_generator = chunk_seq(bytes_to_write, chunk_size) + + # We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the + # unit tests can't cope with being given a pull producer. + _ByteProducer(request, bytes_generator) + + def set_cors_headers(request: Request): """Set the CORS headers so that javascript running in a web browsers can use this API diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index e08e125cb8..cf5abdfbda 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -184,7 +184,7 @@ class EmailPusher(Pusher): should_notify_at = max(notif_ready_at, room_ready_at) - if should_notify_at < self.clock.time_msec(): + if should_notify_at <= self.clock.time_msec(): # one of our notifications is ready for sending, so we send # *one* email updating the user on their notifications, # we then consider all previously outstanding notifications diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index 8ac3eab2f5..4938ddf703 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -21,13 +21,28 @@ from typing import ( Iterable, Iterator, Mapping, - Sequence, Set, + Sized, Tuple, TypeVar, ) +from typing_extensions import Protocol + T = TypeVar("T") +S = TypeVar("S", bound="_SelfSlice") + + +class _SelfSlice(Sized, Protocol): + """A helper protocol that matches types where taking a slice results in the + same type being returned. + + This is more specific than `Sequence`, which allows another `Sequence` to be + returned. + """ + + def __getitem__(self: S, i: slice) -> S: + ... def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]: @@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]: return iter(lambda: tuple(islice(sourceiter, size)), ()) -def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]: +def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]: """Split the given sequence into chunks of the given size The last chunk may be shorter than the given size. |