summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py229
1 files changed, 195 insertions, 34 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 9094dab0fe..08c7fc1631 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime
 from twisted.internet.task import Cooperator
 from twisted.web.client import ResponseFailed
 from twisted.web.http_headers import Headers
-from twisted.web.iweb import IBodyProducer, IResponse
+from twisted.web.iweb import IAgent, IBodyProducer, IResponse
 
 import synapse.metrics
 import synapse.util.retryutils
@@ -71,7 +71,9 @@ from synapse.http.client import (
     encode_query_args,
     read_body_with_max_size,
 )
+from synapse.http.connectproxyclient import BearerProxyCredentials
 from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.http.proxyagent import ProxyAgent
 from synapse.http.types import QueryParams
 from synapse.logging import opentracing
 from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -95,8 +97,6 @@ incoming_responses_counter = Counter(
 )
 
 
-MAX_LONG_RETRIES = 10
-MAX_SHORT_RETRIES = 3
 MAXINT = sys.maxsize
 
 
@@ -174,7 +174,14 @@ class MatrixFederationRequest:
 
         # The object is frozen so we can pre-compute this.
         uri = urllib.parse.urlunparse(
-            (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
+            (
+                b"matrix-federation",
+                destination_bytes,
+                path_bytes,
+                None,
+                query_bytes,
+                b"",
+            )
         )
         object.__setattr__(self, "uri", uri)
 
@@ -236,7 +243,7 @@ class LegacyJsonSendParser(_BaseJsonParser[Tuple[int, JsonDict]]):
         return (
             isinstance(v, list)
             and len(v) == 2
-            and type(v[0]) == int
+            and type(v[0]) == int  # noqa: E721
             and isinstance(v[1], dict)
         )
 
@@ -388,17 +395,41 @@ class MatrixFederationHttpClient:
         if hs.config.server.user_agent_suffix:
             user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
 
-        federation_agent = MatrixFederationAgent(
-            self.reactor,
-            tls_client_options_factory,
-            user_agent.encode("ascii"),
-            hs.config.server.federation_ip_range_allowlist,
-            hs.config.server.federation_ip_range_blocklist,
+        outbound_federation_restricted_to = (
+            hs.config.worker.outbound_federation_restricted_to
         )
+        if hs.get_instance_name() in outbound_federation_restricted_to:
+            # Talk to federation directly
+            federation_agent: IAgent = MatrixFederationAgent(
+                self.reactor,
+                tls_client_options_factory,
+                user_agent.encode("ascii"),
+                hs.config.server.federation_ip_range_allowlist,
+                hs.config.server.federation_ip_range_blocklist,
+            )
+        else:
+            proxy_authorization_secret = hs.config.worker.worker_replication_secret
+            assert (
+                proxy_authorization_secret is not None
+            ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
+            federation_proxy_credentials = BearerProxyCredentials(
+                proxy_authorization_secret.encode("ascii")
+            )
+
+            # We need to talk to federation via the proxy via one of the configured
+            # locations
+            federation_proxy_locations = outbound_federation_restricted_to.locations
+            federation_agent = ProxyAgent(
+                self.reactor,
+                self.reactor,
+                tls_client_options_factory,
+                federation_proxy_locations=federation_proxy_locations,
+                federation_proxy_credentials=federation_proxy_credentials,
+            )
 
         # Use a BlocklistingAgentWrapper to prevent circumventing the IP
         # blocking via IP literals in server names
-        self.agent = BlocklistingAgentWrapper(
+        self.agent: IAgent = BlocklistingAgentWrapper(
             federation_agent,
             ip_blocklist=hs.config.server.federation_ip_range_blocklist,
         )
@@ -406,7 +437,15 @@ class MatrixFederationHttpClient:
         self.clock = hs.get_clock()
         self._store = hs.get_datastores().main
         self.version_string_bytes = hs.version_string.encode("ascii")
-        self.default_timeout = 60
+        self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
+        self.max_long_retry_delay_seconds = (
+            hs.config.federation.max_long_retry_delay_ms / 1000
+        )
+        self.max_short_retry_delay_seconds = (
+            hs.config.federation.max_short_retry_delay_ms / 1000
+        )
+        self.max_long_retries = hs.config.federation.max_long_retries
+        self.max_short_retries = hs.config.federation.max_short_retries
 
         self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))
 
@@ -473,6 +512,7 @@ class MatrixFederationHttpClient:
         long_retries: bool = False,
         ignore_backoff: bool = False,
         backoff_on_404: bool = False,
+        backoff_on_all_error_codes: bool = False,
     ) -> IResponse:
         """
         Sends a request to the given server.
@@ -499,13 +539,21 @@ class MatrixFederationHttpClient:
                 Note that the above intervals are *in addition* to the time spent
                 waiting for the request to complete (up to `timeout` ms).
 
-                NB: the long retry algorithm takes over 20 minutes to complete, with
-                a default timeout of 60s!
+                NB: the long retry algorithm takes over 20 minutes to complete, with a
+                default timeout of 60s! It's best not to use the `long_retries` option
+                for something that is blocking a client so we don't make them wait for
+                aaaaages, whereas some things like sending transactions (server to
+                server) we can be a lot more lenient but its very fuzzy / hand-wavey.
+
+                In the future, we could be more intelligent about doing this sort of
+                thing by looking at things with the bigger picture in mind,
+                https://github.com/matrix-org/synapse/issues/8917
 
             ignore_backoff: true to ignore the historical backoff data
                 and try the request anyway.
 
             backoff_on_404: Back off if we get a 404
+            backoff_on_all_error_codes: Back off if we get any error response
 
         Returns:
             Resolves with the HTTP response object on success.
@@ -528,10 +576,10 @@ class MatrixFederationHttpClient:
             logger.exception(f"Invalid destination: {request.destination}.")
             raise FederationDeniedError(request.destination)
 
-        if timeout:
+        if timeout is not None:
             _sec_timeout = timeout / 1000
         else:
-            _sec_timeout = self.default_timeout
+            _sec_timeout = self.default_timeout_seconds
 
         if (
             self.hs.config.federation.federation_domain_whitelist is not None
@@ -548,6 +596,7 @@ class MatrixFederationHttpClient:
             ignore_backoff=ignore_backoff,
             notifier=self.hs.get_notifier(),
             replication_client=self.hs.get_replication_command_handler(),
+            backoff_on_all_error_codes=backoff_on_all_error_codes,
         )
 
         method_bytes = request.method.encode("ascii")
@@ -576,9 +625,9 @@ class MatrixFederationHttpClient:
             # XXX: Would be much nicer to retry only at the transaction-layer
             # (once we have reliable transactions in place)
             if long_retries:
-                retries_left = MAX_LONG_RETRIES
+                retries_left = self.max_long_retries
             else:
-                retries_left = MAX_SHORT_RETRIES
+                retries_left = self.max_short_retries
 
             url_bytes = request.uri
             url_str = url_bytes.decode("ascii")
@@ -723,24 +772,34 @@ class MatrixFederationHttpClient:
 
                     if retries_left and not timeout:
                         if long_retries:
-                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
-                            delay = min(delay, 60)
-                            delay *= random.uniform(0.8, 1.4)
+                            delay_seconds = 4 ** (
+                                self.max_long_retries + 1 - retries_left
+                            )
+                            delay_seconds = min(
+                                delay_seconds, self.max_long_retry_delay_seconds
+                            )
+                            delay_seconds *= random.uniform(0.8, 1.4)
                         else:
-                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
-                            delay = min(delay, 2)
-                            delay *= random.uniform(0.8, 1.4)
+                            delay_seconds = 0.5 * 2 ** (
+                                self.max_short_retries - retries_left
+                            )
+                            delay_seconds = min(
+                                delay_seconds, self.max_short_retry_delay_seconds
+                            )
+                            delay_seconds *= random.uniform(0.8, 1.4)
 
                         logger.debug(
                             "{%s} [%s] Waiting %ss before re-sending...",
                             request.txn_id,
                             request.destination,
-                            delay,
+                            delay_seconds,
                         )
 
                         # Sleep for the calculated delay, or wake up immediately
                         # if we get notified that the server is back up.
-                        await self._sleeper.sleep(request.destination, delay * 1000)
+                        await self._sleeper.sleep(
+                            request.destination, delay_seconds * 1000
+                        )
                         retries_left -= 1
                     else:
                         raise
@@ -833,6 +892,7 @@ class MatrixFederationHttpClient:
         backoff_on_404: bool = False,
         try_trailing_slash_on_400: bool = False,
         parser: Literal[None] = None,
+        backoff_on_all_error_codes: bool = False,
     ) -> JsonDict:
         ...
 
@@ -850,6 +910,7 @@ class MatrixFederationHttpClient:
         backoff_on_404: bool = False,
         try_trailing_slash_on_400: bool = False,
         parser: Optional[ByteParser[T]] = None,
+        backoff_on_all_error_codes: bool = False,
     ) -> T:
         ...
 
@@ -866,6 +927,7 @@ class MatrixFederationHttpClient:
         backoff_on_404: bool = False,
         try_trailing_slash_on_400: bool = False,
         parser: Optional[ByteParser[T]] = None,
+        backoff_on_all_error_codes: bool = False,
     ) -> Union[JsonDict, T]:
         """Sends the specified json data using PUT
 
@@ -901,6 +963,7 @@ class MatrixFederationHttpClient:
                 enabled.
             parser: The parser to use to decode the response. Defaults to
                 parsing as JSON.
+            backoff_on_all_error_codes: Back off if we get any error response
 
         Returns:
             Succeeds when we get a 2xx HTTP response. The
@@ -934,12 +997,13 @@ class MatrixFederationHttpClient:
             ignore_backoff=ignore_backoff,
             long_retries=long_retries,
             timeout=timeout,
+            backoff_on_all_error_codes=backoff_on_all_error_codes,
         )
 
         if timeout is not None:
             _sec_timeout = timeout / 1000
         else:
