summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-10-19 19:12:39 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2020-10-19 19:12:39 +0100
commit687d30b2eddd4feb530879c9499f248197ffba00 (patch)
treee3ee7460dcf934f1450359d29179079724a57f53 /synapse/http
parentMerge commit '3c01724b3' into anoa/dinsic_release_1_21_x (diff)
parentRemove `ChainedIdGenerator`. (#8123) (diff)
downloadsynapse-687d30b2eddd4feb530879c9499f248197ffba00.tar.xz
Merge commit 'c9c544cda' into anoa/dinsic_release_1_21_x
* commit 'c9c544cda':
  Remove `ChainedIdGenerator`. (#8123)
  Switch the JSON byte producer from a pull to a push producer. (#8116)
  Updated docs: Added note about missing 308 redirect support. (#8120)
  Be stricter about JSON that is accepted by Synapse (#8106)
  Convert runWithConnection to async. (#8121)
  Remove the unused inlineCallbacks code-paths in the caching code (#8119)
  Separate `get_current_token` into two. (#8113)
  Convert events worker database to async/await. (#8071)
  Add a link to the matrix-synapse-rest-password-provider. (#8111)
Diffstat (limited to 'synapse/http')
-rw-r--r--synapse/http/client.py11
-rw-r--r--synapse/http/federation/well_known_resolver.py5
-rw-r--r--synapse/http/server.py75
-rw-r--r--synapse/http/servlet.py5
4 files changed, 53 insertions, 43 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py

index 8aeb70cdec..dad01a8e56 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py
@@ -19,7 +19,7 @@ import urllib from io import BytesIO import treq -from canonicaljson import encode_canonical_json, json +from canonicaljson import encode_canonical_json from netaddr import IPAddress from prometheus_client import Counter from zope.interface import implementer, provider @@ -47,6 +47,7 @@ from synapse.http import ( from synapse.http.proxyagent import ProxyAgent from synapse.logging.context import make_deferred_yieldable from synapse.logging.opentracing import set_tag, start_active_span, tags +from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred logger = logging.getLogger(__name__) @@ -391,7 +392,7 @@ class SimpleHttpClient(object): body = await make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: - return json.loads(body.decode("utf-8")) + return json_decoder.decode(body.decode("utf-8")) else: raise HttpResponseException( response.code, response.phrase.decode("ascii", errors="replace"), body @@ -433,7 +434,7 @@ class SimpleHttpClient(object): body = await make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: - return json.loads(body.decode("utf-8")) + return json_decoder.decode(body.decode("utf-8")) else: raise HttpResponseException( response.code, response.phrase.decode("ascii", errors="replace"), body @@ -463,7 +464,7 @@ class SimpleHttpClient(object): actual_headers.update(headers) body = await self.get_raw(uri, args, headers=headers) - return json.loads(body.decode("utf-8")) + return json_decoder.decode(body.decode("utf-8")) async def put_json(self, uri, json_body, args={}, headers=None): """ Puts some json to the given URI. @@ -506,7 +507,7 @@ class SimpleHttpClient(object): body = await make_deferred_yieldable(readBody(response)) if 200 <= response.code < 300: - return json.loads(body.decode("utf-8")) + return json_decoder.decode(body.decode("utf-8")) else: raise HttpResponseException( response.code, response.phrase.decode("ascii", errors="replace"), body diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 89a3b041ce..f794315deb 100644 --- a/synapse/http/federation/well_known_resolver.py +++ b/synapse/http/federation/well_known_resolver.py
@@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json import logging import random import time @@ -26,7 +25,7 @@ from twisted.web.http import stringToDatetime from twisted.web.http_headers import Headers from synapse.logging.context import make_deferred_yieldable -from synapse.util import Clock +from synapse.util import Clock, json_decoder from synapse.util.caches.ttlcache import TTLCache from synapse.util.metrics import Measure @@ -181,7 +180,7 @@ class WellKnownResolver(object): if response.code != 200: raise Exception("Non-200 response %s" % (response.code,)) - parsed_body = json.loads(body.decode("utf-8")) + parsed_body = json_decoder.decode(body.decode("utf-8")) logger.info("Response from .well-known: %s", parsed_body) result = parsed_body["m.server"].encode("ascii") diff --git a/synapse/http/server.py b/synapse/http/server.py
index 37fdf14405..8d791bd2ca 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py
@@ -500,7 +500,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect): pass -@implementer(interfaces.IPullProducer) +@implementer(interfaces.IPushProducer) class _ByteProducer: """ Iteratively write bytes to the request. @@ -515,52 +515,64 @@ class _ByteProducer: ): self._request = request self._iterator = iterator + self._paused = False - def start(self) -> None: - self._request.registerProducer(self, False) + # Register the producer and start producing data. + self._request.registerProducer(self, True) + self.resumeProducing() def _send_data(self, data: List[bytes]) -> None: """ - Send a list of strings as a response to the request. + Send a list of bytes as a chunk of a response. """ if not data: return self._request.write(b"".join(data)) + def pauseProducing(self) -> None: + self._paused = True + def resumeProducing(self) -> None: # We've stopped producing in the meantime (note that this might be # re-entrant after calling write). if not self._request: return - # Get the next chunk and write it to the request. - # - # The output of the JSON encoder is coalesced until min_chunk_size is - # reached. (This is because JSON encoders produce a very small output - # per iteration.) - # - # Note that buffer stores a list of bytes (instead of appending to - # bytes) to hopefully avoid many allocations. - buffer = [] - buffered_bytes = 0 - while buffered_bytes < self.min_chunk_size: - try: - data = next(self._iterator) - buffer.append(data) - buffered_bytes += len(data) - except StopIteration: - # The entire JSON object has been serialized, write any - # remaining data, finalize the producer and the request, and - # clean-up any references. - self._send_data(buffer) - self._request.unregisterProducer() - self._request.finish() - self.stopProducing() - return - - self._send_data(buffer) + self._paused = False + + # Write until there's backpressure telling us to stop. + while not self._paused: + # Get the next chunk and write it to the request. + # + # The output of the JSON encoder is buffered and coalesced until + # min_chunk_size is reached. This is because JSON encoders produce + # very small output per iteration and the Request object converts + # each call to write() to a separate chunk. Without this there would + # be an explosion in bytes written (e.g. b"{" becoming "1\r\n{\r\n"). + # + # Note that buffer stores a list of bytes (instead of appending to + # bytes) to hopefully avoid many allocations. + buffer = [] + buffered_bytes = 0 + while buffered_bytes < self.min_chunk_size: + try: + data = next(self._iterator) + buffer.append(data) + buffered_bytes += len(data) + except StopIteration: + # The entire JSON object has been serialized, write any + # remaining data, finalize the producer and the request, and + # clean-up any references. + self._send_data(buffer) + self._request.unregisterProducer() + self._request.finish() + self.stopProducing() + return + + self._send_data(buffer) def stopProducing(self) -> None: + # Clear a circular reference. self._request = None @@ -620,8 +632,7 @@ def respond_with_json( if send_cors: set_cors_headers(request) - producer = _ByteProducer(request, encoder(json_object)) - producer.start() + _ByteProducer(request, encoder(json_object)) return NOT_DONE_YET diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index a34e5ead88..53acba56cb 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py
@@ -17,9 +17,8 @@ import logging -from canonicaljson import json - from synapse.api.errors import Codes, SynapseError +from synapse.util import json_decoder logger = logging.getLogger(__name__) @@ -215,7 +214,7 @@ def parse_json_value_from_request(request, allow_empty_body=False): return None try: - content = json.loads(content_bytes.decode("utf-8")) + content = json_decoder.decode(content_bytes.decode("utf-8")) except Exception as e: logger.warning("Unable to parse JSON: %s", e) raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON)