diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 749b01dd0e..6fd75fd381 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -90,7 +90,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.logging.opentracing import set_tag, start_active_span, tags
from synapse.types import JsonDict
from synapse.util import json_decoder
-from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
+from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
from synapse.util.metrics import Measure
from synapse.util.stringutils import parse_and_validate_server_name
@@ -475,6 +475,8 @@ class MatrixFederationHttpClient:
use_proxy=True,
)
+ self.remote_download_linearizer = Linearizer("remote_download_linearizer", 6)
+
def wake_destination(self, destination: str) -> None:
"""Called when the remote server may have come back online."""
@@ -1486,35 +1488,44 @@ class MatrixFederationHttpClient:
)
headers = dict(response.headers.getAllRawHeaders())
-
expected_size = response.length
- # if we don't get an expected length then use the max length
+
if expected_size == UNKNOWN_LENGTH:
expected_size = max_size
- logger.debug(
- f"File size unknown, assuming file is max allowable size: {max_size}"
- )
+ else:
+ if int(expected_size) > max_size:
+ msg = "Requested file is too large > %r bytes" % (max_size,)
+ logger.warning(
+ "{%s} [%s] %s",
+ request.txn_id,
+ request.destination,
+ msg,
+ )
+ raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
- read_body, _ = await download_ratelimiter.can_do_action(
- requester=None,
- key=ip_address,
- n_actions=expected_size,
- )
- if not read_body:
- msg = "Requested file size exceeds ratelimits"
- logger.warning(
- "{%s} [%s] %s",
- request.txn_id,
- request.destination,
- msg,
+ read_body, _ = await download_ratelimiter.can_do_action(
+ requester=None,
+ key=ip_address,
+ n_actions=expected_size,
)
- raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+ if not read_body:
+ msg = "Requested file size exceeds ratelimits"
+ logger.warning(
+ "{%s} [%s] %s",
+ request.txn_id,
+ request.destination,
+ msg,
+ )
+ raise SynapseError(
+ HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+ )
try:
- # add a byte of headroom to max size as function errs at >=
- d = read_body_with_max_size(response, output_stream, expected_size + 1)
- d.addTimeout(self.default_timeout_seconds, self.reactor)
- length = await make_deferred_yieldable(d)
+ async with self.remote_download_linearizer.queue(ip_address):
+ # add a byte of headroom to max size as function errs at >=
+ d = read_body_with_max_size(response, output_stream, expected_size + 1)
+ d.addTimeout(self.default_timeout_seconds, self.reactor)
+ length = await make_deferred_yieldable(d)
except BodyExceededMaxSize:
msg = "Requested file is too large > %r bytes" % (expected_size,)
logger.warning(
@@ -1560,6 +1571,13 @@ class MatrixFederationHttpClient:
request.method,
request.uri.decode("ascii"),
)
+
+ # if we didn't know the length upfront, decrement the actual size from ratelimiter
+ if response.length == UNKNOWN_LENGTH:
+ download_ratelimiter.record_action(
+ requester=None, key=ip_address, n_actions=length
+ )
+
return length, headers
async def federation_get_file(
@@ -1630,29 +1648,37 @@ class MatrixFederationHttpClient:
)
headers = dict(response.headers.getAllRawHeaders())
-
expected_size = response.length
- # if we don't get an expected length then use the max length
+
if expected_size == UNKNOWN_LENGTH:
expected_size = max_size
- logger.debug(
- f"File size unknown, assuming file is max allowable size: {max_size}"
- )
+ else:
+ if int(expected_size) > max_size:
+ msg = "Requested file is too large > %r bytes" % (max_size,)
+ logger.warning(
+ "{%s} [%s] %s",
+ request.txn_id,
+ request.destination,
+ msg,
+ )
+ raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
- read_body, _ = await download_ratelimiter.can_do_action(
- requester=None,
- key=ip_address,
- n_actions=expected_size,
- )
- if not read_body:
- msg = "Requested file size exceeds ratelimits"
- logger.warning(
- "{%s} [%s] %s",
- request.txn_id,
- request.destination,
- msg,
+ read_body, _ = await download_ratelimiter.can_do_action(
+ requester=None,
+ key=ip_address,
+ n_actions=expected_size,
)
- raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+ if not read_body:
+ msg = "Requested file size exceeds ratelimits"
+ logger.warning(
+ "{%s} [%s] %s",
+ request.txn_id,
+ request.destination,
+ msg,
+ )
+ raise SynapseError(
+ HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+ )
# this should be a multipart/mixed response with the boundary string in the header
try:
@@ -1672,11 +1698,12 @@ class MatrixFederationHttpClient:
raise SynapseError(HTTPStatus.BAD_GATEWAY, msg)
try:
- # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
- deferred = read_multipart_response(
- response, output_stream, boundary, expected_size + 1
- )
- deferred.addTimeout(self.default_timeout_seconds, self.reactor)
+ async with self.remote_download_linearizer.queue(ip_address):
+ # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
+ deferred = read_multipart_response(
+ response, output_stream, boundary, expected_size + 1
+ )
+ deferred.addTimeout(self.default_timeout_seconds, self.reactor)
except BodyExceededMaxSize:
msg = "Requested file is too large > %r bytes" % (expected_size,)
logger.warning(
@@ -1743,6 +1770,13 @@ class MatrixFederationHttpClient:
request.method,
request.uri.decode("ascii"),
)
+
+ # if we didn't know the length upfront, decrement the actual size from ratelimiter
+ if response.length == UNKNOWN_LENGTH:
+ download_ratelimiter.record_action(
+ requester=None, key=ip_address, n_actions=length
+ )
+
return length, headers, multipart_response.json
|