diff options
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 229 |
1 files changed, 195 insertions, 34 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 9094dab0fe..08c7fc1631 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime from twisted.internet.task import Cooperator from twisted.web.client import ResponseFailed from twisted.web.http_headers import Headers -from twisted.web.iweb import IBodyProducer, IResponse +from twisted.web.iweb import IAgent, IBodyProducer, IResponse import synapse.metrics import synapse.util.retryutils @@ -71,7 +71,9 @@ from synapse.http.client import ( encode_query_args, read_body_with_max_size, ) +from synapse.http.connectproxyclient import BearerProxyCredentials from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent +from synapse.http.proxyagent import ProxyAgent from synapse.http.types import QueryParams from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -95,8 +97,6 @@ incoming_responses_counter = Counter( ) -MAX_LONG_RETRIES = 10 -MAX_SHORT_RETRIES = 3 MAXINT = sys.maxsize @@ -174,7 +174,14 @@ class MatrixFederationRequest: # The object is frozen so we can pre-compute this. uri = urllib.parse.urlunparse( - (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") + ( + b"matrix-federation", + destination_bytes, + path_bytes, + None, + query_bytes, + b"", + ) ) object.__setattr__(self, "uri", uri) @@ -236,7 +243,7 @@ class LegacyJsonSendParser(_BaseJsonParser[Tuple[int, JsonDict]]): return ( isinstance(v, list) and len(v) == 2 - and type(v[0]) == int + and type(v[0]) == int # noqa: E721 and isinstance(v[1], dict) ) @@ -388,17 +395,41 @@ class MatrixFederationHttpClient: if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - federation_agent = MatrixFederationAgent( - self.reactor, - tls_client_options_factory, - user_agent.encode("ascii"), - hs.config.server.federation_ip_range_allowlist, - hs.config.server.federation_ip_range_blocklist, + outbound_federation_restricted_to = ( + hs.config.worker.outbound_federation_restricted_to ) + if hs.get_instance_name() in outbound_federation_restricted_to: + # Talk to federation directly + federation_agent: IAgent = MatrixFederationAgent( + self.reactor, + tls_client_options_factory, + user_agent.encode("ascii"), + hs.config.server.federation_ip_range_allowlist, + hs.config.server.federation_ip_range_blocklist, + ) + else: + proxy_authorization_secret = hs.config.worker.worker_replication_secret + assert ( + proxy_authorization_secret is not None + ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)" + federation_proxy_credentials = BearerProxyCredentials( + proxy_authorization_secret.encode("ascii") + ) + + # We need to talk to federation via the proxy via one of the configured + # locations + federation_proxy_locations = outbound_federation_restricted_to.locations + federation_agent = ProxyAgent( + self.reactor, + self.reactor, + tls_client_options_factory, + federation_proxy_locations=federation_proxy_locations, + federation_proxy_credentials=federation_proxy_credentials, + ) # Use a BlocklistingAgentWrapper to prevent circumventing the IP # blocking via IP literals in server names - self.agent = BlocklistingAgentWrapper( + self.agent: IAgent = BlocklistingAgentWrapper( federation_agent, ip_blocklist=hs.config.server.federation_ip_range_blocklist, ) @@ -406,7 +437,15 @@ class MatrixFederationHttpClient: self.clock = hs.get_clock() self._store = hs.get_datastores().main self.version_string_bytes = hs.version_string.encode("ascii") - self.default_timeout = 60 + self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000 + self.max_long_retry_delay_seconds = ( + hs.config.federation.max_long_retry_delay_ms / 1000 + ) + self.max_short_retry_delay_seconds = ( + hs.config.federation.max_short_retry_delay_ms / 1000 + ) + self.max_long_retries = hs.config.federation.max_long_retries + self.max_short_retries = hs.config.federation.max_short_retries self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor)) @@ -473,6 +512,7 @@ class MatrixFederationHttpClient: long_retries: bool = False, ignore_backoff: bool = False, backoff_on_404: bool = False, + backoff_on_all_error_codes: bool = False, ) -> IResponse: """ Sends a request to the given server. @@ -499,13 +539,21 @@ class MatrixFederationHttpClient: Note that the above intervals are *in addition* to the time spent waiting for the request to complete (up to `timeout` ms). - NB: the long retry algorithm takes over 20 minutes to complete, with - a default timeout of 60s! + NB: the long retry algorithm takes over 20 minutes to complete, with a + default timeout of 60s! It's best not to use the `long_retries` option + for something that is blocking a client so we don't make them wait for + aaaaages, whereas some things like sending transactions (server to + server) we can be a lot more lenient but its very fuzzy / hand-wavey. + + In the future, we could be more intelligent about doing this sort of + thing by looking at things with the bigger picture in mind, + https://github.com/matrix-org/synapse/issues/8917 ignore_backoff: true to ignore the historical backoff data and try the request anyway. backoff_on_404: Back off if we get a 404 + backoff_on_all_error_codes: Back off if we get any error response Returns: Resolves with the HTTP response object on success. @@ -528,10 +576,10 @@ class MatrixFederationHttpClient: logger.exception(f"Invalid destination: {request.destination}.") raise FederationDeniedError(request.destination) - if timeout: + if timeout is not None: _sec_timeout = timeout / 1000 else: - _sec_timeout = self.default_timeout + _sec_timeout = self.default_timeout_seconds if ( self.hs.config.federation.federation_domain_whitelist is not None @@ -548,6 +596,7 @@ class MatrixFederationHttpClient: ignore_backoff=ignore_backoff, notifier=self.hs.get_notifier(), replication_client=self.hs.get_replication_command_handler(), + backoff_on_all_error_codes=backoff_on_all_error_codes, ) method_bytes = request.method.encode("ascii") @@ -576,9 +625,9 @@ class MatrixFederationHttpClient: # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) if long_retries: - retries_left = MAX_LONG_RETRIES + retries_left = self.max_long_retries else: - retries_left = MAX_SHORT_RETRIES + retries_left = self.max_short_retries url_bytes = request.uri url_str = url_bytes.decode("ascii") @@ -723,24 +772,34 @@ class MatrixFederationHttpClient: if retries_left and not timeout: if long_retries: - delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) - delay = min(delay, 60) - delay *= random.uniform(0.8, 1.4) + delay_seconds = 4 ** ( + self.max_long_retries + 1 - retries_left + ) + delay_seconds = min( + delay_seconds, self.max_long_retry_delay_seconds + ) + delay_seconds *= random.uniform(0.8, 1.4) else: - delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) - delay = min(delay, 2) - delay *= random.uniform(0.8, 1.4) + delay_seconds = 0.5 * 2 ** ( + self.max_short_retries - retries_left + ) + delay_seconds = min( + delay_seconds, self.max_short_retry_delay_seconds + ) + delay_seconds *= random.uniform(0.8, 1.4) logger.debug( "{%s} [%s] Waiting %ss before re-sending...", request.txn_id, request.destination, - delay, + delay_seconds, ) # Sleep for the calculated delay, or wake up immediately # if we get notified that the server is back up. - await self._sleeper.sleep(request.destination, delay * 1000) + await self._sleeper.sleep( + request.destination, delay_seconds * 1000 + ) retries_left -= 1 else: raise @@ -833,6 +892,7 @@ class MatrixFederationHttpClient: backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, parser: Literal[None] = None, + backoff_on_all_error_codes: bool = False, ) -> JsonDict: ... @@ -850,6 +910,7 @@ class MatrixFederationHttpClient: backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, parser: Optional[ByteParser[T]] = None, + backoff_on_all_error_codes: bool = False, ) -> T: ... @@ -866,6 +927,7 @@ class MatrixFederationHttpClient: backoff_on_404: bool = False, try_trailing_slash_on_400: bool = False, parser: Optional[ByteParser[T]] = None, + backoff_on_all_error_codes: bool = False, ) -> Union[JsonDict, T]: """Sends the specified json data using PUT @@ -901,6 +963,7 @@ class MatrixFederationHttpClient: enabled. parser: The parser to use to decode the response. Defaults to parsing as JSON. + backoff_on_all_error_codes: Back off if we get any error response Returns: Succeeds when we get a 2xx HTTP response. The @@ -934,12 +997,13 @@ class MatrixFederationHttpClient: ignore_backoff=ignore_backoff, long_retries=long_retries, timeout=timeout, + backoff_on_all_error_codes=backoff_on_all_error_codes, ) if timeout is not None: _sec_timeout = timeout / 1000 else: - _sec_timeout = self.default_timeout + _sec_timeout = self.default_timeout_seconds if parser is None: parser = cast(ByteParser[T], JsonParser()) @@ -1017,10 +1081,10 @@ class MatrixFederationHttpClient: ignore_backoff=ignore_backoff, ) - if timeout: + if timeout is not None: _sec_timeout = timeout / 1000 else: - _sec_timeout = self.default_timeout + _sec_timeout = self.default_timeout_seconds body = await _handle_response( self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser() @@ -1110,6 +1174,101 @@ class MatrixFederationHttpClient: RequestSendFailed: If there were problems connecting to the remote, due to e.g. DNS failures, connection timeouts etc. """ + json_dict, _ = await self.get_json_with_headers( + destination=destination, + path=path, + args=args, + retry_on_dns_fail=retry_on_dns_fail, + timeout=timeout, + ignore_backoff=ignore_backoff, + try_trailing_slash_on_400=try_trailing_slash_on_400, + parser=parser, + ) + return json_dict + + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Literal[None] = None, + ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]: + ... + + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = ..., + retry_on_dns_fail: bool = ..., + timeout: Optional[int] = ..., + ignore_backoff: bool = ..., + try_trailing_slash_on_400: bool = ..., + parser: ByteParser[T] = ..., + ) -> Tuple[T, Dict[bytes, List[bytes]]]: + ... + + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Optional[ByteParser[T]] = None, + ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]: + """GETs some json from the given host homeserver and path + + Args: + destination: The remote server to send the HTTP request to. + + path: The HTTP path. + + args: A dictionary used to create query strings, defaults to + None. + + retry_on_dns_fail: true if the request should be retried on DNS failures + + timeout: number of milliseconds to wait for the response. + self._default_timeout (60s) by default. + + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + + ignore_backoff: true to ignore the historical backoff data + and try the request anyway. + + try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED + response we should try appending a trailing slash to the end of + the request. Workaround for #3622 in Synapse <= v0.99.3. + + parser: The parser to use to decode the response. Defaults to + parsing as JSON. + + Returns: + Succeeds when we get a 2xx HTTP response. The result will be a tuple of the + decoded JSON body and a dict of the response headers. + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. + """ request = MatrixFederationRequest( method="GET", destination=destination, path=path, query=args ) @@ -1125,10 +1284,12 @@ class MatrixFederationHttpClient: timeout=timeout, ) + headers = dict(response.headers.getAllRawHeaders()) + if timeout is not None: _sec_timeout = timeout / 1000 else: - _sec_timeout = self.default_timeout + _sec_timeout = self.default_timeout_seconds if parser is None: parser = cast(ByteParser[T], JsonParser()) @@ -1142,7 +1303,7 @@ class MatrixFederationHttpClient: parser=parser, ) - return body + return body, headers async def delete_json( self, @@ -1204,7 +1365,7 @@ class MatrixFederationHttpClient: if timeout is not None: _sec_timeout = timeout / 1000 else: - _sec_timeout = self.default_timeout + _sec_timeout = self.default_timeout_seconds body = await _handle_response( self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser() @@ -1256,7 +1417,7 @@ class MatrixFederationHttpClient: try: d = read_body_with_max_size(response, output_stream, max_size) - d.addTimeout(self.default_timeout, self.reactor) + d.addTimeout(self.default_timeout_seconds, self.reactor) length = await make_deferred_yieldable(d) except BodyExceededMaxSize: msg = "Requested file is too large > %r bytes" % (max_size,) |