summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http')
-rw-r--r--synapse/http/client.py15
-rw-r--r--synapse/http/matrixfederationclient.py43
2 files changed, 50 insertions, 8 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 1730187ffa..5f40f16e24 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -33,6 +33,7 @@ import treq
 from canonicaljson import encode_canonical_json
 from netaddr import AddrFormatError, IPAddress, IPSet
 from prometheus_client import Counter
+from typing_extensions import Protocol
 from zope.interface import implementer, provider
 
 from OpenSSL import SSL
@@ -754,6 +755,16 @@ def _timeout_to_request_timed_out_error(f: Failure):
     return f
 
 
+class ByteWriteable(Protocol):
+    """The type of object which must be passed into read_body_with_max_size.
+
+    Typically this is a file object.
+    """
+
+    def write(self, data: bytes) -> int:
+        pass
+
+
 class BodyExceededMaxSize(Exception):
     """The maximum allowed size of the HTTP body was exceeded."""
 
@@ -790,7 +801,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
     transport = None  # type: Optional[ITCPTransport]
 
     def __init__(
-        self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int]
+        self, stream: ByteWriteable, deferred: defer.Deferred, max_size: Optional[int]
     ):
         self.stream = stream
         self.deferred = deferred
@@ -830,7 +841,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
 
 
 def read_body_with_max_size(
-    response: IResponse, stream: BinaryIO, max_size: Optional[int]
+    response: IResponse, stream: ByteWriteable, max_size: Optional[int]
 ) -> defer.Deferred:
     """
     Read a HTTP response body to a file-object. Optionally enforcing a maximum file size.
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index d48721a4e2..bb837b7b19 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -1,5 +1,4 @@
-# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,11 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import cgi
+import codecs
 import logging
 import random
 import sys
+import typing
 import urllib.parse
-from io import BytesIO
+from io import BytesIO, StringIO
 from typing import Callable, Dict, List, Optional, Tuple, Union
 
 import attr
@@ -72,6 +73,9 @@ incoming_responses_counter = Counter(
     "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
 )
 
+# a federation response can be rather large (eg a big state_ids is 50M or so), so we
+# need a generous limit here.
+MAX_RESPONSE_SIZE = 100 * 1024 * 1024
 
 MAX_LONG_RETRIES = 10
 MAX_SHORT_RETRIES = 3
@@ -167,12 +171,27 @@ async def _handle_json_response(
     try:
         check_content_type_is_json(response.headers)
 
-        # Use the custom JSON decoder (partially re-implements treq.json_content).
-        d = treq.text_content(response, encoding="utf-8")
-        d.addCallback(json_decoder.decode)
+        buf = StringIO()
+        d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
         d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
 
+        def parse(_len: int):
+            return json_decoder.decode(buf.getvalue())
+
+        d.addCallback(parse)
+
         body = await make_deferred_yieldable(d)
+    except BodyExceededMaxSize as e:
+        # The response was too big.
+        logger.warning(
+            "{%s} [%s] JSON response exceeded max size %i - %s %s",
+            request.txn_id,
+            request.destination,
+            MAX_RESPONSE_SIZE,
+            request.method,
+            request.uri.decode("ascii"),
+        )
+        raise RequestSendFailed(e, can_retry=False) from e
     except ValueError as e:
         # The JSON content was invalid.
         logger.warning(
@@ -218,6 +237,18 @@ async def _handle_json_response(
     return body
 
 
+class BinaryIOWrapper:
+    """A wrapper for a TextIO which converts from bytes on the fly."""
+
+    def __init__(self, file: typing.TextIO, encoding="utf-8", errors="strict"):
+        self.decoder = codecs.getincrementaldecoder(encoding)(errors)
+        self.file = file
+
+    def write(self, b: Union[bytes, bytearray]) -> int:
+        self.file.write(self.decoder.decode(b))
+        return len(b)
+
+
 class MatrixFederationHttpClient:
     """HTTP client used to talk to other homeservers over the federation
     protocol. Send client certificates and signs requests.