summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/errors.py7
-rw-r--r--synapse/app/_base.py2
-rw-r--r--synapse/app/generic_worker.py1
-rw-r--r--synapse/app/homeserver.py1
-rw-r--r--synapse/config/workers.py45
-rw-r--r--synapse/http/client.py7
-rw-r--r--synapse/http/connectproxyclient.py20
-rw-r--r--synapse/http/matrixfederationclient.py142
-rw-r--r--synapse/http/proxy.py283
-rw-r--r--synapse/http/proxyagent.py141
-rw-r--r--synapse/http/server.py55
-rw-r--r--synapse/http/site.py27
12 files changed, 681 insertions, 50 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index af894243f8..3546aaf7c3 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -217,6 +217,13 @@ class InvalidAPICallError(SynapseError):
         super().__init__(HTTPStatus.BAD_REQUEST, msg, Codes.BAD_JSON)
 
 
+class InvalidProxyCredentialsError(SynapseError):
+    """Error raised when the proxy credentials are invalid."""
+
+    def __init__(self, msg: str, errcode: str = Codes.UNKNOWN):
+        super().__init__(401, msg, errcode)
+
+
 class ProxiedRequestError(SynapseError):
     """An error from a general matrix endpoint, eg. from a proxied Matrix API call.
 
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 936b1b0430..a94b57a671 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -386,6 +386,7 @@ def listen_unix(
 
 
 def listen_http(
+    hs: "HomeServer",
     listener_config: ListenerConfig,
     root_resource: Resource,
     version_string: str,
@@ -406,6 +407,7 @@ def listen_http(
         version_string,
         max_request_body_size=max_request_body_size,
         reactor=reactor,
+        hs=hs,
     )
 
     if isinstance(listener_config, TCPListenerConfig):
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 7406c3948c..dc79efcc14 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -221,6 +221,7 @@ class GenericWorkerServer(HomeServer):
         root_resource = create_resource_tree(resources, OptionsResource())
 
         _base.listen_http(
+            self,
             listener_config,
             root_resource,
             self.version_string,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 84236ac299..f188c7265a 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -139,6 +139,7 @@ class SynapseHomeServer(HomeServer):
             root_resource = OptionsResource()
 
         ports = listen_http(
+            self,
             listener_config,
             create_resource_tree(resources, root_resource),
             self.version_string,
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index e55ca12a36..6567fb6bb0 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -15,7 +15,7 @@
 
 import argparse
 import logging
-from typing import Any, Dict, List, Union
+from typing import Any, Dict, List, Optional, Union
 
 import attr
 from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
@@ -171,6 +171,27 @@ class WriterLocations:
     )
 
 
+@attr.s(auto_attribs=True)
+class OutboundFederationRestrictedTo:
+    """Whether we limit outbound federation to a certain set of instances.
+
+    Attributes:
+        instances: optional list of instances that can make outbound federation
+            requests. If None then all instances can make federation requests.
+        locations: list of instance locations to connect to proxy via.
+    """
+
+    instances: Optional[List[str]]
+    locations: List[InstanceLocationConfig] = attr.Factory(list)
+
+    def __contains__(self, instance: str) -> bool:
+        # It feels a bit dirty to return `True` if `instances` is `None`, but it makes
+        # sense in downstream usage in the sense that if
+        # `outbound_federation_restricted_to` is not configured, then any instance can
+        # talk to federation (no restrictions so always return `True`).
+        return self.instances is None or instance in self.instances
+
+
 class WorkerConfig(Config):
     """The workers are processes run separately to the main synapse process.
     They have their own pid_file and listener configuration. They use the
@@ -385,6 +406,28 @@ class WorkerConfig(Config):
             new_option_name="update_user_directory_from_worker",
         )
 