-            _sec_timeout = self.default_timeout
+            _sec_timeout = self.default_timeout_seconds
 
         if parser is None:
             parser = cast(ByteParser[T], JsonParser())
@@ -1017,10 +1081,10 @@ class MatrixFederationHttpClient:
             ignore_backoff=ignore_backoff,
         )
 
-        if timeout:
+        if timeout is not None:
             _sec_timeout = timeout / 1000
         else:
-            _sec_timeout = self.default_timeout
+            _sec_timeout = self.default_timeout_seconds
 
         body = await _handle_response(
             self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
@@ -1110,6 +1174,101 @@ class MatrixFederationHttpClient:
             RequestSendFailed: If there were problems connecting to the
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
+        json_dict, _ = await self.get_json_with_headers(
+            destination=destination,
+            path=path,
+            args=args,
+            retry_on_dns_fail=retry_on_dns_fail,
+            timeout=timeout,
+            ignore_backoff=ignore_backoff,
+            try_trailing_slash_on_400=try_trailing_slash_on_400,
+            parser=parser,
+        )
+        return json_dict
+
+    @overload
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = None,
+        retry_on_dns_fail: bool = True,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        try_trailing_slash_on_400: bool = False,
+        parser: Literal[None] = None,
+    ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
+        ...
+
+    @overload
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = ...,
+        retry_on_dns_fail: bool = ...,
+        timeout: Optional[int] = ...,
+        ignore_backoff: bool = ...,
+        try_trailing_slash_on_400: bool = ...,
+        parser: ByteParser[T] = ...,
+    ) -> Tuple[T, Dict[bytes, List[bytes]]]:
+        ...
+
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = None,
+        retry_on_dns_fail: bool = True,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        try_trailing_slash_on_400: bool = False,
+        parser: Optional[ByteParser[T]] = None,
+    ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
+        """GETs some json from the given host homeserver and path
+
+        Args:
+            destination: The remote server to send the HTTP request to.
+
+            path: The HTTP path.
+
+            args: A dictionary used to create query strings, defaults to
+                None.
+
+            retry_on_dns_fail: true if the request should be retried on DNS failures
+
+            timeout: number of milliseconds to wait for the response.
+                self._default_timeout (60s) by default.
+
+                Note that we may make several attempts to send the request; this
+                timeout applies to the time spent waiting for response headers for
+                *each* attempt (including connection time) as well as the time spent
+                reading the response body after a 200 response.
+
+            ignore_backoff: true to ignore the historical backoff data
+                and try the request anyway.
+
+            try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
+                response we should try appending a trailing slash to the end of
+                the request. Workaround for #3622 in Synapse <= v0.99.3.
+
+            parser: The parser to use to decode the response. Defaults to
+                parsing as JSON.
+
+        Returns:
+            Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
+            decoded JSON body and a dict of the response headers.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
+        """
         request = MatrixFederationRequest(
             method="GET", destination=destination, path=path, query=args
         )
