diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 104b803b0f..749b01dd0e 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -75,9 +75,11 @@ from synapse.http.client import (
BlocklistingAgentWrapper,
BodyExceededMaxSize,
ByteWriteable,
+ SimpleHttpClient,
_make_scheduler,
encode_query_args,
read_body_with_max_size,
+ read_multipart_response,
)
from synapse.http.connectproxyclient import BearerProxyCredentials
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
@@ -466,6 +468,13 @@ class MatrixFederationHttpClient:
self._sleeper = AwakenableSleeper(self.reactor)
+ self._simple_http_client = SimpleHttpClient(
+ hs,
+ ip_blocklist=hs.config.server.federation_ip_range_blocklist,
+ ip_allowlist=hs.config.server.federation_ip_range_allowlist,
+ use_proxy=True,
+ )
+
def wake_destination(self, destination: str) -> None:
"""Called when the remote server may have come back online."""
@@ -1553,6 +1562,189 @@ class MatrixFederationHttpClient:
)
return length, headers
+    async def federation_get_file(
+        self,
+        destination: str,
+        path: str,
+        output_stream: BinaryIO,
+        download_ratelimiter: Ratelimiter,
+        ip_address: str,
+        max_size: int,
+        args: Optional[QueryParams] = None,
+        retry_on_dns_fail: bool = True,
+        ignore_backoff: bool = False,
+    ) -> Tuple[int, Dict[bytes, List[bytes]], bytes]:
+        """GETs a file from a given homeserver over the federation /download endpoint
+
+        The remote is expected to answer with a multipart/mixed response. The
+        parsed multipart result either carries the file itself (which is written
+        to `output_stream`) or a redirect URL, in which case the file is fetched
+        from that URL with the simple HTTP client.
+
+        Args:
+            destination: The remote server to send the HTTP request to.
+            path: The HTTP path to GET.
+            output_stream: File to write the response body to.
+            download_ratelimiter: a ratelimiter to limit remote media downloads, keyed to
+                requester IP
+            ip_address: IP address of the requester
+            max_size: maximum allowable size in bytes of the file
+            args: Optional dictionary used to create the query string.
+            retry_on_dns_fail: passed through unchanged to `_send_request`.
+            ignore_backoff: true to ignore the historical backoff data
+                and try the request anyway.
+
+        Returns:
+            Resolves to an (int, dict, bytes) tuple of
+            the file length, a dict of the response headers, and the file json
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
+            SynapseError: If the requested file exceeds ratelimits or the response from the
+                remote server is not a multipart response
+            AssertionError: if the resolved multipart response's length is None
+        """
+        request = MatrixFederationRequest(
+            method="GET", destination=destination, path=path, query=args
+        )
+
+        def ratelimit_exceeded() -> SynapseError:
+            # Logs the rejection and builds the error raised by both ratelimit checks.
+            msg = "Requested file size exceeds ratelimits"
+            logger.warning(
+                "{%s} [%s] %s",
+                request.txn_id,
+                request.destination,
+                msg,
+            )
+            return SynapseError(
+                HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+            )
+
+        # check for a minimum balance of 1MiB in ratelimiter before initiating request
+        # (update=False: this only peeks at the balance, nothing is charged yet)
+        send_req, _ = await download_ratelimiter.can_do_action(
+            requester=None, key=ip_address, n_actions=1048576, update=False
+        )
+        if not send_req:
+            raise ratelimit_exceeded()
+
+        response = await self._send_request(
+            request,
+            retry_on_dns_fail=retry_on_dns_fail,
+            ignore_backoff=ignore_backoff,
+        )
+
+        headers = dict(response.headers.getAllRawHeaders())
+
+        expected_size = response.length
+        # if we don't get an expected length then use the max length
+        if expected_size == UNKNOWN_LENGTH:
+            expected_size = max_size
+            logger.debug(
+                "File size unknown, assuming file is max allowable size: %s",
+                max_size,
+            )
+
+        # charge the ratelimiter for the size we are about to read
+        read_body, _ = await download_ratelimiter.can_do_action(
+            requester=None,
+            key=ip_address,
+            n_actions=expected_size,
+        )
+        if not read_body:
+            raise ratelimit_exceeded()
+
+        # this should be a multipart/mixed response with the boundary string in the header
+        boundary = ""
+        raw_content_type = headers.get(b"Content-Type")
+        if raw_content_type:
+            try:
+                content_type = raw_content_type[0].decode("UTF-8")
+            except UnicodeDecodeError:
+                content_type = ""
+            _, found, remainder = content_type.partition("boundary=")
+            if found:
+                # Drop any parameters following the boundary and the optional
+                # surrounding quotes; neither `;` nor `"` is a legal boundary
+                # character, so this cannot truncate a valid boundary.
+                boundary = remainder.split(";", 1)[0].strip().strip('"')
+
+        if not boundary:
+            msg = "Remote response is malformed: expected Content-Type of multipart/mixed with a boundary present."
+            logger.warning(
+                "{%s} [%s] %s",
+                request.txn_id,
+                request.destination,
+                msg,
+            )
+            raise SynapseError(HTTPStatus.BAD_GATEWAY, msg)
+
+        try:
+            # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
+            deferred = read_multipart_response(
+                response, output_stream, boundary, expected_size + 1
+            )
+            deferred.addTimeout(self.default_timeout_seconds, self.reactor)
+            # The await must live inside the `try`: failures while reading the
+            # body (too large, timeout, connection loss) only surface once the
+            # deferred is awaited, not when it is created.
+            multipart_response = await make_deferred_yieldable(deferred)
+        except BodyExceededMaxSize:
+            msg = "Requested file is too large > %r bytes" % (expected_size,)
+            logger.warning(
+                "{%s} [%s] %s",
+                request.txn_id,
+                request.destination,
+                msg,
+            )
+            raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
+        except defer.TimeoutError as e:
+            logger.warning(
+                "{%s} [%s] Timed out reading response - %s %s",
+                request.txn_id,
+                request.destination,
+                request.method,
+                request.uri.decode("ascii"),
+            )
+            raise RequestSendFailed(e, can_retry=True) from e
+        except ResponseFailed as e:
+            logger.warning(
+                "{%s} [%s] Failed to read response - %s %s",
+                request.txn_id,
+                request.destination,
+                request.method,
+                request.uri.decode("ascii"),
+            )
+            raise RequestSendFailed(e, can_retry=True) from e
+        except Exception as e:
+            logger.warning(
+                "{%s} [%s] Error reading response: %s",
+                request.txn_id,
+                request.destination,
+                e,
+            )
+            raise
+
+        if not multipart_response.url:
+            # the file was included in the multipart response itself
+            assert multipart_response.length is not None
+            length = multipart_response.length
+            headers[b"Content-Type"] = [multipart_response.content_type]
+            headers[b"Content-Disposition"] = [multipart_response.disposition]
+
+        # the response contained a redirect url to download the file from
+        else:
+            str_url = multipart_response.url.decode("utf-8")
+            logger.info(
+                "{%s} [%s] File download redirected, now downloading from: %s",
+                request.txn_id,
+                request.destination,
+                str_url,
+            )
+            # NOTE(review): the redirected download is capped at `expected_size`,
+            # which was derived from the multipart response rather than from the
+            # redirected file - confirm this is the intended limit.
+            length, headers, _, _ = await self._simple_http_client.get_file(
+                str_url, output_stream, expected_size
+            )
+
+        logger.info(
+            "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
+            request.txn_id,
+            request.destination,
+            response.code,
+            response.phrase.decode("ascii", errors="replace"),
+            length,
+            request.method,
+            request.uri.decode("ascii"),
+        )
+        return length, headers, multipart_response.json
+
def _flatten_response_never_received(e: BaseException) -> str:
if hasattr(e, "reasons"):
|