diff options
Diffstat (limited to 'tests/http/test_matrixfederationclient.py')
-rw-r--r-- | tests/http/test_matrixfederationclient.py | 189 |
1 files changed, 185 insertions, 4 deletions
diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py index b5f4a60fe5..a8b9737d1f 100644 --- a/tests/http/test_matrixfederationclient.py +++ b/tests/http/test_matrixfederationclient.py @@ -11,8 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Generator -from unittest.mock import Mock +from typing import Any, Dict, Generator +from unittest.mock import ANY, Mock, create_autospec from netaddr import IPSet from parameterized import parameterized @@ -21,10 +21,11 @@ from twisted.internet import defer from twisted.internet.defer import Deferred, TimeoutError from twisted.internet.error import ConnectingCancelledError, DNSLookupError from twisted.test.proto_helpers import MemoryReactor, StringTransport -from twisted.web.client import ResponseNeverReceived +from twisted.web.client import Agent, ResponseNeverReceived from twisted.web.http import HTTPChannel +from twisted.web.http_headers import Headers -from synapse.api.errors import RequestSendFailed +from synapse.api.errors import HttpResponseException, RequestSendFailed from synapse.http.matrixfederationclient import ( ByteParser, MatrixFederationHttpClient, @@ -39,7 +40,9 @@ from synapse.logging.context import ( from synapse.server import HomeServer from synapse.util import Clock +from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.server import FakeTransport +from tests.test_utils import FakeResponse from tests.unittest import HomeserverTestCase, override_config @@ -658,3 +661,181 @@ class FederationClientTests(HomeserverTestCase): self.assertEqual(self.cl.max_short_retry_delay_seconds, 7) self.assertEqual(self.cl.max_long_retries, 20) self.assertEqual(self.cl.max_short_retries, 5) + + +class FederationClientProxyTests(BaseMultiWorkerStreamTestCase): + def default_config(self) -> Dict[str, Any]: + conf = super().default_config() + conf["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, + "federation_sender": {"host": "testserv", "port": 1001}, + } + return conf + + @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) + def test_proxy_requests_through_federation_sender_worker(self) -> None: + """ + Test that all outbound federation requests go through the `federation_sender` + worker + """ + # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance + # so we can act like some remote server responding to requests + mock_client_on_federation_sender = Mock() + mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) + mock_client_on_federation_sender.agent = mock_agent_on_federation_sender + + # Create the `federation_sender` worker + self.federation_sender = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "federation_sender"}, + federation_http_client=mock_client_on_federation_sender, + ) + + # Fake `remoteserv:8008` responding to requests + mock_agent_on_federation_sender.request.side_effect = ( + lambda *args, **kwargs: defer.succeed( + FakeResponse.json( + payload={ + "foo": "bar", + } + ) + ) + ) + + # This federation request from the main process should be proxied through the + # `federation_sender` worker off to the remote server + test_request_from_main_process_d = defer.ensureDeferred( + self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") + ) + + # Pump the reactor so our deferred goes through the motions + self.pump() + + # Make sure that the request was proxied through the `federation_sender` worker + mock_agent_on_federation_sender.request.assert_called_once_with( + b"GET", + b"matrix-federation://remoteserv:8008/foo/bar", + headers=ANY, + bodyProducer=ANY, + ) + + # Make sure the response is as expected back on the main worker + res = self.successResultOf(test_request_from_main_process_d) + self.assertEqual(res, {"foo": "bar"}) + + @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) + def test_proxy_request_with_network_error_through_federation_sender_worker( + self, + ) -> None: + """ + Test that when the outbound federation request fails with a network related + error, a sensible error makes its way back to the main process. + """ + # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance + # so we can act like some remote server responding to requests + mock_client_on_federation_sender = Mock() + mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) + mock_client_on_federation_sender.agent = mock_agent_on_federation_sender + + # Create the `federation_sender` worker + self.federation_sender = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "federation_sender"}, + federation_http_client=mock_client_on_federation_sender, + ) + + # Fake `remoteserv:8008` responding to requests + mock_agent_on_federation_sender.request.side_effect = ( + lambda *args, **kwargs: defer.fail(ResponseNeverReceived("fake error")) + ) + + # This federation request from the main process should be proxied through the + # `federation_sender` worker off to the remote server + test_request_from_main_process_d = defer.ensureDeferred( + self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar") + ) + + # Pump the reactor so our deferred goes through the motions. We pump with 10 + # seconds (0.1 * 100) so the `MatrixFederationHttpClient` runs out of retries + # and finally passes along the error response. + self.pump(0.1) + + # Make sure that the request was proxied through the `federation_sender` worker + mock_agent_on_federation_sender.request.assert_called_with( + b"GET", + b"matrix-federation://remoteserv:8008/foo/bar", + headers=ANY, + bodyProducer=ANY, + ) + + # Make sure we get some sort of error back on the main worker + failure_res = self.failureResultOf(test_request_from_main_process_d) + self.assertIsInstance(failure_res.value, RequestSendFailed) + self.assertIsInstance(failure_res.value.inner_exception, HttpResponseException) + + @override_config({"outbound_federation_restricted_to": ["federation_sender"]}) + def test_proxy_requests_and_discards_hop_by_hop_headers(self) -> None: + """ + Test to make sure hop-by-hop headers and addional headers defined in the + `Connection` header are discarded when proxying requests + """ + # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance + # so we can act like some remote server responding to requests + mock_client_on_federation_sender = Mock() + mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True) + mock_client_on_federation_sender.agent = mock_agent_on_federation_sender + + # Create the `federation_sender` worker + self.federation_sender = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "federation_sender"}, + federation_http_client=mock_client_on_federation_sender, + ) + + # Fake `remoteserv:8008` responding to requests + mock_agent_on_federation_sender.request.side_effect = lambda *args, **kwargs: defer.succeed( + FakeResponse( + code=200, + body=b'{"foo": "bar"}', + headers=Headers( + { + "Content-Type": ["application/json"], + "Connection": ["close, X-Foo, X-Bar"], + # Should be removed because it's defined in the `Connection` header + "X-Foo": ["foo"], + "X-Bar": ["bar"], + # Should be removed because it's a hop-by-hop header + "Proxy-Authorization": "abcdef", + } + ), + ) + ) + + # This federation request from the main process should be proxied through the + # `federation_sender` worker off to the remote server + test_request_from_main_process_d = defer.ensureDeferred( + self.hs.get_federation_http_client().get_json_with_headers( + "remoteserv:8008", "foo/bar" + ) + ) + + # Pump the reactor so our deferred goes through the motions + self.pump() + + # Make sure that the request was proxied through the `federation_sender` worker + mock_agent_on_federation_sender.request.assert_called_once_with( + b"GET", + b"matrix-federation://remoteserv:8008/foo/bar", + headers=ANY, + bodyProducer=ANY, + ) + + res, headers = self.successResultOf(test_request_from_main_process_d) + header_names = set(headers.keys()) + + # Make sure the response does not include the hop-by-hop headers + self.assertNotIn(b"X-Foo", header_names) + self.assertNotIn(b"X-Bar", header_names) + self.assertNotIn(b"Proxy-Authorization", header_names) + # Make sure the response is as expected back on the main worker + self.assertEqual(res, {"foo": "bar"}) |