summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2023-07-18 03:49:21 -0500
committerGitHub <noreply@github.com>2023-07-18 09:49:21 +0100
commit1c802de626de3293049206cb788af15cbc8ea17f (patch)
tree52d72c2fcd6442e96943a7a565c519240eca19eb /synapse/http/matrixfederationclient.py
parentBump anyhow from 1.0.71 to 1.0.72 (#15949) (diff)
downloadsynapse-1c802de626de3293049206cb788af15cbc8ea17f.tar.xz
Re-introduce the outbound federation proxy (#15913)
Allow configuring the set of workers to proxy outbound federation traffic through (`outbound_federation_restricted_to`).

This is useful when you have a worker setup with `federation_sender` instances responsible for sending outbound federation requests and want to make sure *all* outbound federation traffic goes through those instances. Before this change, the generic workers would still contact federation themselves for things like profile lookups, backfill, etc. This PR allows you to set more strict access controls/firewall for all workers and only allow the `federation_sender`'s to contact the outside world.
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py142
1 files changed, 132 insertions, 10 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index cc4e258b0f..583c03447c 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime
 from twisted.internet.task import Cooperator
 from twisted.web.client import ResponseFailed
 from twisted.web.http_headers import Headers
-from twisted.web.iweb import IBodyProducer, IResponse
+from twisted.web.iweb import IAgent, IBodyProducer, IResponse
 
 import synapse.metrics
 import synapse.util.retryutils
@@ -71,7 +71,9 @@ from synapse.http.client import (
     encode_query_args,
     read_body_with_max_size,
 )
+from synapse.http.connectproxyclient import BearerProxyCredentials
 from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.http.proxyagent import ProxyAgent
 from synapse.http.types import QueryParams
 from synapse.logging import opentracing
 from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -393,17 +395,41 @@ class MatrixFederationHttpClient:
         if hs.config.server.user_agent_suffix:
             user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
 
-        federation_agent = MatrixFederationAgent(
-            self.reactor,
-            tls_client_options_factory,
-            user_agent.encode("ascii"),
-            hs.config.server.federation_ip_range_allowlist,
-            hs.config.server.federation_ip_range_blocklist,
+        outbound_federation_restricted_to = (
+            hs.config.worker.outbound_federation_restricted_to
         )
+        if hs.get_instance_name() in outbound_federation_restricted_to:
+            # Talk to federation directly
+            federation_agent: IAgent = MatrixFederationAgent(
+                self.reactor,
+                tls_client_options_factory,
+                user_agent.encode("ascii"),
+                hs.config.server.federation_ip_range_allowlist,
+                hs.config.server.federation_ip_range_blocklist,
+            )
+        else:
+            proxy_authorization_secret = hs.config.worker.worker_replication_secret
+            assert (
+                proxy_authorization_secret is not None
+            ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
+            federation_proxy_credentials = BearerProxyCredentials(
+                proxy_authorization_secret.encode("ascii")
+            )
+
+            # We need to talk to federation via the proxy via one of the configured
+            # locations
+            federation_proxy_locations = outbound_federation_restricted_to.locations
+            federation_agent = ProxyAgent(
+                self.reactor,
+                self.reactor,
+                tls_client_options_factory,
+                federation_proxy_locations=federation_proxy_locations,
+                federation_proxy_credentials=federation_proxy_credentials,
+            )
 
         # Use a BlocklistingAgentWrapper to prevent circumventing the IP
         # blocking via IP literals in server names
-        self.agent = BlocklistingAgentWrapper(
+        self.agent: IAgent = BlocklistingAgentWrapper(
             federation_agent,
             ip_blocklist=hs.config.server.federation_ip_range_blocklist,
         )
@@ -412,7 +438,6 @@ class MatrixFederationHttpClient:
         self._store = hs.get_datastores().main
         self.version_string_bytes = hs.version_string.encode("ascii")
         self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
-
         self.max_long_retry_delay_seconds = (
             hs.config.federation.max_long_retry_delay_ms / 1000
         )
@@ -1141,6 +1166,101 @@ class MatrixFederationHttpClient:
             RequestSendFailed: If there were problems connecting to the
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
+        json_dict, _ = await self.get_json_with_headers(
+            destination=destination,
+            path=path,
+            args=args,
+            retry_on_dns_fail=retry_on_dns_fail,
+            timeout=timeout,
+            ignore_backoff=ignore_backoff,
+            try_trailing_slash_on_400=try_trailing_slash_on_400,
+            parser=parser,
+        )
+        return json_dict
+
+    @overload
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = None,
+        retry_on_dns_fail: bool = True,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        try_trailing_slash_on_400: bool = False,
+        parser: Literal[None] = None,
+    ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
+        ...
+
+    @overload
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = ...,
+        retry_on_dns_fail: bool = ...,
+        timeout: Optional[int] = ...,
+        ignore_backoff: bool = ...,
+        try_trailing_slash_on_400: bool = ...,
+        parser: ByteParser[T] = ...,
+    ) -> Tuple[T, Dict[bytes, List[bytes]]]:
+        ...
+
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = None,
+        retry_on_dns_fail: bool = True,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        try_trailing_slash_on_400: bool = False,
+        parser: Optional[ByteParser[T]] = None,
+    ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
+        """GETs some json from the given host homeserver and path
+
+        Args:
+            destination: The remote server to send the HTTP request to.
+
+            path: The HTTP path.
+
+            args: A dictionary used to create query strings, defaults to
+                None.
+
+            retry_on_dns_fail: true if the request should be retried on DNS failures
+
+            timeout: number of milliseconds to wait for the response.
+                self._default_timeout (60s) by default.
+
+                Note that we may make several attempts to send the request; this
+                timeout applies to the time spent waiting for response headers for
+                *each* attempt (including connection time) as well as the time spent
+                reading the response body after a 200 response.
+
+            ignore_backoff: true to ignore the historical backoff data
+                and try the request anyway.
+
+            try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
+                response we should try appending a trailing slash to the end of
+                the request. Workaround for #3622 in Synapse <= v0.99.3.
+
+            parser: The parser to use to decode the response. Defaults to
+                parsing as JSON.
+
+        Returns:
+            Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
+            decoded JSON body and a dict of the response headers.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
+        """
         request = MatrixFederationRequest(
             method="GET", destination=destination, path=path, query=args
         )
@@ -1156,6 +1276,8 @@ class MatrixFederationHttpClient:
             timeout=timeout,
         )
 
+        headers = dict(response.headers.getAllRawHeaders())
+
         if timeout is not None:
             _sec_timeout = timeout / 1000
         else:
@@ -1173,7 +1295,7 @@ class MatrixFederationHttpClient:
             parser=parser,
         )
 
-        return body
+        return body, headers
 
     async def delete_json(
         self,