+        outbound_federation_restricted_to = config.get(
+            "outbound_federation_restricted_to", None
+        )
+        self.outbound_federation_restricted_to = OutboundFederationRestrictedTo(
+            outbound_federation_restricted_to
+        )
+        if outbound_federation_restricted_to:
+            if not self.worker_replication_secret:
+                raise ConfigError(
+                    "`worker_replication_secret` must be configured when using `outbound_federation_restricted_to`."
+                )
+
+            for instance in outbound_federation_restricted_to:
+                if instance not in self.instance_map:
+                    raise ConfigError(
+                        "Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config."
+                        % (instance,)
+                    )
+                self.outbound_federation_restricted_to.locations.append(
+                    self.instance_map[instance]
+                )
+
     def _should_this_worker_perform_duty(
         self,
         config: Dict[str, Any],
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 09ea93e10d..ca2cdbc6e2 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -1037,7 +1037,12 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
         if reason.check(ResponseDone):
             self.deferred.callback(self.length)
         elif reason.check(PotentialDataLoss):
-            # stolen from https://github.com/twisted/treq/pull/49/files
+            # This applies to requests which don't set `Content-Length` or a
+            # `Transfer-Encoding` in the response because in this case the end of the
+            # response is indicated by the connection being closed, an event which may
+            # also be due to a transient network problem or other error. But since this
+            # behavior is expected of some servers (like YouTube), let's ignore it.
+            # Stolen from https://github.com/twisted/treq/pull/49/files
             # http://twistedmatrix.com/trac/ticket/4840
             self.deferred.callback(self.length)
         else:
diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py
index 23a60af171..636efc33e8 100644
--- a/synapse/http/connectproxyclient.py
+++ b/synapse/http/connectproxyclient.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import abc
 import base64
 import logging
 from typing import Optional, Union
@@ -39,8 +40,14 @@ class ProxyConnectError(ConnectError):
     pass
 
 
-@attr.s(auto_attribs=True)
 class ProxyCredentials:
+    @abc.abstractmethod
+    def as_proxy_authorization_value(self) -> bytes:
+        raise NotImplementedError()
+
+
+@attr.s(auto_attribs=True)
+class BasicProxyCredentials(ProxyCredentials):
     username_password: bytes
 
     def as_proxy_authorization_value(self) -> bytes:
@@ -55,6 +62,17 @@ class ProxyCredentials:
         return b"Basic " + base64.encodebytes(self.username_password)
 
 
+@attr.s(auto_attribs=True)
+class BearerProxyCredentials(ProxyCredentials):
+    access_token: bytes
+
+    def as_proxy_authorization_value(self) -> bytes:
+        """
+        Return the value for a Proxy-Authorization header (i.e. 'Bearer xxx').
+        """
+        return b"Bearer " + self.access_token
+
+
 @implementer(IStreamClientEndpoint)
 class HTTPConnectProxyEndpoint:
     """An Endpoint implementation which will send a CONNECT request to an http proxy
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index cc4e258b0f..583c03447c 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime
 from twisted.internet.task import Cooperator
 from twisted.web.client import ResponseFailed
 from twisted.web.http_headers import Headers
-from twisted.web.iweb import IBodyProducer, IResponse
+from twisted.web.iweb import IAgent, IBodyProducer, IResponse
 
 import synapse.metrics
 import synapse.util.retryutils
@@ -71,7 +71,9 @@ from synapse.http.client import (
     encode_query_args,
     read_body_with_max_size,
 )
+from synapse.http.connectproxyclient import BearerProxyCredentials
 from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
+from synapse.http.proxyagent import ProxyAgent
 from synapse.http.types import QueryParams
 from synapse.logging import opentracing
 from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -393,17 +395,41 @@ class MatrixFederationHttpClient:
         if hs.config.server.user_agent_suffix:
             user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
 
-        federation_agent = MatrixFederationAgent(
-            self.reactor,
-            tls_client_options_factory,
-            user_agent.encode("ascii"),
-            hs.config.server.federation_ip_range_allowlist,
-            hs.config.server.federation_ip_range_blocklist,
+        outbound_federation_restricted_to = (
+            hs.config.worker.outbound_federation_restricted_to
         )
+        if hs.get_instance_name() in outbound_federation_restricted_to:
+            # Talk to federation directly
+            federation_agent: IAgent = MatrixFederationAgent(
+                self.reactor,
+                tls_client_options_factory,
+                user_agent.encode("ascii"),
+                hs.config.server.federation_ip_range_allowlist,
+                hs.config.server.federation_ip_range_blocklist,
+            )
+        else:
+            proxy_authorization_secret = hs.config.worker.worker_replication_secret
+            assert (
+                proxy_authorization_secret is not None
+            ), "`worker_replication_secret` must be set when using `outbound_federation_restricted_to` (used to authenticate requests across workers)"
+            federation_proxy_credentials = BearerProxyCredentials(
+                proxy_authorization_secret.encode("ascii")
+            )
+
+            # We need to talk to federation via the proxy via one of the configured
+            # locations
+            federation_proxy_locations = outbound_federation_restricted_to.locations
+            federation_agent = ProxyAgent(
+                self.reactor,
+                self.reactor,
+                tls_client_options_factory,
+                federation_proxy_locations=federation_proxy_locations,
+                federation_proxy_credentials=federation_proxy_credentials,
+            )
 
         # Use a BlocklistingAgentWrapper to prevent circumventing the IP
         # blocking via IP literals in server names
-        self.agent = BlocklistingAgentWrapper(
+        self.agent: IAgent = BlocklistingAgentWrapper(
             federation_agent,
             ip_blocklist=hs.config.server.federation_ip_range_blocklist,
         )
@@ -412,7 +438,6 @@ class MatrixFederationHttpClient:
         self._store = hs.get_datastores().main
         self.version_string_bytes = hs.version_string.encode("ascii")
         self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
-
         self.max_long_retry_delay_seconds = (
             hs.config.federation.max_long_retry_delay_ms / 1000
         )
@@ -1141,6 +1166,101 @@ class MatrixFederationHttpClient:
             RequestSendFailed: If there were problems connecting to the
                 remote, due to e.g. DNS failures, connection timeouts etc.
         """
+        json_dict, _ = await self.get_json_with_headers(
+            destination=destination,
+            path=path,
+            args=args,
+            retry_on_dns_fail=retry_on_dns_fail,
+            timeout=timeout,
+            ignore_backoff=ignore_backoff,
+            try_trailing_slash_on_400=try_trailing_slash_on_400,
+            parser=parser,
+        )
+        return json_dict
+
+    @overload
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = None,
+        retry_on_dns_fail: bool = True,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        try_trailing_slash_on_400: bool = False,
+        parser: Literal[None] = None,
+    ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
+        ...
+
+    @overload
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = ...,
+        retry_on_dns_fail: bool = ...,
+        timeout: Optional[int] = ...,
+        ignore_backoff: bool = ...,
+        try_trailing_slash_on_400: bool = ...,
+        parser: ByteParser[T] = ...,
+    ) -> Tuple[T, Dict[bytes, List[bytes]]]:
+        ...
+
+    async def get_json_with_headers(
+        self,
+        destination: str,
+        path: str,
+        args: Optional[QueryParams] = None,
+        retry_on_dns_fail: bool = True,
+        timeout: Optional[int] = None,
+        ignore_backoff: bool = False,
+        try_trailing_slash_on_400: bool = False,
+        parser: Optional[ByteParser[T]] = None,
+    ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
+        """GETs some json from the given host homeserver and path
+
+        Args:
+            destination: The remote server to send the HTTP request to.
+
+            path: The HTTP path.
+
+            args: A dictionary used to create query strings, defaults to
+                None.
+
+            retry_on_dns_fail: true if the request should be retried on DNS failures
+
+            timeout: number of milliseconds to wait for the response.
+                self._default_timeout (60s) by default.
+
+                Note that we may make several attempts to send the request; this
+                timeout applies to the time spent waiting for response headers for
+                *each* attempt (including connection time) as well as the time spent
+                reading the response body after a 200 response.
+
+            ignore_backoff: true to ignore the historical backoff data
+                and try the request anyway.
+
+            try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
+                response we should try appending a trailing slash to the end of
+                the request. Workaround for #3622 in Synapse <= v0.99.3.
+
+            parser: The parser to use to decode the response. Defaults to
+                parsing as JSON.
+
+        Returns:
+            Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
+            decoded JSON body and a dict of the response headers.
+
+        Raises:
+            HttpResponseException: If we get an HTTP response code >= 300
+                (except 429).
+            NotRetryingDestination: If we are not yet ready to retry this
+                server.
+            FederationDeniedError: If this destination is not on our
+                federation whitelist
+            RequestSendFailed: If there were problems connecting to the
+                remote, due to e.g. DNS failures, connection timeouts etc.
+        """
         request = MatrixFederationRequest(
             method="GET", destination=destination, path=path, query=args
         )
@@ -1156,6 +1276,8 @@ class MatrixFederationHttpClient:
             timeout=timeout,
         )
 
