summary refs log tree commit diff
path: root/tests/http
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2023-07-05 18:53:55 -0500
committerGitHub <noreply@github.com>2023-07-05 18:53:55 -0500
commitb07b14b494ae1dd564b4c44f844c9a9545b3d08a (patch)
tree3fadaf825910c72bb9fc1b4610610bbe6721eb1f /tests/http
parentRemove support for Python 3.7 (#15851) (diff)
downloadsynapse-b07b14b494ae1dd564b4c44f844c9a9545b3d08a.tar.xz
Federation outbound proxy (#15773)
Allow configuring the set of workers to proxy outbound federation traffic through (`outbound_federation_restricted_to`).

This is useful when you have a worker setup with `federation_sender` instances responsible for sending outbound federation requests and want to make sure *all* outbound federation traffic goes through those instances. Before this change, the generic workers would still contact federation themselves for things like profile lookups, backfill, etc. This PR allows you to set more strict access controls/firewall for all workers and only allow the `federation_sender`'s to contact the outside world.

The original code is from @erikjohnston's branches which I've gotten in-shape to merge.
Diffstat (limited to 'tests/http')
-rw-r--r--tests/http/test_matrixfederationclient.py189
-rw-r--r--tests/http/test_proxy.py53
2 files changed, 238 insertions, 4 deletions
diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py
index b5f4a60fe5..a8b9737d1f 100644
--- a/tests/http/test_matrixfederationclient.py
+++ b/tests/http/test_matrixfederationclient.py
@@ -11,8 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from typing import Generator
-from unittest.mock import Mock
+from typing import Any, Dict, Generator
+from unittest.mock import ANY, Mock, create_autospec
 
 from netaddr import IPSet
 from parameterized import parameterized
@@ -21,10 +21,11 @@ from twisted.internet import defer
 from twisted.internet.defer import Deferred, TimeoutError
 from twisted.internet.error import ConnectingCancelledError, DNSLookupError
 from twisted.test.proto_helpers import MemoryReactor, StringTransport
-from twisted.web.client import ResponseNeverReceived
+from twisted.web.client import Agent, ResponseNeverReceived
 from twisted.web.http import HTTPChannel
+from twisted.web.http_headers import Headers
 
-from synapse.api.errors import RequestSendFailed
+from synapse.api.errors import HttpResponseException, RequestSendFailed
 from synapse.http.matrixfederationclient import (
     ByteParser,
     MatrixFederationHttpClient,
@@ -39,7 +40,9 @@ from synapse.logging.context import (
 from synapse.server import HomeServer
 from synapse.util import Clock
 
+from tests.replication._base import BaseMultiWorkerStreamTestCase
 from tests.server import FakeTransport
+from tests.test_utils import FakeResponse
 from tests.unittest import HomeserverTestCase, override_config
 
 
@@ -658,3 +661,181 @@ class FederationClientTests(HomeserverTestCase):
         self.assertEqual(self.cl.max_short_retry_delay_seconds, 7)
         self.assertEqual(self.cl.max_long_retries, 20)
         self.assertEqual(self.cl.max_short_retries, 5)
+
+
+class FederationClientProxyTests(BaseMultiWorkerStreamTestCase):
+    def default_config(self) -> Dict[str, Any]:
+        conf = super().default_config()
+        conf["instance_map"] = {
+            "main": {"host": "testserv", "port": 8765},
+            "federation_sender": {"host": "testserv", "port": 1001},
+        }
+        return conf
+
+    @override_config({"outbound_federation_restricted_to": ["federation_sender"]})
+    def test_proxy_requests_through_federation_sender_worker(self) -> None:
+        """
+        Test that all outbound federation requests go through the `federation_sender`
+        worker
+        """
+        # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
+        # so we can act like some remote server responding to requests
+        mock_client_on_federation_sender = Mock()
+        mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
+        mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
+
+        # Create the `federation_sender` worker
+        self.federation_sender = self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {"worker_name": "federation_sender"},
+            federation_http_client=mock_client_on_federation_sender,
+        )
+
+        # Fake `remoteserv:8008` responding to requests
+        mock_agent_on_federation_sender.request.side_effect = (
+            lambda *args, **kwargs: defer.succeed(
+                FakeResponse.json(
+                    payload={
+                        "foo": "bar",
+                    }
+                )
+            )
+        )
+
+        # This federation request from the main process should be proxied through the
+        # `federation_sender` worker off to the remote server
+        test_request_from_main_process_d = defer.ensureDeferred(
+            self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar")
+        )
+
+        # Pump the reactor so our deferred goes through the motions
+        self.pump()
+
+        # Make sure that the request was proxied through the `federation_sender` worker
+        mock_agent_on_federation_sender.request.assert_called_once_with(
+            b"GET",
+            b"matrix-federation://remoteserv:8008/foo/bar",
+            headers=ANY,
+            bodyProducer=ANY,
+        )
+
+        # Make sure the response is as expected back on the main worker
+        res = self.successResultOf(test_request_from_main_process_d)
+        self.assertEqual(res, {"foo": "bar"})
+
+    @override_config({"outbound_federation_restricted_to": ["federation_sender"]})
+    def test_proxy_request_with_network_error_through_federation_sender_worker(
+        self,
+    ) -> None:
+        """
+        Test that when the outbound federation request fails with a network related
+        error, a sensible error makes its way back to the main process.
+        """
+        # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
+        # so we can act like some remote server responding to requests
+        mock_client_on_federation_sender = Mock()
+        mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
+        mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
+
+        # Create the `federation_sender` worker
+        self.federation_sender = self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {"worker_name": "federation_sender"},
+            federation_http_client=mock_client_on_federation_sender,
+        )
+
+        # Fake `remoteserv:8008` responding to requests
+        mock_agent_on_federation_sender.request.side_effect = (
+            lambda *args, **kwargs: defer.fail(ResponseNeverReceived("fake error"))
+        )
+
+        # This federation request from the main process should be proxied through the
+        # `federation_sender` worker off to the remote server
+        test_request_from_main_process_d = defer.ensureDeferred(
+            self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar")
+        )
+
+        # Pump the reactor so our deferred goes through the motions. We pump with 10
+        # seconds (0.1 * 100) so the `MatrixFederationHttpClient` runs out of retries
+        # and finally passes along the error response.
+        self.pump(0.1)
+
+        # Make sure that the request was proxied through the `federation_sender` worker
+        mock_agent_on_federation_sender.request.assert_called_with(
+            b"GET",
+            b"matrix-federation://remoteserv:8008/foo/bar",
+            headers=ANY,
+            bodyProducer=ANY,
+        )
+
+        # Make sure we get some sort of error back on the main worker
+        failure_res = self.failureResultOf(test_request_from_main_process_d)
+        self.assertIsInstance(failure_res.value, RequestSendFailed)
+        self.assertIsInstance(failure_res.value.inner_exception, HttpResponseException)
+
+    @override_config({"outbound_federation_restricted_to": ["federation_sender"]})
+    def test_proxy_requests_and_discards_hop_by_hop_headers(self) -> None:
+        """
+        Test to make sure hop-by-hop headers and addional headers defined in the
+        `Connection` header are discarded when proxying requests
+        """
+        # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
+        # so we can act like some remote server responding to requests
+        mock_client_on_federation_sender = Mock()
+        mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
+        mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
+
+        # Create the `federation_sender` worker
+        self.federation_sender = self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {"worker_name": "federation_sender"},
+            federation_http_client=mock_client_on_federation_sender,
+        )
+
+        # Fake `remoteserv:8008` responding to requests
+        mock_agent_on_federation_sender.request.side_effect = lambda *args, **kwargs: defer.succeed(
+            FakeResponse(
+                code=200,
+                body=b'{"foo": "bar"}',
+                headers=Headers(
+                    {
+                        "Content-Type": ["application/json"],
+                        "Connection": ["close, X-Foo, X-Bar"],
+                        # Should be removed because it's defined in the `Connection` header
+                        "X-Foo": ["foo"],
+                        "X-Bar": ["bar"],
+                        # Should be removed because it's a hop-by-hop header
+                        "Proxy-Authorization": "abcdef",
+                    }
+                ),
+            )
+        )
+
+        # This federation request from the main process should be proxied through the
+        # `federation_sender` worker off to the remote server
+        test_request_from_main_process_d = defer.ensureDeferred(
+            self.hs.get_federation_http_client().get_json_with_headers(
+                "remoteserv:8008", "foo/bar"
+            )
+        )
+
+        # Pump the reactor so our deferred goes through the motions
+        self.pump()
+
+        # Make sure that the request was proxied through the `federation_sender` worker
+        mock_agent_on_federation_sender.request.assert_called_once_with(
+            b"GET",
+            b"matrix-federation://remoteserv:8008/foo/bar",
+            headers=ANY,
+            bodyProducer=ANY,
+        )
+
+        res, headers = self.successResultOf(test_request_from_main_process_d)
+        header_names = set(headers.keys())
+
+        # Make sure the response does not include the hop-by-hop headers
+        self.assertNotIn(b"X-Foo", header_names)
+        self.assertNotIn(b"X-Bar", header_names)
+        self.assertNotIn(b"Proxy-Authorization", header_names)
+        # Make sure the response is as expected back on the main worker
+        self.assertEqual(res, {"foo": "bar"})
diff --git a/tests/http/test_proxy.py b/tests/http/test_proxy.py
new file mode 100644
index 0000000000..0dc9ba8e05
--- /dev/null
+++ b/tests/http/test_proxy.py
@@ -0,0 +1,53 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Set
+
+from parameterized import parameterized
+
+from synapse.http.proxy import parse_connection_header_value
+
+from tests.unittest import TestCase
+
+
+class ProxyTests(TestCase):
+    @parameterized.expand(
+        [
+            [b"close, X-Foo, X-Bar", {"Close", "X-Foo", "X-Bar"}],
+            # No whitespace
+            [b"close,X-Foo,X-Bar", {"Close", "X-Foo", "X-Bar"}],
+            # More whitespace
+            [b"close,    X-Foo,      X-Bar", {"Close", "X-Foo", "X-Bar"}],
+            # "close" directive in not the first position
+            [b"X-Foo, X-Bar, close", {"X-Foo", "X-Bar", "Close"}],
+            # Normalizes header capitalization
+            [b"keep-alive, x-fOo, x-bAr", {"Keep-Alive", "X-Foo", "X-Bar"}],
+            # Handles header names with whitespace
+            [
+                b"keep-alive, x  foo, x bar",
+                {"Keep-Alive", "X  foo", "X bar"},
+            ],
+        ]
+    )
+    def test_parse_connection_header_value(
+        self,
+        connection_header_value: bytes,
+        expected_extra_headers_to_remove: Set[str],
+    ) -> None:
+        """
+        Tests that the connection header value is parsed correctly
+        """
+        self.assertEqual(
+            expected_extra_headers_to_remove,
+            parse_connection_header_value(connection_header_value),
+        )