diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 9094dab0fe..08c7fc1631 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
-from twisted.web.iweb import IBodyProducer, IResponse
+from twisted.web.iweb import IAgent, IBodyProducer, IResponse
import synapse.metrics
import synapse.util.retryutils
@@ -71,7 +71,9 @@ from synapse.http.client import (
encode_query_args,
read_body_with_max_size,
)
+from synapse.http.connectproxyclient import BearerProxyCredentials
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -95,8 +97,6 @@ incoming_responses_counter = Counter(
)
-MAX_LONG_RETRIES = 10
-MAX_SHORT_RETRIES = 3
MAXINT = sys.maxsize
@@ -174,7 +174,14 @@ class MatrixFederationRequest:
# The object is frozen so we can pre-compute this.
uri = urllib.parse.urlunparse(
- (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
+ (
+ b"matrix-federation",
+ destination_bytes,
+ path_bytes,
+ None,
+ query_bytes,
+ b"",
+ )
)
object.__setattr__(self, "uri", uri)
@@ -236,7 +243,7 @@ class LegacyJsonSendParser(_BaseJsonParser[Tuple[int, JsonDict]]):
return (
isinstance(v, list)
and len(v) == 2
- and type(v[0]) == int
+ and type(v[0]) == int # noqa: E721
and isinstance(v[1], dict)
)
@@ -388,17 +395,41 @@ class MatrixFederationHttpClient:
if hs.config.server.user_agent_suffix:
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
- federation_agent = MatrixFederationAgent(
- self.reactor,
- tls_client_options_factory,
- user_agent.encode("ascii"),
- hs.config.server.federation_ip_range_allowlist,
- hs.config.server.federation_ip_range_blocklist,
+ outbound_federation_restricted_to = (
+ hs.config.worker.outbound_federation_restricted_to
)
+ if hs.get_instance_name() in outbound_federation_restricted_to:
+ # Talk to federation directly
+ federation_agent: IAgent = MatrixFederationAgent(
+ self.reactor,
+ tls_client_options_factory,
+ user_agent.encode("ascii"),
+ hs.config.server.federation_ip_range_allowlist,
+ hs.config.server.federation_ip_range_blocklist,
+ )
+ else:
+ proxy_authorization_secret = hs.config.worker.worker_replication_secret
+ assert (
+ proxy_authorization_secret is not None
+ ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
+ federation_proxy_credentials = BearerProxyCredentials(
+ proxy_authorization_secret.encode("ascii")
+ )
+
+ # We need to talk to federation via the proxy via one of the configured
+ # locations
+ federation_proxy_locations = outbound_federation_restricted_to.locations
+ federation_agent = ProxyAgent(
+ self.reactor,
+ self.reactor,
+ tls_client_options_factory,
+ federation_proxy_locations=federation_proxy_locations,
+ federation_proxy_credentials=federation_proxy_credentials,
+ )
# Use a BlocklistingAgentWrapper to prevent circumventing the IP
# blocking via IP literals in server names
- self.agent = BlocklistingAgentWrapper(
+ self.agent: IAgent = BlocklistingAgentWrapper(
federation_agent,
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
)
@@ -406,7 +437,15 @@ class MatrixFederationHttpClient:
self.clock = hs.get_clock()
self._store = hs.get_datastores().main
self.version_string_bytes = hs.version_string.encode("ascii")
- self.default_timeout = 60
+ self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
+ self.max_long_retry_delay_seconds = (
+ hs.config.federation.max_long_retry_delay_ms / 1000
+ )
+ self.max_short_retry_delay_seconds = (
+ hs.config.federation.max_short_retry_delay_ms / 1000
+ )
+ self.max_long_retries = hs.config.federation.max_long_retries
+ self.max_short_retries = hs.config.federation.max_short_retries
self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))
@@ -473,6 +512,7 @@ class MatrixFederationHttpClient:
long_retries: bool = False,
ignore_backoff: bool = False,
backoff_on_404: bool = False,
+ backoff_on_all_error_codes: bool = False,
) -> IResponse:
"""
Sends a request to the given server.
@@ -499,13 +539,21 @@ class MatrixFederationHttpClient:
Note that the above intervals are *in addition* to the time spent
waiting for the request to complete (up to `timeout` ms).
- NB: the long retry algorithm takes over 20 minutes to complete, with
- a default timeout of 60s!
+ NB: the long retry algorithm takes over 20 minutes to complete, with a
+ default timeout of 60s! It's best not to use the `long_retries` option
+ for something that is blocking a client so we don't make them wait for
+ aaaaages, whereas some things like sending transactions (server to
+ server) we can be a lot more lenient but its very fuzzy / hand-wavey.
+
+ In the future, we could be more intelligent about doing this sort of
+ thing by looking at things with the bigger picture in mind,
+ https://github.com/matrix-org/synapse/issues/8917
ignore_backoff: true to ignore the historical backoff data
and try the request anyway.
backoff_on_404: Back off if we get a 404
+ backoff_on_all_error_codes: Back off if we get any error response
Returns:
Resolves with the HTTP response object on success.
@@ -528,10 +576,10 @@ class MatrixFederationHttpClient:
logger.exception(f"Invalid destination: {request.destination}.")
raise FederationDeniedError(request.destination)
- if timeout:
+ if timeout is not None:
_sec_timeout = timeout / 1000
else:
- _sec_timeout = self.default_timeout
+ _sec_timeout = self.default_timeout_seconds
if (
self.hs.config.federation.federation_domain_whitelist is not None
@@ -548,6 +596,7 @@ class MatrixFederationHttpClient:
ignore_backoff=ignore_backoff,
notifier=self.hs.get_notifier(),
replication_client=self.hs.get_replication_command_handler(),
+ backoff_on_all_error_codes=backoff_on_all_error_codes,
)
method_bytes = request.method.encode("ascii")
@@ -576,9 +625,9 @@ class MatrixFederationHttpClient:
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
- retries_left = MAX_LONG_RETRIES
+ retries_left = self.max_long_retries
else:
- retries_left = MAX_SHORT_RETRIES
+ retries_left = self.max_short_retries
url_bytes = request.uri
url_str = url_bytes.decode("ascii")
@@ -723,24 +772,34 @@ class MatrixFederationHttpClient:
if retries_left and not timeout:
if long_retries:
- delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
- delay = min(delay, 60)
- delay *= random.uniform(0.8, 1.4)
+ delay_seconds = 4 ** (
+ self.max_long_retries + 1 - retries_left
+ )
+ delay_seconds = min(
+ delay_seconds, self.max_long_retry_delay_seconds
+ )
+ delay_seconds *= random.uniform(0.8, 1.4)
else:
- delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
- delay = min(delay, 2)
- delay *= random.uniform(0.8, 1.4)
+ delay_seconds = 0.5 * 2 ** (
+ self.max_short_retries - retries_left
+ )
+ delay_seconds = min(
+ delay_seconds, self.max_short_retry_delay_seconds
+ )
+ delay_seconds *= random.uniform(0.8, 1.4)
logger.debug(
"{%s} [%s] Waiting %ss before re-sending...",
request.txn_id,
request.destination,
- delay,
+ delay_seconds,
)
# Sleep for the calculated delay, or wake up immediately
# if we get notified that the server is back up.
- await self._sleeper.sleep(request.destination, delay * 1000)
+ await self._sleeper.sleep(
+ request.destination, delay_seconds * 1000
+ )
retries_left -= 1
else:
raise
@@ -833,6 +892,7 @@ class MatrixFederationHttpClient:
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Literal[None] = None,
+ backoff_on_all_error_codes: bool = False,
) -> JsonDict:
...
@@ -850,6 +910,7 @@ class MatrixFederationHttpClient:
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser[T]] = None,
+ backoff_on_all_error_codes: bool = False,
) -> T:
...
@@ -866,6 +927,7 @@ class MatrixFederationHttpClient:
backoff_on_404: bool = False,
try_trailing_slash_on_400: bool = False,
parser: Optional[ByteParser[T]] = None,
+ backoff_on_all_error_codes: bool = False,
) -> Union[JsonDict, T]:
"""Sends the specified json data using PUT
@@ -901,6 +963,7 @@ class MatrixFederationHttpClient:
enabled.
parser: The parser to use to decode the response. Defaults to
parsing as JSON.
+ backoff_on_all_error_codes: Back off if we get any error response
Returns:
Succeeds when we get a 2xx HTTP response. The
@@ -934,12 +997,13 @@ class MatrixFederationHttpClient:
ignore_backoff=ignore_backoff,
long_retries=long_retries,
timeout=timeout,
+ backoff_on_all_error_codes=backoff_on_all_error_codes,
)
if timeout is not None:
_sec_timeout = timeout / 1000
else:
- _sec_timeout = self.default_timeout
+ _sec_timeout = self.default_timeout_seconds
if parser is None:
parser = cast(ByteParser[T], JsonParser())
@@ -1017,10 +1081,10 @@ class MatrixFederationHttpClient:
ignore_backoff=ignore_backoff,
)
- if timeout:
+ if timeout is not None:
_sec_timeout = timeout / 1000
else:
- _sec_timeout = self.default_timeout
+ _sec_timeout = self.default_timeout_seconds
body = await _handle_response(
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
@@ -1110,6 +1174,101 @@ class MatrixFederationHttpClient:
RequestSendFailed: If there were problems connecting to the
remote, due to e.g. DNS failures, connection timeouts etc.
"""
+ json_dict, _ = await self.get_json_with_headers(
+ destination=destination,
+ path=path,
+ args=args,
+ retry_on_dns_fail=retry_on_dns_fail,
+ timeout=timeout,
+ ignore_backoff=ignore_backoff,
+ try_trailing_slash_on_400=try_trailing_slash_on_400,
+ parser=parser,
+ )
+ return json_dict
+
+ @overload
+ async def get_json_with_headers(
+ self,
+ destination: str,
+ path: str,
+ args: Optional[QueryParams] = None,
+ retry_on_dns_fail: bool = True,
+ timeout: Optional[int] = None,
+ ignore_backoff: bool = False,
+ try_trailing_slash_on_400: bool = False,
+ parser: Literal[None] = None,
+ ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
+ ...
+
+ @overload
+ async def get_json_with_headers(
+ self,
+ destination: str,
+ path: str,
+ args: Optional[QueryParams] = ...,
+ retry_on_dns_fail: bool = ...,
+ timeout: Optional[int] = ...,
+ ignore_backoff: bool = ...,
+ try_trailing_slash_on_400: bool = ...,
+ parser: ByteParser[T] = ...,
+ ) -> Tuple[T, Dict[bytes, List[bytes]]]:
+ ...
+
+ async def get_json_with_headers(
+ self,
+ destination: str,
+ path: str,
+ args: Optional[QueryParams] = None,
+ retry_on_dns_fail: bool = True,
+ timeout: Optional[int] = None,
+ ignore_backoff: bool = False,
+ try_trailing_slash_on_400: bool = False,
+ parser: Optional[ByteParser[T]] = None,
+ ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
+ """GETs some json from the given host homeserver and path
+
+ Args:
+ destination: The remote server to send the HTTP request to.
+
+ path: The HTTP path.
+
+ args: A dictionary used to create query strings, defaults to
+ None.
+
+ retry_on_dns_fail: true if the request should be retried on DNS failures
+
+ timeout: number of milliseconds to wait for the response.
+ self._default_timeout (60s) by default.
+
+ Note that we may make several attempts to send the request; this
+ timeout applies to the time spent waiting for response headers for
+ *each* attempt (including connection time) as well as the time spent
+ reading the response body after a 200 response.
+
+ ignore_backoff: true to ignore the historical backoff data
+ and try the request anyway.
+
+ try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
+ response we should try appending a trailing slash to the end of
+ the request. Workaround for #3622 in Synapse <= v0.99.3.
+
+ parser: The parser to use to decode the response. Defaults to
+ parsing as JSON.
+
+ Returns:
+ Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
+ decoded JSON body and a dict of the response headers.
+
+ Raises:
+ HttpResponseException: If we get an HTTP response code >= 300
+ (except 429).
+ NotRetryingDestination: If we are not yet ready to retry this
+ server.
+ FederationDeniedError: If this destination is not on our
+ federation whitelist
+ RequestSendFailed: If there were problems connecting to the
+ remote, due to e.g. DNS failures, connection timeouts etc.
+ """
request = MatrixFederationRequest(
method="GET", destination=destination, path=path, query=args
)
@@ -1125,10 +1284,12 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
+ headers = dict(response.headers.getAllRawHeaders())
+
if timeout is not None:
_sec_timeout = timeout / 1000
else:
- _sec_timeout = self.default_timeout
+ _sec_timeout = self.default_timeout_seconds
if parser is None:
parser = cast(ByteParser[T], JsonParser())
@@ -1142,7 +1303,7 @@ class MatrixFederationHttpClient:
parser=parser,
)
- return body
+ return body, headers
async def delete_json(
self,
@@ -1204,7 +1365,7 @@ class MatrixFederationHttpClient:
if timeout is not None:
_sec_timeout = timeout / 1000
else:
- _sec_timeout = self.default_timeout
+ _sec_timeout = self.default_timeout_seconds
body = await _handle_response(
self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
@@ -1256,7 +1417,7 @@ class MatrixFederationHttpClient:
try:
d = read_body_with_max_size(response, output_stream, max_size)
- d.addTimeout(self.default_timeout, self.reactor)
+ d.addTimeout(self.default_timeout_seconds, self.reactor)
length = await make_deferred_yieldable(d)
except BodyExceededMaxSize:
msg = "Requested file is too large > %r bytes" % (max_size,)
|