+        headers = dict(response.headers.getAllRawHeaders())
+
         if timeout is not None:
             _sec_timeout = timeout / 1000
         else:
@@ -1173,7 +1295,7 @@ class MatrixFederationHttpClient:
             parser=parser,
         )
 
-        return body
+        return body, headers
 
     async def delete_json(
         self,
diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py
new file mode 100644
index 0000000000..c9f51e51bc
--- /dev/null
+++ b/synapse/http/proxy.py
@@ -0,0 +1,283 @@
+#  Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+import json
+import logging
+import urllib.parse
+from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast
+
+from twisted.internet import protocol
+from twisted.internet.interfaces import ITCPTransport
+from twisted.internet.protocol import connectionDone
+from twisted.python import failure
+from twisted.python.failure import Failure
+from twisted.web.client import ResponseDone
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import IResponse
+from twisted.web.resource import IResource
+from twisted.web.server import Request, Site
+
+from synapse.api.errors import Codes, InvalidProxyCredentialsError
+from synapse.http import QuieterFileBodyProducer
+from synapse.http.server import _AsyncResource
+from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.types import ISynapseReactor
+from synapse.util.async_helpers import timeout_deferred
+
+if TYPE_CHECKING:
+    from synapse.http.site import SynapseRequest
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616
+# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be
+# consumed by the immediate recipient and not be forwarded on.
+HOP_BY_HOP_HEADERS = {
+    "Connection",
+    "Keep-Alive",
+    "Proxy-Authenticate",
+    "Proxy-Authorization",
+    "TE",
+    "Trailers",
+    "Transfer-Encoding",
+    "Upgrade",
+}
+
+
+def parse_connection_header_value(
+    connection_header_value: Optional[bytes],
+) -> Set[str]:
+    """
+    Parse the `Connection` header to determine which headers we should not be copied
+    over from the remote response.
+
+    As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1
+
+    Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}`
+
+    Even though "close" is a special directive, let's just treat it as just another
+    header for simplicity. If people want to check for this directive, they can simply
+    check for `"Close" in headers`.
+
+    Args:
+        connection_header_value: The value of the `Connection` header.
+
+    Returns:
+        The set of header names that should not be copied over from the remote response.
+        The keys are capitalized in canonical capitalization.
+    """
+    headers = Headers()
+    extra_headers_to_remove: Set[str] = set()
+    if connection_header_value:
+        extra_headers_to_remove = {
+            headers._canonicalNameCaps(connection_option.strip()).decode("ascii")
+            for connection_option in connection_header_value.split(b",")
+        }
+
+    return extra_headers_to_remove
+
+
+class ProxyResource(_AsyncResource):
+    """
+    A stub resource that proxies any requests with a `matrix-federation://` scheme
+    through the given `federation_agent` to the remote homeserver and ferries back the
+    info.
+    """
+
+    isLeaf = True
+
+    def __init__(self, reactor: ISynapseReactor, hs: "HomeServer"):
+        super().__init__(True)
+
+        self.reactor = reactor
+        self.agent = hs.get_federation_http_client().agent
+
+        self._proxy_authorization_secret = hs.config.worker.worker_replication_secret
+
+    def _check_auth(self, request: Request) -> None:
+        # The `matrix-federation://` proxy functionality can only be used with auth.
+        # Protect homserver admins forgetting to configure a secret.
+        assert self._proxy_authorization_secret is not None
+
+        # Get the authorization header.
+        auth_headers = request.requestHeaders.getRawHeaders(b"Proxy-Authorization")
+
+        if not auth_headers:
+            raise InvalidProxyCredentialsError(
+                "Missing Proxy-Authorization header.", Codes.MISSING_TOKEN
+            )
+        if len(auth_headers) > 1:
+            raise InvalidProxyCredentialsError(
+                "Too many Proxy-Authorization headers.", Codes.UNAUTHORIZED
+            )
+        parts = auth_headers[0].split(b" ")
+        if parts[0] == b"Bearer" and len(parts) == 2:
+            received_secret = parts[1].decode("ascii")
+            if self._proxy_authorization_secret == received_secret:
+                # Success!
+                return
+
+        raise InvalidProxyCredentialsError(
+            "Invalid Proxy-Authorization header.", Codes.UNAUTHORIZED
+        )
+
+    async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
+        uri = urllib.parse.urlparse(request.uri)
+        assert uri.scheme == b"matrix-federation"
+
+        # Check the authorization headers before handling the request.
+        self._check_auth(request)
+
+        headers = Headers()
+        for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
+            header_value = request.getHeader(header_name)
+            if header_value:
+                headers.addRawHeader(header_name, header_value)
+
+        request_deferred = run_in_background(
+            self.agent.request,
+            request.method,
+            request.uri,
+            headers=headers,
+            bodyProducer=QuieterFileBodyProducer(request.content),
+        )
+        request_deferred = timeout_deferred(
+            request_deferred,
+            # This should be set longer than the timeout in `MatrixFederationHttpClient`
+            # so that it has enough time to complete and pass us the data before we give
+            # up.
+            timeout=90,
+            reactor=self.reactor,
+        )
+
+        response = await make_deferred_yieldable(request_deferred)
+
+        return response.code, response
+
+    def _send_response(
+        self,
+        request: "SynapseRequest",
+        code: int,
+        response_object: Any,
+    ) -> None:
+        response = cast(IResponse, response_object)
+        response_headers = cast(Headers, response.headers)
+
+        request.setResponseCode(code)
+
+        # The `Connection` header also defines which headers should not be copied over.
+        connection_header = response_headers.getRawHeaders(b"connection")
+        extra_headers_to_remove = parse_connection_header_value(
+            connection_header[0] if connection_header else None
+        )
+
+        # Copy headers.
+        for k, v in response_headers.getAllRawHeaders():
+            # Do not copy over any hop-by-hop headers. These are meant to only be
+            # consumed by the immediate recipient and not be forwarded on.
+            header_key = k.decode("ascii")
+            if (
+                header_key in HOP_BY_HOP_HEADERS
+                or header_key in extra_headers_to_remove
+            ):
+                continue
+
+            request.responseHeaders.setRawHeaders(k, v)
+
+        response.deliverBody(_ProxyResponseBody(request))
+
+    def _send_error_response(
+        self,
+        f: failure.Failure,
+        request: "SynapseRequest",
+    ) -> None:
+        if isinstance(f.value, InvalidProxyCredentialsError):
+            error_response_code = f.value.code
+            error_response_json = {"errcode": f.value.errcode, "err": f.value.msg}
+        else:
+            error_response_code = 502
+            error_response_json = {
+                "errcode": Codes.UNKNOWN,
+                "err": "ProxyResource: Error when proxying request: %s %s -> %s"
+                % (
+                    request.method.decode("ascii"),
+                    request.uri.decode("ascii"),
+                    f,
+                ),
+            }
+
+        request.setResponseCode(error_response_code)
+        request.setHeader(b"Content-Type", b"application/json")
+        request.write((json.dumps(error_response_json)).encode())
+        request.finish()
+
+
+class _ProxyResponseBody(protocol.Protocol):
+    """
+    A protocol that proxies the given remote response data back out to the given local
+    request.
+    """
+
+    transport: Optional[ITCPTransport] = None
+
+    def __init__(self, request: "SynapseRequest") -> None:
+        self._request = request
+
+    def dataReceived(self, data: bytes) -> None:
+        # Avoid sending response data to the local request that already disconnected
+        if self._request._disconnected and self.transport is not None:
+            # Close the connection (forcefully) since all the data will get
+            # discarded anyway.
+            self.transport.abortConnection()
+            return
+
+        self._request.write(data)
+
+    def connectionLost(self, reason: Failure = connectionDone) -> None:
+        # If the local request is already finished (successfully or failed), don't
+        # worry about sending anything back.
+        if self._request.finished:
+            return
+
+        if reason.check(ResponseDone):
+            self._request.finish()
+        else:
+            # Abort the underlying request since our remote request also failed.
+            self._request.transport.abortConnection()
+
+
+class ProxySite(Site):
+    """
+    Proxies any requests with a `matrix-federation://` scheme through the given
+    `federation_agent`. Otherwise, behaves like a normal `Site`.
+    """
+
+    def __init__(
+        self,
+        resource: IResource,
+        reactor: ISynapseReactor,
+        hs: "HomeServer",
+    ):
+        super().__init__(resource, reactor=reactor)
+
+        self._proxy_resource = ProxyResource(reactor, hs=hs)
+
+    def getResourceFor(self, request: "SynapseRequest") -> IResource:
+        uri = urllib.parse.urlparse(request.uri)
+        if uri.scheme == b"matrix-federation":
+            return self._proxy_resource
+
+        return super().getResourceFor(request)
diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
index 7bdc4acae7..59ab8fad35 100644
--- a/synapse/http/proxyagent.py
+++ b/synapse/http/proxyagent.py
@@ -12,8 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import random
 import re
-from typing import Any, Dict, Optional, Tuple
+from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple
 from urllib.parse import urlparse
 from urllib.request import (  # type: ignore[attr-defined]
     getproxies_environment,
@@ -23,8 +24,17 @@ from urllib.request import (  # type: ignore[attr-defined]
 from zope.interface import implementer
 
 from twisted.internet import defer
-from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint
+from twisted.internet.endpoints import (
+    HostnameEndpoint,
+    UNIXClientEndpoint,
+    wrapClientTLS,
+)
+from twisted.internet.interfaces import (
+    IProtocol,
+    IProtocolFactory,
+    IReactorCore,
+    IStreamClientEndpoint,
+)
 from twisted.python.failure import Failure
 from twisted.web.client import (
     URI,
@@ -36,8 +46,18 @@ from twisted.web.error import SchemeNotSupported
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse
 
+from synapse.config.workers import (
+    InstanceLocationConfig,
+    InstanceTcpLocationConfig,
+    InstanceUnixLocationConfig,
+)
 from synapse.http import redact_uri
-from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials
+from synapse.http.connectproxyclient import (
+    BasicProxyCredentials,
+    HTTPConnectProxyEndpoint,
+    ProxyCredentials,
+)
+from synapse.logging.context import run_in_background
 
 logger = logging.getLogger(__name__)
 
@@ -74,6 +94,14 @@ class ProxyAgent(_AgentBase):
         use_proxy: Whether proxy settings should be discovered and used
             from conventional environment variables.
 
+        federation_proxy_locations: An optional list of locations to proxy outbound federation
+            traffic through (only requests that use the `matrix-federation://` scheme
+            will be proxied).
+
+        federation_proxy_credentials: Required if `federation_proxy_locations` is set. The
+            credentials to use when proxying outbound federation traffic through another
+            worker.
+
     Raises:
         ValueError if use_proxy is set and the environment variables
             contain an invalid proxy specification.
@@ -89,6 +117,8 @@ class ProxyAgent(_AgentBase):
         bindAddress: Optional[bytes] = None,
         pool: Optional[HTTPConnectionPool] = None,
         use_proxy: bool = False,
+        federation_proxy_locations: Collection[InstanceLocationConfig] = (),
+        federation_proxy_credentials: Optional[ProxyCredentials] = None,
     ):
         contextFactory = contextFactory or BrowserLikePolicyForHTTPS()
 
@@ -127,6 +157,47 @@ class ProxyAgent(_AgentBase):
         self._policy_for_https = contextFactory
         self._reactor = reactor
 
+        self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None
+        self._federation_proxy_credentials: Optional[ProxyCredentials] = None
+        if federation_proxy_locations:
+            assert (
+                federation_proxy_credentials is not None
+            ), "`federation_proxy_credentials` are required when using `federation_proxy_locations`"
+
+            endpoints: List[IStreamClientEndpoint] = []
+            for federation_proxy_location in federation_proxy_locations:
+                endpoint: IStreamClientEndpoint
+                if isinstance(federation_proxy_location, InstanceTcpLocationConfig):
+                    endpoint = HostnameEndpoint(
+                        self.proxy_reactor,
+                        federation_proxy_location.host,
+                        federation_proxy_location.port,
+                    )
+                    if federation_proxy_location.tls:
+                        tls_connection_creator = (
+                            self._policy_for_https.creatorForNetloc(
+                                federation_proxy_location.host.encode("utf-8"),
+                                federation_proxy_location.port,
+                            )
+                        )
+                        endpoint = wrapClientTLS(tls_connection_creator, endpoint)
+
+                elif isinstance(federation_proxy_location, InstanceUnixLocationConfig):
+                    endpoint = UNIXClientEndpoint(
+                        self.proxy_reactor, federation_proxy_location.path
+                    )
+
+                else:
+                    # It is supremely unlikely we ever hit this
+                    raise SchemeNotSupported(
+                        f"Unknown type of Endpoint requested, check {federation_proxy_location}"
+                    )
+
+                endpoints.append(endpoint)
+
+            self._federation_proxy_endpoint = _RandomSampleEndpoints(endpoints)
+            self._federation_proxy_credentials = federation_proxy_credentials
+
     def request(
         self,
         method: bytes,
@@ -214,6 +285,25 @@ class ProxyAgent(_AgentBase):
                 parsed_uri.port,
                 self.https_proxy_creds,
             )
+        elif (
+            parsed_uri.scheme == b"matrix-federation"
+            and self._federation_proxy_endpoint
+        ):
+            assert (
+                self._federation_proxy_credentials is not None
+            ), "`federation_proxy_credentials` are required when using `federation_proxy_locations`"
+
+            # Set a Proxy-Authorization header
+            if headers is None:
+                headers = Headers()
+            # We always need authentication for the outbound federation proxy
+            headers.addRawHeader(
+                b"Proxy-Authorization",
+                self._federation_proxy_credentials.as_proxy_authorization_value(),
+            )
+
+            endpoint = self._federation_proxy_endpoint
+            request_path = uri
         else:
             # not using a proxy
             endpoint = HostnameEndpoint(
@@ -233,6 +323,11 @@ class ProxyAgent(_AgentBase):
             endpoint = wrapClientTLS(tls_connection_creator, endpoint)
         elif parsed_uri.scheme == b"http":
             pass
+        elif (
+            parsed_uri.scheme == b"matrix-federation"
+            and self._federation_proxy_endpoint
+        ):
+            pass
         else:
             return defer.fail(
                 Failure(
@@ -334,6 +429,42 @@ def parse_proxy(
 
     credentials = None
     if url.username and url.password:
-        credentials = ProxyCredentials(b"".join([url.username, b":", url.password]))
+        credentials = BasicProxyCredentials(
+            b"".join([url.username, b":", url.password])
+        )
 
     return url.scheme, url.hostname, url.port or default_port, credentials
+
+
+@implementer(IStreamClientEndpoint)
+class _RandomSampleEndpoints:
+    """An endpoint that randomly iterates through a given list of endpoints at
+    each connection attempt.
+    """
+
+    def __init__(
+        self,
+        endpoints: Sequence[IStreamClientEndpoint],
+    ) -> None:
+        assert endpoints
+        self._endpoints = endpoints
+
+    def __repr__(self) -> str:
+        return f"<_RandomSampleEndpoints endpoints={self._endpoints}>"
+
+    def connect(
+        self, protocol_factory: IProtocolFactory
+    ) -> "defer.Deferred[IProtocol]":
+        """Implements IStreamClientEndpoint interface"""
+
+        return run_in_background(self._do_connect, protocol_factory)
+
+    async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
+        failures: List[Failure] = []
+        for endpoint in random.sample(self._endpoints, k=len(self._endpoints)):
+            try:
+                return await endpoint.connect(protocol_factory)
+            except Exception:
+                failures.append(Failure())
+
+        failures.pop().raiseException()
diff --git a/synapse/http/server.py b/synapse/http/server.py
index e411ac7e62..f592600880 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -18,6 +18,7 @@ import html
 import logging
 import types
 import urllib
+import urllib.parse
 from http import HTTPStatus
 from http.client import FOUND
 from inspect import isawaitable
@@ -65,7 +66,6 @@ from synapse.api.errors import (
     UnrecognizedRequestError,
 )
 from synapse.config.homeserver import HomeServerConfig
-from synapse.http.site import SynapseRequest
 from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
 from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
 from synapse.util import json_encoder
@@ -76,6 +76,7 @@ from synapse.util.iterutils import chunk_seq
 if TYPE_CHECKING:
     import opentracing
 
+    from synapse.http.site import SynapseRequest
     from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
@@ -102,7 +103,7 @@ HTTP_STATUS_REQUEST_CANCELLED = 499
 
 
 def return_json_error(
-    f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig]
+    f: failure.Failure, request: "SynapseRequest", config: Optional[HomeServerConfig]
 ) -> None:
     """Sends a JSON error response to clients."""
 
@@ -220,8 +221,8 @@ def return_html_error(
 
 
 def wrap_async_request_handler(
-    h: Callable[["_AsyncResource", SynapseRequest], Awaitable[None]]
-) -> Callable[["_AsyncResource", SynapseRequest], "defer.Deferred[None]"]:
+    h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]]
+) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]:
     """Wraps an async request handler so that it calls request.processing.
 
     This helps ensure that work done by the request handler after the request is completed
@@ -235,7 +236,7 @@ def wrap_async_request_handler(
     """
 
     async def wrapped_async_request_handler(
-        self: "_AsyncResource", request: SynapseRequest
+        self: "_AsyncResource", request: "SynapseRequest"
     ) -> None:
         with request.processing():
             await h(self, request)
@@ -300,7 +301,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
 
         self._extract_context = extract_context
 
-    def render(self, request: SynapseRequest) -> int:
+    def render(self, request: "SynapseRequest") -> int:
         """This gets called by twisted every time someone sends us a request."""
         request.render_deferred = defer.ensureDeferred(
             self._async_render_wrapper(request)
@@ -308,7 +309,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
         return NOT_DONE_YET
 
     @wrap_async_request_handler
-    async def _async_render_wrapper(self, request: SynapseRequest) -> None:
+    async def _async_render_wrapper(self, request: "SynapseRequest") -> None:
         """This is a wrapper that delegates to `_async_render` and handles
         exceptions, return values, metrics, etc.
         """
@@ -326,9 +327,15 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
             # of our stack, and thus gives us a sensible stack
             # trace.
             f = failure.Failure()
+            logger.exception(
+                "Error handling request",
+                exc_info=(f.type, f.value, f.getTracebackObject()),
+            )
             self._send_error_response(f, request)
 
-    async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, Any]]:
+    async def _async_render(
+        self, request: "SynapseRequest"
+    ) -> Optional[Tuple[int, Any]]:
         """Delegates to `_async_render_<METHOD>` methods, or returns a 400 if
         no appropriate method exists. Can be overridden in sub classes for
         different routing.
@@ -358,7 +365,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
     @abc.abstractmethod
     def _send_response(
         self,
-        request: SynapseRequest,
+        request: "SynapseRequest",
         code: int,
         response_object: Any,
     ) -> None:
@@ -368,7 +375,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
     def _send_error_response(
         self,
         f: failure.Failure,
-        request: SynapseRequest,
+        request: "SynapseRequest",
     ) -> None:
         raise NotImplementedError()
 
@@ -384,7 +391,7 @@ class DirectServeJsonResource(_AsyncResource):
 
     def _send_response(
         self,
-        request: SynapseRequest,
+        request: "SynapseRequest",
         code: int,
         response_object: Any,
     ) -> None:
@@ -401,7 +408,7 @@ class DirectServeJsonResource(_AsyncResource):
     def _send_error_response(
         self,
         f: failure.Failure,
-        request: SynapseRequest,
+        request: "SynapseRequest",
     ) -> None:
         """Implements _AsyncResource._send_error_response"""
         return_json_error(f, request, None)
@@ -473,7 +480,7 @@ class JsonResource(DirectServeJsonResource):
             )
 
     def _get_handler_for_request(
-        self, request: SynapseRequest
+        self, request: "SynapseRequest"
     ) -> Tuple[ServletCallback, str, Dict[str, str]]:
         """Finds a callback method to handle the given request.
 
@@ -503,7 +510,7 @@ class JsonResource(DirectServeJsonResource):
         # Huh. No one wanted to handle that? Fiiiiiine.
         raise UnrecognizedRequestError(code=404)
 
-    async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]:
+    async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
         callback, servlet_classname, group_dict = self._get_handler_for_request(request)
 
         request.is_render_cancellable = is_function_cancellable(callback)
@@ -535,7 +542,7 @@ class JsonResource(DirectServeJsonResource):
     def _send_error_response(
         self,
         f: failure.Failure,
-        request: SynapseRequest,
+        request: "SynapseRequest",
     ) -> None:
         """Implements _AsyncResource._send_error_response"""
         return_json_error(f, request, self.hs.config)
@@ -551,7 +558,7 @@ class DirectServeHtmlResource(_AsyncResource):
 
     def _send_response(
         self,
-        request: SynapseRequest,
+        request: "SynapseRequest",
         code: int,
         response_object: Any,
     ) -> None:
@@ -565,7 +572,7 @@ class DirectServeHtmlResource(_AsyncResource):
     def _send_error_response(
         self,
         f: failure.Failure,
-        request: SynapseRequest,
+        request: "SynapseRequest",
     ) -> None:
         """Implements _AsyncResource._send_error_response"""
         return_html_error(f, request, self.ERROR_TEMPLATE)
@@ -592,7 +599,7 @@ class UnrecognizedRequestResource(resource.Resource):
     errcode of M_UNRECOGNIZED.
     """
 
-    def render(self, request: SynapseRequest) -> int:
+    def render(self, request: "SynapseRequest") -> int:
         f = failure.Failure(UnrecognizedRequestError(code=404))
         return_json_error(f, request, None)
         # A response has already been sent but Twisted requires either NOT_DONE_YET
@@ -622,7 +629,7 @@ class RootRedirect(resource.Resource):
 class OptionsResource(resource.Resource):
     """Responds to OPTION requests for itself and all children."""
 
-    def render_OPTIONS(self, request: SynapseRequest) -> bytes:
+    def render_OPTIONS(self, request: "SynapseRequest") -> bytes:
         request.setResponseCode(204)
         request.setHeader(b"Content-Length", b"0")
 
@@ -737,7 +744,7 @@ def _encode_json_bytes(json_object: object) -> bytes:
 
 
 def respond_with_json(
-    request: SynapseRequest,
+    request: "SynapseRequest",
     code: int,
     json_object: Any,
     send_cors: bool = False,
@@ -787,7 +794,7 @@ def respond_with_json(
 
 
 def respond_with_json_bytes(
-    request: SynapseRequest,
+    request: "SynapseRequest",
     code: int,
     json_bytes: bytes,
     send_cors: bool = False,
@@ -825,7 +832,7 @@ def respond_with_json_bytes(
 
 
 async def _async_write_json_to_request_in_thread(
-    request: SynapseRequest,
+    request: "SynapseRequest",
     json_encoder: Callable[[Any], bytes],
     json_object: Any,
 ) -> None:
@@ -883,7 +890,7 @@ def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None:
     _ByteProducer(request, bytes_generator)
 
 
-def set_cors_headers(request: SynapseRequest) -> None:
+def set_cors_headers(request: "SynapseRequest") -> None:
     """Set the CORS headers so that javascript running in a web browsers can
     use this API
 
@@ -981,7 +988,7 @@ def set_clickjacking_protection_headers(request: Request) -> None:
 
 
 def respond_with_redirect(
-    request: SynapseRequest, url: bytes, statusCode: int = FOUND, cors: bool = False
+    request: "SynapseRequest", url: bytes, statusCode: int = FOUND, cors: bool = False
 ) -> None:
     """
     Write a 302 (or other specified status code) response to the request, if it is still alive.
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 5b5a7c1e59..a388d6cf7f 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -21,25 +21,29 @@ from zope.interface import implementer
 
 from twisted.internet.address import UNIXAddress
 from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IAddress, IReactorTime
+from twisted.internet.interfaces import IAddress
 from twisted.python.failure import Failure
 from twisted.web.http import HTTPChannel
 from twisted.web.resource import IResource, Resource
-from twisted.web.server import Request, Site
+from twisted.web.server import Request
 
 from synapse.config.server import ListenerConfig
 from synapse.http import get_request_user_agent, redact_uri
+from synapse.http.proxy import ProxySite
 from synapse.http.request_metrics import RequestMetrics, requests_counter
 from synapse.logging.context import (
     ContextRequest,
     LoggingContext,
     PreserveLoggingContext,
 )
-from synapse.types import Requester
+from synapse.types import ISynapseReactor, Requester
 
 if TYPE_CHECKING:
     import opentracing
 
+    from synapse.server import HomeServer
+
+
 logger = logging.getLogger(__name__)
 
 _next_request_seq = 0
@@ -102,7 +106,7 @@ class SynapseRequest(Request):
         # A boolean indicating whether `render_deferred` should be cancelled if the
         # client disconnects early. Expected to be set by the coroutine started by
         # `Resource.render`, if rendering is asynchronous.
-        self.is_render_cancellable = False
+        self.is_render_cancellable: bool = False
 
         global _next_request_seq
         self.request_seq = _next_request_seq
@@ -601,7 +605,7 @@ class _XForwardedForAddress:
     host: str
 
 
-class SynapseSite(Site):
+class SynapseSite(ProxySite):
     """
     Synapse-specific twisted http Site
 
@@ -623,7 +627,8 @@ class SynapseSite(Site):
         resource: IResource,
         server_version_string: str,
         max_request_body_size: int,
-        reactor: IReactorTime,
+        reactor: ISynapseReactor,
+        hs: "HomeServer",
     ):
         """
 
@@ -638,7 +643,11 @@ class SynapseSite(Site):
                 dropping the connection
             reactor: reactor to be used to manage connection timeouts
         """
-        Site.__init__(self, resource, reactor=reactor)
+        super().__init__(
+            resource=resource,
+            reactor=reactor,
+            hs=hs,
+        )
 
         self.site_tag = site_tag
         self.reactor = reactor
@@ -649,7 +658,9 @@ class SynapseSite(Site):
 
         request_id_header = config.http_options.request_id_header
 
-        self.experimental_cors_msc3886 = config.http_options.experimental_cors_msc3886
+        self.experimental_cors_msc3886: bool = (
+            config.http_options.experimental_cors_msc3886
+        )
 
         def request_factory(channel: HTTPChannel, queued: bool) -> Request:
             return request_class(