diff options
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 142 |
1 files changed, 132 insertions, 10 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index cc4e258b0f..583c03447c 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime from twisted.internet.task import Cooperator from twisted.web.client import ResponseFailed from twisted.web.http_headers import Headers -from twisted.web.iweb import IBodyProducer, IResponse +from twisted.web.iweb import IAgent, IBodyProducer, IResponse import synapse.metrics import synapse.util.retryutils @@ -71,7 +71,9 @@ from synapse.http.client import ( encode_query_args, read_body_with_max_size, ) +from synapse.http.connectproxyclient import BearerProxyCredentials from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent +from synapse.http.proxyagent import ProxyAgent from synapse.http.types import QueryParams from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -393,17 +395,41 @@ class MatrixFederationHttpClient: if hs.config.server.user_agent_suffix: user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix) - federation_agent = MatrixFederationAgent( - self.reactor, - tls_client_options_factory, - user_agent.encode("ascii"), - hs.config.server.federation_ip_range_allowlist, - hs.config.server.federation_ip_range_blocklist, + outbound_federation_restricted_to = ( + hs.config.worker.outbound_federation_restricted_to ) + if hs.get_instance_name() in outbound_federation_restricted_to: + # Talk to federation directly + federation_agent: IAgent = MatrixFederationAgent( + self.reactor, + tls_client_options_factory, + user_agent.encode("ascii"), + hs.config.server.federation_ip_range_allowlist, + hs.config.server.federation_ip_range_blocklist, + ) + else: + proxy_authorization_secret = hs.config.worker.worker_replication_secret + assert ( + proxy_authorization_secret is not None + ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)" + federation_proxy_credentials = BearerProxyCredentials( + proxy_authorization_secret.encode("ascii") + ) + + # We need to talk to federation via the proxy via one of the configured + # locations + federation_proxy_locations = outbound_federation_restricted_to.locations + federation_agent = ProxyAgent( + self.reactor, + self.reactor, + tls_client_options_factory, + federation_proxy_locations=federation_proxy_locations, + federation_proxy_credentials=federation_proxy_credentials, + ) # Use a BlocklistingAgentWrapper to prevent circumventing the IP # blocking via IP literals in server names - self.agent = BlocklistingAgentWrapper( + self.agent: IAgent = BlocklistingAgentWrapper( federation_agent, ip_blocklist=hs.config.server.federation_ip_range_blocklist, ) @@ -412,7 +438,6 @@ class MatrixFederationHttpClient: self._store = hs.get_datastores().main self.version_string_bytes = hs.version_string.encode("ascii") self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000 - self.max_long_retry_delay_seconds = ( hs.config.federation.max_long_retry_delay_ms / 1000 ) @@ -1141,6 +1166,101 @@ class MatrixFederationHttpClient: RequestSendFailed: If there were problems connecting to the remote, due to e.g. DNS failures, connection timeouts etc. """ + json_dict, _ = await self.get_json_with_headers( + destination=destination, + path=path, + args=args, + retry_on_dns_fail=retry_on_dns_fail, + timeout=timeout, + ignore_backoff=ignore_backoff, + try_trailing_slash_on_400=try_trailing_slash_on_400, + parser=parser, + ) + return json_dict + + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Literal[None] = None, + ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]: + ... + + @overload + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = ..., + retry_on_dns_fail: bool = ..., + timeout: Optional[int] = ..., + ignore_backoff: bool = ..., + try_trailing_slash_on_400: bool = ..., + parser: ByteParser[T] = ..., + ) -> Tuple[T, Dict[bytes, List[bytes]]]: + ... + + async def get_json_with_headers( + self, + destination: str, + path: str, + args: Optional[QueryParams] = None, + retry_on_dns_fail: bool = True, + timeout: Optional[int] = None, + ignore_backoff: bool = False, + try_trailing_slash_on_400: bool = False, + parser: Optional[ByteParser[T]] = None, + ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]: + """GETs some json from the given host homeserver and path + + Args: + destination: The remote server to send the HTTP request to. + + path: The HTTP path. + + args: A dictionary used to create query strings, defaults to + None. + + retry_on_dns_fail: true if the request should be retried on DNS failures + + timeout: number of milliseconds to wait for the response. + self._default_timeout (60s) by default. + + Note that we may make several attempts to send the request; this + timeout applies to the time spent waiting for response headers for + *each* attempt (including connection time) as well as the time spent + reading the response body after a 200 response. + + ignore_backoff: true to ignore the historical backoff data + and try the request anyway. + + try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED + response we should try appending a trailing slash to the end of + the request. Workaround for #3622 in Synapse <= v0.99.3. + + parser: The parser to use to decode the response. Defaults to + parsing as JSON. + + Returns: + Succeeds when we get a 2xx HTTP response. The result will be a tuple of the + decoded JSON body and a dict of the response headers. + + Raises: + HttpResponseException: If we get an HTTP response code >= 300 + (except 429). + NotRetryingDestination: If we are not yet ready to retry this + server. + FederationDeniedError: If this destination is not on our + federation whitelist + RequestSendFailed: If there were problems connecting to the + remote, due to e.g. DNS failures, connection timeouts etc. + """ request = MatrixFederationRequest( method="GET", destination=destination, path=path, query=args ) @@ -1156,6 +1276,8 @@ class MatrixFederationHttpClient: timeout=timeout, ) + headers = dict(response.headers.getAllRawHeaders()) + if timeout is not None: _sec_timeout = timeout / 1000 else: @@ -1173,7 +1295,7 @@ class MatrixFederationHttpClient: parser=parser, ) - return body + return body, headers async def delete_json( self, |