diff --git a/synapse/http/client.py b/synapse/http/client.py
index 8aeb70cdec..dad01a8e56 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -19,7 +19,7 @@ import urllib
from io import BytesIO
import treq
-from canonicaljson import encode_canonical_json, json
+from canonicaljson import encode_canonical_json
from netaddr import IPAddress
from prometheus_client import Counter
from zope.interface import implementer, provider
@@ -47,6 +47,7 @@ from synapse.http import (
from synapse.http.proxyagent import ProxyAgent
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag, start_active_span, tags
+from synapse.util import json_decoder
from synapse.util.async_helpers import timeout_deferred
logger = logging.getLogger(__name__)
@@ -391,7 +392,7 @@ class SimpleHttpClient(object):
body = await make_deferred_yieldable(readBody(response))
if 200 <= response.code < 300:
- return json.loads(body.decode("utf-8"))
+ return json_decoder.decode(body.decode("utf-8"))
else:
raise HttpResponseException(
response.code, response.phrase.decode("ascii", errors="replace"), body
@@ -433,7 +434,7 @@ class SimpleHttpClient(object):
body = await make_deferred_yieldable(readBody(response))
if 200 <= response.code < 300:
- return json.loads(body.decode("utf-8"))
+ return json_decoder.decode(body.decode("utf-8"))
else:
raise HttpResponseException(
response.code, response.phrase.decode("ascii", errors="replace"), body
@@ -463,7 +464,7 @@ class SimpleHttpClient(object):
actual_headers.update(headers)
body = await self.get_raw(uri, args, headers=headers)
- return json.loads(body.decode("utf-8"))
+ return json_decoder.decode(body.decode("utf-8"))
async def put_json(self, uri, json_body, args={}, headers=None):
""" Puts some json to the given URI.
@@ -506,7 +507,7 @@ class SimpleHttpClient(object):
body = await make_deferred_yieldable(readBody(response))
if 200 <= response.code < 300:
- return json.loads(body.decode("utf-8"))
+ return json_decoder.decode(body.decode("utf-8"))
else:
raise HttpResponseException(
response.code, response.phrase.decode("ascii", errors="replace"), body
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 89a3b041ce..f794315deb 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import json
import logging
import random
import time
@@ -26,7 +25,7 @@ from twisted.web.http import stringToDatetime
from twisted.web.http_headers import Headers
from synapse.logging.context import make_deferred_yieldable
-from synapse.util import Clock
+from synapse.util import Clock, json_decoder
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.metrics import Measure
@@ -181,7 +180,7 @@ class WellKnownResolver(object):
if response.code != 200:
raise Exception("Non-200 response %s" % (response.code,))
- parsed_body = json.loads(body.decode("utf-8"))
+ parsed_body = json_decoder.decode(body.decode("utf-8"))
logger.info("Response from .well-known: %s", parsed_body)
result = parsed_body["m.server"].encode("ascii")
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 37fdf14405..8d791bd2ca 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -500,7 +500,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
pass
-@implementer(interfaces.IPullProducer)
+@implementer(interfaces.IPushProducer)
class _ByteProducer:
"""
Iteratively write bytes to the request.
@@ -515,52 +515,64 @@ class _ByteProducer:
):
self._request = request
self._iterator = iterator
+ self._paused = False
- def start(self) -> None:
- self._request.registerProducer(self, False)
+ # Register the producer and start producing data.
+ self._request.registerProducer(self, True)
+ self.resumeProducing()
def _send_data(self, data: List[bytes]) -> None:
"""
- Send a list of strings as a response to the request.
+ Send a list of bytes as a chunk of a response.
"""
if not data:
return
self._request.write(b"".join(data))
+ def pauseProducing(self) -> None:
+ self._paused = True
+
def resumeProducing(self) -> None:
# We've stopped producing in the meantime (note that this might be
# re-entrant after calling write).
if not self._request:
return
- # Get the next chunk and write it to the request.
- #
- # The output of the JSON encoder is coalesced until min_chunk_size is
- # reached. (This is because JSON encoders produce a very small output
- # per iteration.)
- #
- # Note that buffer stores a list of bytes (instead of appending to
- # bytes) to hopefully avoid many allocations.
- buffer = []
- buffered_bytes = 0
- while buffered_bytes < self.min_chunk_size:
- try:
- data = next(self._iterator)
- buffer.append(data)
- buffered_bytes += len(data)
- except StopIteration:
- # The entire JSON object has been serialized, write any
- # remaining data, finalize the producer and the request, and
- # clean-up any references.
- self._send_data(buffer)
- self._request.unregisterProducer()
- self._request.finish()
- self.stopProducing()
- return
-
- self._send_data(buffer)
+ self._paused = False
+
+ # Write until there's backpressure telling us to stop.
+ while not self._paused:
+ # Get the next chunk and write it to the request.
+ #
+ # The output of the JSON encoder is buffered and coalesced until
+ # min_chunk_size is reached. This is because JSON encoders produce
+ # very small output per iteration and the Request object converts
+ # each call to write() to a separate chunk. Without this there would
+ # be an explosion in bytes written (e.g. b"{" becoming "1\r\n{\r\n").
+ #
+ # Note that buffer stores a list of bytes (instead of appending to
+ # bytes) to hopefully avoid many allocations.
+ buffer = []
+ buffered_bytes = 0
+ while buffered_bytes < self.min_chunk_size:
+ try:
+ data = next(self._iterator)
+ buffer.append(data)
+ buffered_bytes += len(data)
+ except StopIteration:
+ # The entire JSON object has been serialized, write any
+ # remaining data, finalize the producer and the request, and
+ # clean-up any references.
+ self._send_data(buffer)
+ self._request.unregisterProducer()
+ self._request.finish()
+ self.stopProducing()
+ return
+
+ self._send_data(buffer)
def stopProducing(self) -> None:
+ # Clear a circular reference.
self._request = None
@@ -620,8 +632,7 @@ def respond_with_json(
if send_cors:
set_cors_headers(request)
- producer = _ByteProducer(request, encoder(json_object))
- producer.start()
+ _ByteProducer(request, encoder(json_object))
return NOT_DONE_YET
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index a34e5ead88..53acba56cb 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -17,9 +17,8 @@
import logging
-from canonicaljson import json
-
from synapse.api.errors import Codes, SynapseError
+from synapse.util import json_decoder
logger = logging.getLogger(__name__)
@@ -215,7 +214,7 @@ def parse_json_value_from_request(request, allow_empty_body=False):
return None
try:
- content = json.loads(content_bytes.decode("utf-8"))
+ content = json_decoder.decode(content_bytes.decode("utf-8"))
except Exception as e:
logger.warning("Unable to parse JSON: %s", e)
raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)
|