summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/10905.feature1
-rw-r--r--synapse/http/server.py72
-rw-r--r--synapse/push/emailpusher.py2
-rw-r--r--synapse/util/iterutils.py19
4 files changed, 76 insertions, 18 deletions
diff --git a/changelog.d/10905.feature b/changelog.d/10905.feature
new file mode 100644
index 0000000000..07e7b2c6a7
--- /dev/null
+++ b/changelog.d/10905.feature
@@ -0,0 +1 @@
+Speed up responding with large JSON objects to requests.
diff --git a/synapse/http/server.py b/synapse/http/server.py
index e28b56abb9..1a50305dcf 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -21,7 +21,6 @@ import types
 import urllib
 from http import HTTPStatus
 from inspect import isawaitable
-from io import BytesIO
 from typing import (
     Any,
     Awaitable,
@@ -37,7 +36,7 @@ from typing import (
 )
 
 import jinja2
-from canonicaljson import iterencode_canonical_json
+from canonicaljson import encode_canonical_json
 from typing_extensions import Protocol
 from zope.interface import implementer
 
@@ -45,7 +44,7 @@ from twisted.internet import defer, interfaces
 from twisted.python import failure
 from twisted.web import resource
 from twisted.web.server import NOT_DONE_YET, Request
-from twisted.web.static import File, NoRangeStaticProducer
+from twisted.web.static import File
 from twisted.web.util import redirectTo
 
 from synapse.api.errors import (
@@ -56,10 +55,11 @@ from synapse.api.errors import (
     UnrecognizedRequestError,
 )
 from synapse.http.site import SynapseRequest
-from synapse.logging.context import preserve_fn
+from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
 from synapse.logging.opentracing import trace_servlet
 from synapse.util import json_encoder
 from synapse.util.caches import intern_dict
+from synapse.util.iterutils import chunk_seq
 
 logger = logging.getLogger(__name__)
 
@@ -620,12 +620,11 @@ class _ByteProducer:
         self._request = None
 
 
-def _encode_json_bytes(json_object: Any) -> Iterator[bytes]:
+def _encode_json_bytes(json_object: Any) -> bytes:
     """
     Encode an object into JSON. Returns an iterator of bytes.
     """
-    for chunk in json_encoder.iterencode(json_object):
-        yield chunk.encode("utf-8")
+    return json_encoder.encode(json_object).encode("utf-8")
 
 
 def respond_with_json(
@@ -659,7 +658,7 @@ def respond_with_json(
         return None
 
     if canonical_json:
-        encoder = iterencode_canonical_json
+        encoder = encode_canonical_json
     else:
         encoder = _encode_json_bytes
 
@@ -670,7 +669,9 @@ def respond_with_json(
     if send_cors:
         set_cors_headers(request)
 
-    _ByteProducer(request, encoder(json_object))
+    run_in_background(
+        _async_write_json_to_request_in_thread, request, encoder, json_object
+    )
     return NOT_DONE_YET
 
 
@@ -706,15 +707,56 @@ def respond_with_json_bytes(
     if send_cors:
         set_cors_headers(request)
 
-    # note that this is zero-copy (the bytesio shares a copy-on-write buffer with
-    # the original `bytes`).
-    bytes_io = BytesIO(json_bytes)
-
-    producer = NoRangeStaticProducer(request, bytes_io)
-    producer.start()
+    _write_bytes_to_request(request, json_bytes)
     return NOT_DONE_YET
 
 
+async def _async_write_json_to_request_in_thread(
+    request: SynapseRequest,
+    json_encoder: Callable[[Any], bytes],
+    json_object: Any,
+):
+    """Encodes the given JSON object on a thread and then writes it to the
+    request.
+
+    This is done so that encoding large JSON objects doesn't block the reactor
+    thread.
+
+    Note: We don't use JsonEncoder.iterencode here as that falls back to the
+    Python implementation (rather than the C backend), which is *much* more
+    expensive.
+    """
+
+    json_str = await defer_to_thread(request.reactor, json_encoder, json_object)
+
+    _write_bytes_to_request(request, json_str)
+
+
+def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None:
+    """Writes the bytes to the request using an appropriate producer.
+
+    Note: This should be used instead of `Request.write` to correctly handle
+    large response bodies.
+    """
+
+    # The problem with dumping all of the response into the `Request` object at
+    # once (via `Request.write`) is that doing so starts the timeout for the
+    # next request to be received: so if it takes longer than 60s to stream back
+    # the response to the client, the client never gets it.
+    #
+    # The correct solution is to use a Producer; then the timeout is only
+    # started once all of the content is sent over the TCP connection.
+
+    # To make sure we don't write all of the bytes at once we split it up into
+    # chunks.
+    chunk_size = 4096
+    bytes_generator = chunk_seq(bytes_to_write, chunk_size)
+
+    # We use a `_ByteProducer` here rather than `NoRangeStaticProducer` as the
+    # unit tests can't cope with being given a pull producer.
+    _ByteProducer(request, bytes_generator)
+
+
 def set_cors_headers(request: Request):
     """Set the CORS headers so that javascript running in a web browsers can
     use this API
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index e08e125cb8..cf5abdfbda 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -184,7 +184,7 @@ class EmailPusher(Pusher):
 
             should_notify_at = max(notif_ready_at, room_ready_at)
 
-            if should_notify_at < self.clock.time_msec():
+            if should_notify_at <= self.clock.time_msec():
                 # one of our notifications is ready for sending, so we send
                 # *one* email updating the user on their notifications,
                 # we then consider all previously outstanding notifications
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index 8ac3eab2f5..4938ddf703 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -21,13 +21,28 @@ from typing import (
     Iterable,
     Iterator,
     Mapping,
-    Sequence,
     Set,
+    Sized,
     Tuple,
     TypeVar,
 )
 
+from typing_extensions import Protocol
+
 T = TypeVar("T")
+S = TypeVar("S", bound="_SelfSlice")
+
+
+class _SelfSlice(Sized, Protocol):
+    """A helper protocol that matches types where taking a slice results in the
+    same type being returned.
+
+    This is more specific than `Sequence`, which allows another `Sequence` to be
+    returned.
+    """
+
+    def __getitem__(self: S, i: slice) -> S:
+        ...
 
 
 def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
@@ -46,7 +61,7 @@ def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
     return iter(lambda: tuple(islice(sourceiter, size)), ())
 
 
-def chunk_seq(iseq: Sequence[T], maxlen: int) -> Iterable[Sequence[T]]:
+def chunk_seq(iseq: S, maxlen: int) -> Iterator[S]:
     """Split the given sequence into chunks of the given size
 
     The last chunk may be shorter than the given size.