diff options
author | Erik Johnston <erik@matrix.org> | 2021-05-20 16:11:48 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-05-20 16:11:48 +0100 |
commit | 64887f06fcac63e069364d625d984b4951bf1ffc (patch) | |
tree | eabd30c1ad56d057667271573d3bfc96276d11bd /synapse/http | |
parent | Allow a user who could join a restricted room to see it in spaces summary. (#... (diff) | |
download | synapse-64887f06fcac63e069364d625d984b4951bf1ffc.tar.xz |
Use ijson to parse the response to `/send_join`, reducing memory usage. (#9958)
Instead of parsing the full response to `/send_join` into Python objects (which can be huge for large rooms) and *then* parsing that into events, we instead use ijson to stream parse the response directly into `EventBase` objects.
Diffstat (limited to 'synapse/http')
-rw-r--r-- | synapse/http/client.py | 7 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 160 |
2 files changed, 131 insertions, 36 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py index 5f40f16e24..1ca6624fd5 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -813,7 +813,12 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol): if self.deferred.called: return - self.stream.write(data) + try: + self.stream.write(data) + except Exception: + self.deferred.errback() + return + self.length += len(data) # The first time the maximum size is exceeded, error and cancel the # connection. dataReceived might be called again if data was received diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index bb837b7b19..f5503b394b 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import abc import cgi import codecs import logging @@ -19,13 +20,24 @@ import sys import typing import urllib.parse from io import BytesIO, StringIO -from typing import Callable, Dict, List, Optional, Tuple, Union +from typing import ( + Callable, + Dict, + Generic, + List, + Optional, + Tuple, + TypeVar, + Union, + overload, +) import attr import treq from canonicaljson import encode_canonical_json from prometheus_client import Counter from signedjson.sign import sign_json +from typing_extensions import Literal from twisted.internet import defer from twisted.internet.error import DNSLookupError @@ -48,6 +60,7 @@ from synapse.http.client import ( BlacklistingAgentWrapper, BlacklistingReactorWrapper, BodyExceededMaxSize, + ByteWriteable, encode_query_args, read_body_with_max_size, ) @@ -88,6 +101,27 @@ _next_id = 1 QueryArgs = Dict[str, Union[str, List[str]]] +T = TypeVar("T") + + +class ByteParser(ByteWriteable, Generic[T], abc.ABC): + """A `ByteWriteable` that has an additional `finish` function that returns + the parsed data. + """ + + CONTENT_TYPE = abc.abstractproperty() # type: str # type: ignore + """The expected content type of the response, e.g. `application/json`. If + the content type doesn't match we fail the request. + """ + + @abc.abstractmethod + def finish(self) -> T: + """Called when response has finished streaming and the parser should + return the final result (or error). + """ + pass + + @attr.s(slots=True, frozen=True) class MatrixFederationRequest: method = attr.ib(type=str) @@ -148,15 +182,32 @@ class MatrixFederationRequest: return self.json -async def _handle_json_response( +class JsonParser(ByteParser[Union[JsonDict, list]]): + """A parser that buffers the response and tries to parse it as JSON.""" + + CONTENT_TYPE = "application/json" + + def __init__(self): + self._buffer = StringIO() + self._binary_wrapper = BinaryIOWrapper(self._buffer) + + def write(self, data: bytes) -> int: + return self._binary_wrapper.write(data) + + def finish(self) -> Union[JsonDict, list]: + return json_decoder.decode(self._buffer.getvalue()) + + +async def _handle_response( reactor: IReactorTime, timeout_sec: float, request: MatrixFederationRequest, response: IResponse, start_ms: int, -) -> JsonDict: + parser: ByteParser[T], +) -> T: """ - Reads the JSON body of a response, with a timeout + Reads the body of a response with a timeout and sends it to a parser Args: reactor: twisted reactor, for the timeout @@ -164,23 +215,21 @@ async def _handle_json_response( request: the request that triggered the response response: response to the request start_ms: Timestamp when request was made + parser: The parser for the response Returns: - The parsed JSON response + The parsed response """ + try: - check_content_type_is_json(response.headers) + check_content_type_is(response.headers, parser.CONTENT_TYPE) - buf = StringIO() - d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE) + d = read_body_with_max_size(response, parser, MAX_RESPONSE_SIZE) d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) - def parse(_len: int): - return json_decoder.decode(buf.getvalue()) - - d.addCallback(parse) + length = await make_deferred_yieldable(d) - body = await make_deferred_yieldable(d) + value = parser.finish() except BodyExceededMaxSize as e: # The response was too big. logger.warning( @@ -193,9 +242,9 @@ async def _handle_json_response( ) raise RequestSendFailed(e, can_retry=False) from e except ValueError as e: - # The JSON content was invalid. + # The content was invalid. logger.warning( - "{%s} [%s] Failed to parse JSON response - %s %s", + "{%s} [%s] Failed to parse response - %s %s", request.txn_id, request.destination, request.method, @@ -225,16 +274,17 @@ async def _handle_json_response( time_taken_secs = reactor.seconds() - start_ms / 1000 logger.info( - "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s", + "{%s} [%s] Completed request: %d %s in %.2f secs, got %d bytes - %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), time_taken_secs, + length, request.method, request.uri.decode("ascii"), ) - return body + return value class BinaryIOWrapper: @@ -671,6 +721,7 @@ class MatrixFederationHttpClient: ) return auth_headers + @overload async def put_json( self, destination: str, @@ -683,7 +734,41 @@ class MatrixFederationHttpClient: ignore_backoff: bool = False, backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, + parser: Literal[None] = None, ) -> Union[JsonDict, list]: + ... + + @overload + async def put_json( + self, + destination: str, + path: str, + args: Optional[QueryArgs] = None, + data: Optional[JsonDict] = None, + json_data_callback: Optional[Callable[[], JsonDict]] = None, + long_retries: bool = False, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + backoff_on_404: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Optional[ByteParser[T]] = None, + ) -> T: + ... + + async def put_json( + self, + destination: str, + path: str, + args: Optional[QueryArgs] = None, + data: Optional[JsonDict] = None, + json_data_callback: Optional[Callable[[], JsonDict]] = None, + long_retries: bool = False, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + backoff_on_404: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Optional[ByteParser] = None, + ): """Sends the specified json data using PUT Args: @@ -716,6 +801,8 @@ class MatrixFederationHttpClient: of the request. Workaround for #3622 in Synapse <= v0.99.3. This will be attempted before backing off if backing off has been enabled. + parser: The parser to use to decode the response. Defaults to + parsing as JSON. Returns: Succeeds when we get a 2xx HTTP response. The @@ -756,8 +843,16 @@ class MatrixFederationHttpClient: else: _sec_timeout = self.default_timeout - body = await _handle_json_response( - self.reactor, _sec_timeout, request, response, start_ms + if parser is None: + parser = JsonParser() + + body = await _handle_response( + self.reactor, + _sec_timeout, + request, + response, + start_ms, + parser=parser, ) return body @@ -830,12 +925,8 @@ class MatrixFederationHttpClient: else: _sec_timeout = self.default_timeout - body = await _handle_json_response( - self.reactor, - _sec_timeout, - request, - response, - start_ms, + body = await _handle_response( + self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser() ) return body @@ -907,8 +998,8 @@ class MatrixFederationHttpClient: else: _sec_timeout = self.default_timeout - body = await _handle_json_response( - self.reactor, _sec_timeout, request, response, start_ms + body = await _handle_response( + self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser() ) return body @@ -975,8 +1066,8 @@ class MatrixFederationHttpClient: else: _sec_timeout = self.default_timeout - body = await _handle_json_response( - self.reactor, _sec_timeout, request, response, start_ms + body = await _handle_response( + self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser() ) return body @@ -1068,16 +1159,16 @@ def _flatten_response_never_received(e): return repr(e) -def check_content_type_is_json(headers: Headers) -> None: +def check_content_type_is(headers: Headers, expected_content_type: str) -> None: """ Check that a set of HTTP headers have a Content-Type header, and that it - is application/json. + is the expected value.. Args: headers: headers to check Raises: - RequestSendFailed: if the Content-Type header is missing or isn't JSON + RequestSendFailed: if the Content-Type header is missing or doesn't match """ content_type_headers = headers.getRawHeaders(b"Content-Type") @@ -1089,11 +1180,10 @@ def check_content_type_is_json(headers: Headers) -> None: c_type = content_type_headers[0].decode("ascii") # only the first header val, options = cgi.parse_header(c_type) - if val != "application/json": + if val != expected_content_type: raise RequestSendFailed( RuntimeError( - "Remote server sent Content-Type header of '%s', not 'application/json'" - % c_type, + f"Remote server sent Content-Type header of '{c_type}', not '{expected_content_type}'", ), can_retry=False, ) |