@@ -1125,10 +1284,12 @@ class MatrixFederationHttpClient:
             timeout=timeout,
         )
 
+        headers = dict(response.headers.getAllRawHeaders())
+
         if timeout is not None:
             _sec_timeout = timeout / 1000
         else:
-            _sec_timeout = self.default_timeout
+            _sec_timeout = self.default_timeout_seconds
 
         if parser is None:
             parser = cast(ByteParser[T], JsonParser())
@@ -1142,7 +1303,7 @@ class MatrixFederationHttpClient:
             parser=parser,
         )
 
-        return body
+        return body, headers
 
     async def delete_json(
         self,
@@ -1204,7 +1365,7 @@ class MatrixFederationHttpClient:
         if timeout is not None:
             _sec_timeout = timeout / 1000
         else:
-            _sec_timeout = self.default_timeout
+            _sec_timeout = self.default_timeout_seconds
 
         body = await _handle_response(
             self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
@@ -1256,7 +1417,7 @@ class MatrixFederationHttpClient:
 
         try:
             d = read_body_with_max_size(response, output_stream, max_size)
-            d.addTimeout(self.default_timeout, self.reactor)
+            d.addTimeout(self.default_timeout_seconds, self.reactor)
             length = await make_deferred_yieldable(d)
         except BodyExceededMaxSize:
             msg = "Requested file is too large > %r bytes" % (max_size,)