diff --git a/changelog.d/15773.feature b/changelog.d/15773.feature
deleted file mode 100644
index 0d77fae2dc..0000000000
--- a/changelog.d/15773.feature
+++ /dev/null
@@ -1 +0,0 @@
-Allow configuring the set of workers to proxy outbound federation traffic through via `outbound_federation_restricted_to`.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 04e8390ffe..ff59cbccc1 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -3930,14 +3930,13 @@ federation_sender_instances:
---
### `instance_map`
-When using workers this should be a map from [`worker_name`](#worker_name) to the HTTP
-replication listener of the worker, if configured, and to the main process. Each worker
-declared under [`stream_writers`](../../workers.md#stream-writers) and
-[`outbound_federation_restricted_to`](#outbound_federation_restricted_to) needs a HTTP replication listener, and that
-listener should be included in the `instance_map`. The main process also needs an entry
-on the `instance_map`, and it should be listed under `main` **if even one other worker
-exists**. Ensure the port matches with what is declared inside the `listener` block for
-a `replication` listener.
+When using workers this should be a map from [`worker_name`](#worker_name) to the
+HTTP replication listener of the worker, if configured, and to the main process.
+Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs
+a HTTP replication listener, and that listener should be included in the `instance_map`.
+The main process also needs an entry on the `instance_map`, and it should be listed under
+`main` **if even one other worker exists**. Ensure the port matches with what is declared
+inside the `listener` block for a `replication` listener.
Example configuration:
@@ -3967,22 +3966,6 @@ stream_writers:
typing: worker1
```
---
-### `outbound_federation_restricted_to`
-
-When using workers, you can restrict outbound federation traffic to only go through a
-specific subset of workers. Any worker specified here must also be in the
-[`instance_map`](#instance_map).
-
-```yaml
-outbound_federation_restricted_to:
- - federation_sender1
- - federation_sender2
-```
-
-Also see the [worker
-documentation](../../workers.md#restrict-outbound-federation-traffic-to-a-specific-set-of-workers)
-for more info.
----
### `run_background_tasks_on`
The [worker](../../workers.md#background-tasks) that is used to run
diff --git a/docs/workers.md b/docs/workers.md
index 03415c6eb3..828f082e75 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -528,26 +528,6 @@ the stream writer for the `presence` stream:
^/_matrix/client/(api/v1|r0|v3|unstable)/presence/
-#### Restrict outbound federation traffic to a specific set of workers
-
-The `outbound_federation_restricted_to` configuration is useful to make sure outbound
-federation traffic only goes through a specified subset of workers. This allows you to
-set more strict access controls (like a firewall) for all workers and only allow the
-`federation_sender`'s to contact the outside world.
-
-```yaml
-instance_map:
- main:
- host: localhost
- port: 8030
- federation_sender1:
- host: localhost
- port: 8034
-
-outbound_federation_restricted_to:
- - federation_sender1
-```
-
#### Background tasks
There is also support for moving background tasks to a separate
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 938ab40f27..936b1b0430 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -386,7 +386,6 @@ def listen_unix(
def listen_http(
- hs: "HomeServer",
listener_config: ListenerConfig,
root_resource: Resource,
version_string: str,
@@ -407,7 +406,6 @@ def listen_http(
version_string,
max_request_body_size=max_request_body_size,
reactor=reactor,
- federation_agent=hs.get_federation_http_client().agent,
)
if isinstance(listener_config, TCPListenerConfig):
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index dc79efcc14..7406c3948c 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -221,7 +221,6 @@ class GenericWorkerServer(HomeServer):
root_resource = create_resource_tree(resources, OptionsResource())
_base.listen_http(
- self,
listener_config,
root_resource,
self.version_string,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index f188c7265a..84236ac299 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -139,7 +139,6 @@ class SynapseHomeServer(HomeServer):
root_resource = OptionsResource()
ports = listen_http(
- self,
listener_config,
create_resource_tree(resources, root_resource),
self.version_string,
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 5c81eb5c67..ccfe75eaf3 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -15,7 +15,7 @@
import argparse
import logging
-from typing import Any, Dict, List, Optional, Union
+from typing import Any, Dict, List, Union
import attr
from pydantic import BaseModel, Extra, StrictBool, StrictInt, StrictStr
@@ -154,27 +154,6 @@ class WriterLocations:
)
-@attr.s(auto_attribs=True)
-class OutboundFederationRestrictedTo:
- """Whether we limit outbound federation to a certain set of instances.
-
- Attributes:
- instances: optional list of instances that can make outbound federation
- requests. If None then all instances can make federation requests.
- locations: list of instance locations to connect to proxy via.
- """
-
- instances: Optional[List[str]]
- locations: List[InstanceLocationConfig] = attr.Factory(list)
-
- def __contains__(self, instance: str) -> bool:
- # It feels a bit dirty to return `True` if `instances` is `None`, but it makes
- # sense in downstream usage in the sense that if
- # `outbound_federation_restricted_to` is not configured, then any instance can
- # talk to federation (no restrictions so always return `True`).
- return self.instances is None or instance in self.instances
-
-
class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
@@ -386,23 +365,6 @@ class WorkerConfig(Config):
new_option_name="update_user_directory_from_worker",
)
- outbound_federation_restricted_to = config.get(
- "outbound_federation_restricted_to", None
- )
- self.outbound_federation_restricted_to = OutboundFederationRestrictedTo(
- outbound_federation_restricted_to
- )
- if outbound_federation_restricted_to:
- for instance in outbound_federation_restricted_to:
- if instance not in self.instance_map:
- raise ConfigError(
- "Instance %r is configured in 'outbound_federation_restricted_to' but does not appear in `instance_map` config."
- % (instance,)
- )
- self.outbound_federation_restricted_to.locations.append(
- self.instance_map[instance]
- )
-
def _should_this_worker_perform_duty(
self,
config: Dict[str, Any],
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ca2cdbc6e2..09ea93e10d 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -1037,12 +1037,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
if reason.check(ResponseDone):
self.deferred.callback(self.length)
elif reason.check(PotentialDataLoss):
- # This applies to requests which don't set `Content-Length` or a
- # `Transfer-Encoding` in the response because in this case the end of the
- # response is indicated by the connection being closed, an event which may
- # also be due to a transient network problem or other error. But since this
- # behavior is expected of some servers (like YouTube), let's ignore it.
- # Stolen from https://github.com/twisted/treq/pull/49/files
+ # stolen from https://github.com/twisted/treq/pull/49/files
# http://twistedmatrix.com/trac/ticket/4840
self.deferred.callback(self.length)
else:
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b00396fdc7..cc4e258b0f 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -50,7 +50,7 @@ from twisted.internet.interfaces import IReactorTime
from twisted.internet.task import Cooperator
from twisted.web.client import ResponseFailed
from twisted.web.http_headers import Headers
-from twisted.web.iweb import IAgent, IBodyProducer, IResponse
+from twisted.web.iweb import IBodyProducer, IResponse
import synapse.metrics
import synapse.util.retryutils
@@ -72,7 +72,6 @@ from synapse.http.client import (
read_body_with_max_size,
)
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
-from synapse.http.proxyagent import ProxyAgent
from synapse.http.types import QueryParams
from synapse.logging import opentracing
from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -394,32 +393,17 @@ class MatrixFederationHttpClient:
if hs.config.server.user_agent_suffix:
user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
- outbound_federation_restricted_to = (
- hs.config.worker.outbound_federation_restricted_to
+ federation_agent = MatrixFederationAgent(
+ self.reactor,
+ tls_client_options_factory,
+ user_agent.encode("ascii"),
+ hs.config.server.federation_ip_range_allowlist,
+ hs.config.server.federation_ip_range_blocklist,
)
- if hs.get_instance_name() in outbound_federation_restricted_to:
- # Talk to federation directly
- federation_agent: IAgent = MatrixFederationAgent(
- self.reactor,
- tls_client_options_factory,
- user_agent.encode("ascii"),
- hs.config.server.federation_ip_range_allowlist,
- hs.config.server.federation_ip_range_blocklist,
- )
- else:
- # We need to talk to federation via the proxy via one of the configured
- # locations
- federation_proxies = outbound_federation_restricted_to.locations
- federation_agent = ProxyAgent(
- self.reactor,
- self.reactor,
- tls_client_options_factory,
- federation_proxies=federation_proxies,
- )
# Use a BlocklistingAgentWrapper to prevent circumventing the IP
# blocking via IP literals in server names
- self.agent: IAgent = BlocklistingAgentWrapper(
+ self.agent = BlocklistingAgentWrapper(
federation_agent,
ip_blocklist=hs.config.server.federation_ip_range_blocklist,
)
@@ -428,6 +412,7 @@ class MatrixFederationHttpClient:
self._store = hs.get_datastores().main
self.version_string_bytes = hs.version_string.encode("ascii")
self.default_timeout_seconds = hs.config.federation.client_timeout_ms / 1000
+
self.max_long_retry_delay_seconds = (
hs.config.federation.max_long_retry_delay_ms / 1000
)
@@ -1156,101 +1141,6 @@ class MatrixFederationHttpClient:
RequestSendFailed: If there were problems connecting to the
remote, due to e.g. DNS failures, connection timeouts etc.
"""
- json_dict, _ = await self.get_json_with_headers(
- destination=destination,
- path=path,
- args=args,
- retry_on_dns_fail=retry_on_dns_fail,
- timeout=timeout,
- ignore_backoff=ignore_backoff,
- try_trailing_slash_on_400=try_trailing_slash_on_400,
- parser=parser,
- )
- return json_dict
-
- @overload
- async def get_json_with_headers(
- self,
- destination: str,
- path: str,
- args: Optional[QueryParams] = None,
- retry_on_dns_fail: bool = True,
- timeout: Optional[int] = None,
- ignore_backoff: bool = False,
- try_trailing_slash_on_400: bool = False,
- parser: Literal[None] = None,
- ) -> Tuple[JsonDict, Dict[bytes, List[bytes]]]:
- ...
-
- @overload
- async def get_json_with_headers(
- self,
- destination: str,
- path: str,
- args: Optional[QueryParams] = ...,
- retry_on_dns_fail: bool = ...,
- timeout: Optional[int] = ...,
- ignore_backoff: bool = ...,
- try_trailing_slash_on_400: bool = ...,
- parser: ByteParser[T] = ...,
- ) -> Tuple[T, Dict[bytes, List[bytes]]]:
- ...
-
- async def get_json_with_headers(
- self,
- destination: str,
- path: str,
- args: Optional[QueryParams] = None,
- retry_on_dns_fail: bool = True,
- timeout: Optional[int] = None,
- ignore_backoff: bool = False,
- try_trailing_slash_on_400: bool = False,
- parser: Optional[ByteParser[T]] = None,
- ) -> Tuple[Union[JsonDict, T], Dict[bytes, List[bytes]]]:
- """GETs some json from the given host homeserver and path
-
- Args:
- destination: The remote server to send the HTTP request to.
-
- path: The HTTP path.
-
- args: A dictionary used to create query strings, defaults to
- None.
-
- retry_on_dns_fail: true if the request should be retried on DNS failures
-
- timeout: number of milliseconds to wait for the response.
- self._default_timeout (60s) by default.
-
- Note that we may make several attempts to send the request; this
- timeout applies to the time spent waiting for response headers for
- *each* attempt (including connection time) as well as the time spent
- reading the response body after a 200 response.
-
- ignore_backoff: true to ignore the historical backoff data
- and try the request anyway.
-
- try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
- response we should try appending a trailing slash to the end of
- the request. Workaround for #3622 in Synapse <= v0.99.3.
-
- parser: The parser to use to decode the response. Defaults to
- parsing as JSON.
-
- Returns:
- Succeeds when we get a 2xx HTTP response. The result will be a tuple of the
- decoded JSON body and a dict of the response headers.
-
- Raises:
- HttpResponseException: If we get an HTTP response code >= 300
- (except 429).
- NotRetryingDestination: If we are not yet ready to retry this
- server.
- FederationDeniedError: If this destination is not on our
- federation whitelist
- RequestSendFailed: If there were problems connecting to the
- remote, due to e.g. DNS failures, connection timeouts etc.
- """
request = MatrixFederationRequest(
method="GET", destination=destination, path=path, query=args
)
@@ -1266,8 +1156,6 @@ class MatrixFederationHttpClient:
timeout=timeout,
)
- headers = dict(response.headers.getAllRawHeaders())
-
if timeout is not None:
_sec_timeout = timeout / 1000
else:
@@ -1285,7 +1173,7 @@ class MatrixFederationHttpClient:
parser=parser,
)
- return body, headers
+ return body
async def delete_json(
self,
diff --git a/synapse/http/proxy.py b/synapse/http/proxy.py
deleted file mode 100644
index 0874d67760..0000000000
--- a/synapse/http/proxy.py
+++ /dev/null
@@ -1,249 +0,0 @@
-# Copyright 2023 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-import json
-import logging
-import urllib.parse
-from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast
-
-from twisted.internet import protocol
-from twisted.internet.interfaces import ITCPTransport
-from twisted.internet.protocol import connectionDone
-from twisted.python import failure
-from twisted.python.failure import Failure
-from twisted.web.client import ResponseDone
-from twisted.web.http_headers import Headers
-from twisted.web.iweb import IAgent, IResponse
-from twisted.web.resource import IResource
-from twisted.web.server import Site
-
-from synapse.api.errors import Codes
-from synapse.http import QuieterFileBodyProducer
-from synapse.http.server import _AsyncResource
-from synapse.logging.context import make_deferred_yieldable, run_in_background
-from synapse.types import ISynapseReactor
-from synapse.util.async_helpers import timeout_deferred
-
-if TYPE_CHECKING:
- from synapse.http.site import SynapseRequest
-
-logger = logging.getLogger(__name__)
-
-# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616
-# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be
-# consumed by the immediate recipient and not be forwarded on.
-HOP_BY_HOP_HEADERS = {
- "Connection",
- "Keep-Alive",
- "Proxy-Authenticate",
- "Proxy-Authorization",
- "TE",
- "Trailers",
- "Transfer-Encoding",
- "Upgrade",
-}
-
-
-def parse_connection_header_value(
- connection_header_value: Optional[bytes],
-) -> Set[str]:
- """
- Parse the `Connection` header to determine which headers we should not be copied
- over from the remote response.
-
- As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1
-
- Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}`
-
- Even though "close" is a special directive, let's just treat it as just another
- header for simplicity. If people want to check for this directive, they can simply
- check for `"Close" in headers`.
-
- Args:
- connection_header_value: The value of the `Connection` header.
-
- Returns:
- The set of header names that should not be copied over from the remote response.
- The keys are capitalized in canonical capitalization.
- """
- headers = Headers()
- extra_headers_to_remove: Set[str] = set()
- if connection_header_value:
- extra_headers_to_remove = {
- headers._canonicalNameCaps(connection_option.strip()).decode("ascii")
- for connection_option in connection_header_value.split(b",")
- }
-
- return extra_headers_to_remove
-
-
-class ProxyResource(_AsyncResource):
- """
- A stub resource that proxies any requests with a `matrix-federation://` scheme
- through the given `federation_agent` to the remote homeserver and ferries back the
- info.
- """
-
- isLeaf = True
-
- def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent):
- super().__init__(True)
-
- self.reactor = reactor
- self.agent = federation_agent
-
- async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
- uri = urllib.parse.urlparse(request.uri)
- assert uri.scheme == b"matrix-federation"
-
- headers = Headers()
- for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
- header_value = request.getHeader(header_name)
- if header_value:
- headers.addRawHeader(header_name, header_value)
-
- request_deferred = run_in_background(
- self.agent.request,
- request.method,
- request.uri,
- headers=headers,
- bodyProducer=QuieterFileBodyProducer(request.content),
- )
- request_deferred = timeout_deferred(
- request_deferred,
- # This should be set longer than the timeout in `MatrixFederationHttpClient`
- # so that it has enough time to complete and pass us the data before we give
- # up.
- timeout=90,
- reactor=self.reactor,
- )
-
- response = await make_deferred_yieldable(request_deferred)
-
- return response.code, response
-
- def _send_response(
- self,
- request: "SynapseRequest",
- code: int,
- response_object: Any,
- ) -> None:
- response = cast(IResponse, response_object)
- response_headers = cast(Headers, response.headers)
-
- request.setResponseCode(code)
-
- # The `Connection` header also defines which headers should not be copied over.
- connection_header = response_headers.getRawHeaders(b"connection")
- extra_headers_to_remove = parse_connection_header_value(
- connection_header[0] if connection_header else None
- )
-
- # Copy headers.
- for k, v in response_headers.getAllRawHeaders():
- # Do not copy over any hop-by-hop headers. These are meant to only be
- # consumed by the immediate recipient and not be forwarded on.
- header_key = k.decode("ascii")
- if (
- header_key in HOP_BY_HOP_HEADERS
- or header_key in extra_headers_to_remove
- ):
- continue
-
- request.responseHeaders.setRawHeaders(k, v)
-
- response.deliverBody(_ProxyResponseBody(request))
-
- def _send_error_response(
- self,
- f: failure.Failure,
- request: "SynapseRequest",
- ) -> None:
- request.setResponseCode(502)
- request.setHeader(b"Content-Type", b"application/json")
- request.write(
- (
- json.dumps(
- {
- "errcode": Codes.UNKNOWN,
- "err": "ProxyResource: Error when proxying request: %s %s -> %s"
- % (
- request.method.decode("ascii"),
- request.uri.decode("ascii"),
- f,
- ),
- }
- )
- ).encode()
- )
- request.finish()
-
-
-class _ProxyResponseBody(protocol.Protocol):
- """
- A protocol that proxies the given remote response data back out to the given local
- request.
- """
-
- transport: Optional[ITCPTransport] = None
-
- def __init__(self, request: "SynapseRequest") -> None:
- self._request = request
-
- def dataReceived(self, data: bytes) -> None:
- # Avoid sending response data to the local request that already disconnected
- if self._request._disconnected and self.transport is not None:
- # Close the connection (forcefully) since all the data will get
- # discarded anyway.
- self.transport.abortConnection()
- return
-
- self._request.write(data)
-
- def connectionLost(self, reason: Failure = connectionDone) -> None:
- # If the local request is already finished (successfully or failed), don't
- # worry about sending anything back.
- if self._request.finished:
- return
-
- if reason.check(ResponseDone):
- self._request.finish()
- else:
- # Abort the underlying request since our remote request also failed.
- self._request.transport.abortConnection()
-
-
-class ProxySite(Site):
- """
- Proxies any requests with a `matrix-federation://` scheme through the given
- `federation_agent`. Otherwise, behaves like a normal `Site`.
- """
-
- def __init__(
- self,
- resource: IResource,
- reactor: ISynapseReactor,
- federation_agent: IAgent,
- ):
- super().__init__(resource, reactor=reactor)
-
- self._proxy_resource = ProxyResource(reactor, federation_agent)
-
- def getResourceFor(self, request: "SynapseRequest") -> IResource:
- uri = urllib.parse.urlparse(request.uri)
- if uri.scheme == b"matrix-federation":
- return self._proxy_resource
-
- return super().getResourceFor(request)
diff --git a/synapse/http/proxyagent.py b/synapse/http/proxyagent.py
index 1fa3adbef2..7bdc4acae7 100644
--- a/synapse/http/proxyagent.py
+++ b/synapse/http/proxyagent.py
@@ -12,9 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import random
import re
-from typing import Any, Collection, Dict, List, Optional, Sequence, Tuple
+from typing import Any, Dict, Optional, Tuple
from urllib.parse import urlparse
from urllib.request import ( # type: ignore[attr-defined]
getproxies_environment,
@@ -25,12 +24,7 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.internet.interfaces import (
- IProtocol,
- IProtocolFactory,
- IReactorCore,
- IStreamClientEndpoint,
-)
+from twisted.internet.interfaces import IReactorCore, IStreamClientEndpoint
from twisted.python.failure import Failure
from twisted.web.client import (
URI,
@@ -42,10 +36,8 @@ from twisted.web.error import SchemeNotSupported
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IBodyProducer, IPolicyForHTTPS, IResponse
-from synapse.config.workers import InstanceLocationConfig
from synapse.http import redact_uri
from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint, ProxyCredentials
-from synapse.logging.context import run_in_background
logger = logging.getLogger(__name__)
@@ -82,10 +74,6 @@ class ProxyAgent(_AgentBase):
use_proxy: Whether proxy settings should be discovered and used
from conventional environment variables.
- federation_proxies: An optional list of locations to proxy outbound federation
- traffic through (only requests that use the `matrix-federation://` scheme
- will be proxied).
-
Raises:
ValueError if use_proxy is set and the environment variables
contain an invalid proxy specification.
@@ -101,7 +89,6 @@ class ProxyAgent(_AgentBase):
bindAddress: Optional[bytes] = None,
pool: Optional[HTTPConnectionPool] = None,
use_proxy: bool = False,
- federation_proxies: Collection[InstanceLocationConfig] = (),
):
contextFactory = contextFactory or BrowserLikePolicyForHTTPS()
@@ -140,27 +127,6 @@ class ProxyAgent(_AgentBase):
self._policy_for_https = contextFactory
self._reactor = reactor
- self._federation_proxy_endpoint: Optional[IStreamClientEndpoint] = None
- if federation_proxies:
- endpoints = []
- for federation_proxy in federation_proxies:
- endpoint = HostnameEndpoint(
- self.proxy_reactor,
- federation_proxy.host,
- federation_proxy.port,
- )
-
- if federation_proxy.tls:
- tls_connection_creator = self._policy_for_https.creatorForNetloc(
- federation_proxy.host,
- federation_proxy.port,
- )
- endpoint = wrapClientTLS(tls_connection_creator, endpoint)
-
- endpoints.append(endpoint)
-
- self._federation_proxy_endpoint = _ProxyEndpoints(endpoints)
-
def request(
self,
method: bytes,
@@ -248,14 +214,6 @@ class ProxyAgent(_AgentBase):
parsed_uri.port,
self.https_proxy_creds,
)
- elif (
- parsed_uri.scheme == b"matrix-federation"
- and self._federation_proxy_endpoint
- ):
- # Cache *all* connections under the same key, since we are only
- # connecting to a single destination, the proxy:
- endpoint = self._federation_proxy_endpoint
- request_path = uri
else:
# not using a proxy
endpoint = HostnameEndpoint(
@@ -275,11 +233,6 @@ class ProxyAgent(_AgentBase):
endpoint = wrapClientTLS(tls_connection_creator, endpoint)
elif parsed_uri.scheme == b"http":
pass
- elif (
- parsed_uri.scheme == b"matrix-federation"
- and self._federation_proxy_endpoint
- ):
- pass
else:
return defer.fail(
Failure(
@@ -384,31 +337,3 @@ def parse_proxy(
credentials = ProxyCredentials(b"".join([url.username, b":", url.password]))
return url.scheme, url.hostname, url.port or default_port, credentials
-
-
-@implementer(IStreamClientEndpoint)
-class _ProxyEndpoints:
- """An endpoint that randomly iterates through a given list of endpoints at
- each connection attempt.
- """
-
- def __init__(self, endpoints: Sequence[IStreamClientEndpoint]) -> None:
- assert endpoints
- self._endpoints = endpoints
-
- def connect(
- self, protocol_factory: IProtocolFactory
- ) -> "defer.Deferred[IProtocol]":
- """Implements IStreamClientEndpoint interface"""
-
- return run_in_background(self._do_connect, protocol_factory)
-
- async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
- failures: List[Failure] = []
- for endpoint in random.sample(self._endpoints, k=len(self._endpoints)):
- try:
- return await endpoint.connect(protocol_factory)
- except Exception:
- failures.append(Failure())
-
- failures.pop().raiseException()
diff --git a/synapse/http/server.py b/synapse/http/server.py
index ff3153a9d9..933172c873 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -18,7 +18,6 @@ import html
import logging
import types
import urllib
-import urllib.parse
from http import HTTPStatus
from http.client import FOUND
from inspect import isawaitable
@@ -66,6 +65,7 @@ from synapse.api.errors import (
UnrecognizedRequestError,
)
from synapse.config.homeserver import HomeServerConfig
+from synapse.http.site import SynapseRequest
from synapse.logging.context import defer_to_thread, preserve_fn, run_in_background
from synapse.logging.opentracing import active_span, start_active_span, trace_servlet
from synapse.util import json_encoder
@@ -76,7 +76,6 @@ from synapse.util.iterutils import chunk_seq
if TYPE_CHECKING:
import opentracing
- from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
@@ -103,7 +102,7 @@ HTTP_STATUS_REQUEST_CANCELLED = 499
def return_json_error(
- f: failure.Failure, request: "SynapseRequest", config: Optional[HomeServerConfig]
+ f: failure.Failure, request: SynapseRequest, config: Optional[HomeServerConfig]
) -> None:
"""Sends a JSON error response to clients."""
@@ -221,8 +220,8 @@ def return_html_error(
def wrap_async_request_handler(
- h: Callable[["_AsyncResource", "SynapseRequest"], Awaitable[None]]
-) -> Callable[["_AsyncResource", "SynapseRequest"], "defer.Deferred[None]"]:
+ h: Callable[["_AsyncResource", SynapseRequest], Awaitable[None]]
+) -> Callable[["_AsyncResource", SynapseRequest], "defer.Deferred[None]"]:
"""Wraps an async request handler so that it calls request.processing.
This helps ensure that work done by the request handler after the request is completed
@@ -236,7 +235,7 @@ def wrap_async_request_handler(
"""
async def wrapped_async_request_handler(
- self: "_AsyncResource", request: "SynapseRequest"
+ self: "_AsyncResource", request: SynapseRequest
) -> None:
with request.processing():
await h(self, request)
@@ -301,7 +300,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
self._extract_context = extract_context
- def render(self, request: "SynapseRequest") -> int:
+ def render(self, request: SynapseRequest) -> int:
"""This gets called by twisted every time someone sends us a request."""
request.render_deferred = defer.ensureDeferred(
self._async_render_wrapper(request)
@@ -309,7 +308,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
return NOT_DONE_YET
@wrap_async_request_handler
- async def _async_render_wrapper(self, request: "SynapseRequest") -> None:
+ async def _async_render_wrapper(self, request: SynapseRequest) -> None:
"""This is a wrapper that delegates to `_async_render` and handles
exceptions, return values, metrics, etc.
"""
@@ -327,15 +326,9 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
# of our stack, and thus gives us a sensible stack
# trace.
f = failure.Failure()
- logger.exception(
- "Error handling request",
- exc_info=(f.type, f.value, f.getTracebackObject()),
- )
self._send_error_response(f, request)
- async def _async_render(
- self, request: "SynapseRequest"
- ) -> Optional[Tuple[int, Any]]:
+ async def _async_render(self, request: SynapseRequest) -> Optional[Tuple[int, Any]]:
"""Delegates to `_async_render_<METHOD>` methods, or returns a 400 if
no appropriate method exists. Can be overridden in sub classes for
different routing.
@@ -365,7 +358,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
@abc.abstractmethod
def _send_response(
self,
- request: "SynapseRequest",
+ request: SynapseRequest,
code: int,
response_object: Any,
) -> None:
@@ -375,7 +368,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
def _send_error_response(
self,
f: failure.Failure,
- request: "SynapseRequest",
+ request: SynapseRequest,
) -> None:
raise NotImplementedError()
@@ -391,7 +384,7 @@ class DirectServeJsonResource(_AsyncResource):
def _send_response(
self,
- request: "SynapseRequest",
+ request: SynapseRequest,
code: int,
response_object: Any,
) -> None:
@@ -408,7 +401,7 @@ class DirectServeJsonResource(_AsyncResource):
def _send_error_response(
self,
f: failure.Failure,
- request: "SynapseRequest",
+ request: SynapseRequest,
) -> None:
"""Implements _AsyncResource._send_error_response"""
return_json_error(f, request, None)
@@ -480,7 +473,7 @@ class JsonResource(DirectServeJsonResource):
)
def _get_handler_for_request(
- self, request: "SynapseRequest"
+ self, request: SynapseRequest
) -> Tuple[ServletCallback, str, Dict[str, str]]:
"""Finds a callback method to handle the given request.
@@ -510,7 +503,7 @@ class JsonResource(DirectServeJsonResource):
# Huh. No one wanted to handle that? Fiiiiiine.
raise UnrecognizedRequestError(code=404)
- async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
+ async def _async_render(self, request: SynapseRequest) -> Tuple[int, Any]:
callback, servlet_classname, group_dict = self._get_handler_for_request(request)
request.is_render_cancellable = is_function_cancellable(callback)
@@ -542,7 +535,7 @@ class JsonResource(DirectServeJsonResource):
def _send_error_response(
self,
f: failure.Failure,
- request: "SynapseRequest",
+ request: SynapseRequest,
) -> None:
"""Implements _AsyncResource._send_error_response"""
return_json_error(f, request, self.hs.config)
@@ -558,7 +551,7 @@ class DirectServeHtmlResource(_AsyncResource):
def _send_response(
self,
- request: "SynapseRequest",
+ request: SynapseRequest,
code: int,
response_object: Any,
) -> None:
@@ -572,7 +565,7 @@ class DirectServeHtmlResource(_AsyncResource):
def _send_error_response(
self,
f: failure.Failure,
- request: "SynapseRequest",
+ request: SynapseRequest,
) -> None:
"""Implements _AsyncResource._send_error_response"""
return_html_error(f, request, self.ERROR_TEMPLATE)
@@ -599,7 +592,7 @@ class UnrecognizedRequestResource(resource.Resource):
errcode of M_UNRECOGNIZED.
"""
- def render(self, request: "SynapseRequest") -> int:
+ def render(self, request: SynapseRequest) -> int:
f = failure.Failure(UnrecognizedRequestError(code=404))
return_json_error(f, request, None)
# A response has already been sent but Twisted requires either NOT_DONE_YET
@@ -629,7 +622,7 @@ class RootRedirect(resource.Resource):
class OptionsResource(resource.Resource):
"""Responds to OPTION requests for itself and all children."""
- def render_OPTIONS(self, request: "SynapseRequest") -> bytes:
+ def render_OPTIONS(self, request: SynapseRequest) -> bytes:
request.setResponseCode(204)
request.setHeader(b"Content-Length", b"0")
@@ -744,7 +737,7 @@ def _encode_json_bytes(json_object: object) -> bytes:
def respond_with_json(
- request: "SynapseRequest",
+ request: SynapseRequest,
code: int,
json_object: Any,
send_cors: bool = False,
@@ -794,7 +787,7 @@ def respond_with_json(
def respond_with_json_bytes(
- request: "SynapseRequest",
+ request: SynapseRequest,
code: int,
json_bytes: bytes,
send_cors: bool = False,
@@ -832,7 +825,7 @@ def respond_with_json_bytes(
async def _async_write_json_to_request_in_thread(
- request: "SynapseRequest",
+ request: SynapseRequest,
json_encoder: Callable[[Any], bytes],
json_object: Any,
) -> None:
@@ -890,7 +883,7 @@ def _write_bytes_to_request(request: Request, bytes_to_write: bytes) -> None:
_ByteProducer(request, bytes_generator)
-def set_cors_headers(request: "SynapseRequest") -> None:
+def set_cors_headers(request: SynapseRequest) -> None:
"""Set the CORS headers so that javascript running in a web browsers can
use this API
@@ -988,7 +981,7 @@ def set_clickjacking_protection_headers(request: Request) -> None:
def respond_with_redirect(
- request: "SynapseRequest", url: bytes, statusCode: int = FOUND, cors: bool = False
+ request: SynapseRequest, url: bytes, statusCode: int = FOUND, cors: bool = False
) -> None:
"""
Write a 302 (or other specified status code) response to the request, if it is still alive.
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 0ee2598345..5b5a7c1e59 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -21,28 +21,25 @@ from zope.interface import implementer
from twisted.internet.address import UNIXAddress
from twisted.internet.defer import Deferred
-from twisted.internet.interfaces import IAddress
+from twisted.internet.interfaces import IAddress, IReactorTime
from twisted.python.failure import Failure
from twisted.web.http import HTTPChannel
-from twisted.web.iweb import IAgent
from twisted.web.resource import IResource, Resource
-from twisted.web.server import Request
+from twisted.web.server import Request, Site
from synapse.config.server import ListenerConfig
from synapse.http import get_request_user_agent, redact_uri
-from synapse.http.proxy import ProxySite
from synapse.http.request_metrics import RequestMetrics, requests_counter
from synapse.logging.context import (
ContextRequest,
LoggingContext,
PreserveLoggingContext,
)
-from synapse.types import ISynapseReactor, Requester
+from synapse.types import Requester
if TYPE_CHECKING:
import opentracing
-
logger = logging.getLogger(__name__)
_next_request_seq = 0
@@ -105,7 +102,7 @@ class SynapseRequest(Request):
# A boolean indicating whether `render_deferred` should be cancelled if the
# client disconnects early. Expected to be set by the coroutine started by
# `Resource.render`, if rendering is asynchronous.
- self.is_render_cancellable: bool = False
+ self.is_render_cancellable = False
global _next_request_seq
self.request_seq = _next_request_seq
@@ -604,7 +601,7 @@ class _XForwardedForAddress:
host: str
-class SynapseSite(ProxySite):
+class SynapseSite(Site):
"""
Synapse-specific twisted http Site
@@ -626,8 +623,7 @@ class SynapseSite(ProxySite):
resource: IResource,
server_version_string: str,
max_request_body_size: int,
- reactor: ISynapseReactor,
- federation_agent: IAgent,
+ reactor: IReactorTime,
):
"""
@@ -642,11 +638,7 @@ class SynapseSite(ProxySite):
dropping the connection
reactor: reactor to be used to manage connection timeouts
"""
- super().__init__(
- resource=resource,
- reactor=reactor,
- federation_agent=federation_agent,
- )
+ Site.__init__(self, resource, reactor=reactor)
self.site_tag = site_tag
self.reactor = reactor
@@ -657,9 +649,7 @@ class SynapseSite(ProxySite):
request_id_header = config.http_options.request_id_header
- self.experimental_cors_msc3886: bool = (
- config.http_options.experimental_cors_msc3886
- )
+ self.experimental_cors_msc3886 = config.http_options.experimental_cors_msc3886
def request_factory(channel: HTTPChannel, queued: bool) -> Request:
return request_class(
diff --git a/tests/app/test_openid_listener.py b/tests/app/test_openid_listener.py
index 21c5309740..5a965f233b 100644
--- a/tests/app/test_openid_listener.py
+++ b/tests/app/test_openid_listener.py
@@ -31,7 +31,9 @@ from tests.unittest import HomeserverTestCase
class FederationReaderOpenIDListenerTests(HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
+ hs = self.setup_test_homeserver(
+ federation_http_client=None, homeserver_to_use=GenericWorkerServer
+ )
return hs
def default_config(self) -> JsonDict:
@@ -89,7 +91,9 @@ class FederationReaderOpenIDListenerTests(HomeserverTestCase):
@patch("synapse.app.homeserver.KeyResource", new=Mock())
class SynapseHomeserverOpenIDListenerTests(HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver(homeserver_to_use=SynapseHomeServer)
+ hs = self.setup_test_homeserver(
+ federation_http_client=None, homeserver_to_use=SynapseHomeServer
+ )
return hs
@parameterized.expand(
diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py
index 66215af2b8..ee48f9e546 100644
--- a/tests/handlers/test_device.py
+++ b/tests/handlers/test_device.py
@@ -41,6 +41,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
self.appservice_api = mock.Mock()
hs = self.setup_test_homeserver(
"server",
+ federation_http_client=None,
application_service_api=self.appservice_api,
)
handler = hs.get_device_handler()
@@ -400,7 +401,7 @@ class DeviceTestCase(unittest.HomeserverTestCase):
class DehydrationTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver("server")
+ hs = self.setup_test_homeserver("server", federation_http_client=None)
handler = hs.get_device_handler()
assert isinstance(handler, DeviceHandler)
self.handler = handler
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 5f11d5df11..bf0862ed54 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -57,7 +57,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
]
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver()
+ hs = self.setup_test_homeserver(federation_http_client=None)
self.handler = hs.get_federation_handler()
self.store = hs.get_datastores().main
return hs
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index fd66d573d2..19f5322317 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -993,6 +993,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
hs = self.setup_test_homeserver(
"server",
+ federation_http_client=None,
federation_sender=Mock(spec=FederationSender),
)
return hs
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 5da1d95f0b..94518a7196 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -17,8 +17,6 @@ import json
from typing import Dict, List, Set
from unittest.mock import ANY, Mock, call
-from netaddr import IPSet
-
from twisted.test.proto_helpers import MemoryReactor
from twisted.web.resource import Resource
@@ -26,7 +24,6 @@ from synapse.api.constants import EduTypes
from synapse.api.errors import AuthError
from synapse.federation.transport.server import TransportLayerServer
from synapse.handlers.typing import TypingWriterHandler
-from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.server import HomeServer
from synapse.types import JsonDict, Requester, UserID, create_requester
from synapse.util import Clock
@@ -79,13 +76,6 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
# we mock out the federation client too
self.mock_federation_client = Mock(spec=["put_json"])
self.mock_federation_client.put_json.return_value = make_awaitable((200, "OK"))
- self.mock_federation_client.agent = MatrixFederationAgent(
- reactor,
- tls_client_options_factory=None,
- user_agent=b"SynapseInTrialTest/0.0.0",
- ip_allowlist=None,
- ip_blocklist=IPSet(),
- )
# the tests assume that we are starting at unix time 1000
reactor.pump((1000,))
diff --git a/tests/http/test_matrixfederationclient.py b/tests/http/test_matrixfederationclient.py
index a8b9737d1f..b5f4a60fe5 100644
--- a/tests/http/test_matrixfederationclient.py
+++ b/tests/http/test_matrixfederationclient.py
@@ -11,8 +11,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Any, Dict, Generator
-from unittest.mock import ANY, Mock, create_autospec
+from typing import Generator
+from unittest.mock import Mock
from netaddr import IPSet
from parameterized import parameterized
@@ -21,11 +21,10 @@ from twisted.internet import defer
from twisted.internet.defer import Deferred, TimeoutError
from twisted.internet.error import ConnectingCancelledError, DNSLookupError
from twisted.test.proto_helpers import MemoryReactor, StringTransport
-from twisted.web.client import Agent, ResponseNeverReceived
+from twisted.web.client import ResponseNeverReceived
from twisted.web.http import HTTPChannel
-from twisted.web.http_headers import Headers
-from synapse.api.errors import HttpResponseException, RequestSendFailed
+from synapse.api.errors import RequestSendFailed
from synapse.http.matrixfederationclient import (
ByteParser,
MatrixFederationHttpClient,
@@ -40,9 +39,7 @@ from synapse.logging.context import (
from synapse.server import HomeServer
from synapse.util import Clock
-from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import FakeTransport
-from tests.test_utils import FakeResponse
from tests.unittest import HomeserverTestCase, override_config
@@ -661,181 +658,3 @@ class FederationClientTests(HomeserverTestCase):
self.assertEqual(self.cl.max_short_retry_delay_seconds, 7)
self.assertEqual(self.cl.max_long_retries, 20)
self.assertEqual(self.cl.max_short_retries, 5)
-
-
-class FederationClientProxyTests(BaseMultiWorkerStreamTestCase):
- def default_config(self) -> Dict[str, Any]:
- conf = super().default_config()
- conf["instance_map"] = {
- "main": {"host": "testserv", "port": 8765},
- "federation_sender": {"host": "testserv", "port": 1001},
- }
- return conf
-
- @override_config({"outbound_federation_restricted_to": ["federation_sender"]})
- def test_proxy_requests_through_federation_sender_worker(self) -> None:
- """
- Test that all outbound federation requests go through the `federation_sender`
- worker
- """
- # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
- # so we can act like some remote server responding to requests
- mock_client_on_federation_sender = Mock()
- mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
- mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
-
- # Create the `federation_sender` worker
- self.federation_sender = self.make_worker_hs(
- "synapse.app.generic_worker",
- {"worker_name": "federation_sender"},
- federation_http_client=mock_client_on_federation_sender,
- )
-
- # Fake `remoteserv:8008` responding to requests
- mock_agent_on_federation_sender.request.side_effect = (
- lambda *args, **kwargs: defer.succeed(
- FakeResponse.json(
- payload={
- "foo": "bar",
- }
- )
- )
- )
-
- # This federation request from the main process should be proxied through the
- # `federation_sender` worker off to the remote server
- test_request_from_main_process_d = defer.ensureDeferred(
- self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar")
- )
-
- # Pump the reactor so our deferred goes through the motions
- self.pump()
-
- # Make sure that the request was proxied through the `federation_sender` worker
- mock_agent_on_federation_sender.request.assert_called_once_with(
- b"GET",
- b"matrix-federation://remoteserv:8008/foo/bar",
- headers=ANY,
- bodyProducer=ANY,
- )
-
- # Make sure the response is as expected back on the main worker
- res = self.successResultOf(test_request_from_main_process_d)
- self.assertEqual(res, {"foo": "bar"})
-
- @override_config({"outbound_federation_restricted_to": ["federation_sender"]})
- def test_proxy_request_with_network_error_through_federation_sender_worker(
- self,
- ) -> None:
- """
- Test that when the outbound federation request fails with a network related
- error, a sensible error makes its way back to the main process.
- """
- # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
- # so we can act like some remote server responding to requests
- mock_client_on_federation_sender = Mock()
- mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
- mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
-
- # Create the `federation_sender` worker
- self.federation_sender = self.make_worker_hs(
- "synapse.app.generic_worker",
- {"worker_name": "federation_sender"},
- federation_http_client=mock_client_on_federation_sender,
- )
-
- # Fake `remoteserv:8008` responding to requests
- mock_agent_on_federation_sender.request.side_effect = (
- lambda *args, **kwargs: defer.fail(ResponseNeverReceived("fake error"))
- )
-
- # This federation request from the main process should be proxied through the
- # `federation_sender` worker off to the remote server
- test_request_from_main_process_d = defer.ensureDeferred(
- self.hs.get_federation_http_client().get_json("remoteserv:8008", "foo/bar")
- )
-
- # Pump the reactor so our deferred goes through the motions. We pump with 10
- # seconds (0.1 * 100) so the `MatrixFederationHttpClient` runs out of retries
- # and finally passes along the error response.
- self.pump(0.1)
-
- # Make sure that the request was proxied through the `federation_sender` worker
- mock_agent_on_federation_sender.request.assert_called_with(
- b"GET",
- b"matrix-federation://remoteserv:8008/foo/bar",
- headers=ANY,
- bodyProducer=ANY,
- )
-
- # Make sure we get some sort of error back on the main worker
- failure_res = self.failureResultOf(test_request_from_main_process_d)
- self.assertIsInstance(failure_res.value, RequestSendFailed)
- self.assertIsInstance(failure_res.value.inner_exception, HttpResponseException)
-
- @override_config({"outbound_federation_restricted_to": ["federation_sender"]})
- def test_proxy_requests_and_discards_hop_by_hop_headers(self) -> None:
- """
- Test to make sure hop-by-hop headers and addional headers defined in the
- `Connection` header are discarded when proxying requests
- """
- # Mock out the `MatrixFederationHttpClient` of the `federation_sender` instance
- # so we can act like some remote server responding to requests
- mock_client_on_federation_sender = Mock()
- mock_agent_on_federation_sender = create_autospec(Agent, spec_set=True)
- mock_client_on_federation_sender.agent = mock_agent_on_federation_sender
-
- # Create the `federation_sender` worker
- self.federation_sender = self.make_worker_hs(
- "synapse.app.generic_worker",
- {"worker_name": "federation_sender"},
- federation_http_client=mock_client_on_federation_sender,
- )
-
- # Fake `remoteserv:8008` responding to requests
- mock_agent_on_federation_sender.request.side_effect = lambda *args, **kwargs: defer.succeed(
- FakeResponse(
- code=200,
- body=b'{"foo": "bar"}',
- headers=Headers(
- {
- "Content-Type": ["application/json"],
- "Connection": ["close, X-Foo, X-Bar"],
- # Should be removed because it's defined in the `Connection` header
- "X-Foo": ["foo"],
- "X-Bar": ["bar"],
- # Should be removed because it's a hop-by-hop header
- "Proxy-Authorization": "abcdef",
- }
- ),
- )
- )
-
- # This federation request from the main process should be proxied through the
- # `federation_sender` worker off to the remote server
- test_request_from_main_process_d = defer.ensureDeferred(
- self.hs.get_federation_http_client().get_json_with_headers(
- "remoteserv:8008", "foo/bar"
- )
- )
-
- # Pump the reactor so our deferred goes through the motions
- self.pump()
-
- # Make sure that the request was proxied through the `federation_sender` worker
- mock_agent_on_federation_sender.request.assert_called_once_with(
- b"GET",
- b"matrix-federation://remoteserv:8008/foo/bar",
- headers=ANY,
- bodyProducer=ANY,
- )
-
- res, headers = self.successResultOf(test_request_from_main_process_d)
- header_names = set(headers.keys())
-
- # Make sure the response does not include the hop-by-hop headers
- self.assertNotIn(b"X-Foo", header_names)
- self.assertNotIn(b"X-Bar", header_names)
- self.assertNotIn(b"Proxy-Authorization", header_names)
- # Make sure the response is as expected back on the main worker
- self.assertEqual(res, {"foo": "bar"})
diff --git a/tests/http/test_proxy.py b/tests/http/test_proxy.py
deleted file mode 100644
index 0dc9ba8e05..0000000000
--- a/tests/http/test_proxy.py
+++ /dev/null
@@ -1,53 +0,0 @@
-# Copyright 2023 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from typing import Set
-
-from parameterized import parameterized
-
-from synapse.http.proxy import parse_connection_header_value
-
-from tests.unittest import TestCase
-
-
-class ProxyTests(TestCase):
- @parameterized.expand(
- [
- [b"close, X-Foo, X-Bar", {"Close", "X-Foo", "X-Bar"}],
- # No whitespace
- [b"close,X-Foo,X-Bar", {"Close", "X-Foo", "X-Bar"}],
- # More whitespace
- [b"close, X-Foo, X-Bar", {"Close", "X-Foo", "X-Bar"}],
- # "close" directive in not the first position
- [b"X-Foo, X-Bar, close", {"X-Foo", "X-Bar", "Close"}],
- # Normalizes header capitalization
- [b"keep-alive, x-fOo, x-bAr", {"Keep-Alive", "X-Foo", "X-Bar"}],
- # Handles header names with whitespace
- [
- b"keep-alive, x foo, x bar",
- {"Keep-Alive", "X foo", "X bar"},
- ],
- ]
- )
- def test_parse_connection_header_value(
- self,
- connection_header_value: bytes,
- expected_extra_headers_to_remove: Set[str],
- ) -> None:
- """
- Tests that the connection header value is parsed correctly
- """
- self.assertEqual(
- expected_extra_headers_to_remove,
- parse_connection_header_value(connection_header_value),
- )
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index 96badc46b0..eb9b1f1cd9 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -69,10 +69,10 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
# Make a new HomeServer object for the worker
self.reactor.lookups["testserv"] = "1.2.3.4"
self.worker_hs = self.setup_test_homeserver(
+ federation_http_client=None,
homeserver_to_use=GenericWorkerServer,
config=self._get_worker_hs_config(),
reactor=self.reactor,
- federation_http_client=None,
)
# Since we use sqlite in memory databases we need to make sure the
@@ -380,7 +380,6 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
server_version_string="1",
max_request_body_size=8192,
reactor=self.reactor,
- federation_agent=worker_hs.get_federation_http_client().agent,
)
worker_hs.get_replication_command_handler().start_replication(worker_hs)
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index a324b4d31d..08703206a9 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -14,18 +14,14 @@
import logging
from unittest.mock import Mock
-from netaddr import IPSet
-
from synapse.api.constants import EventTypes, Membership
from synapse.events.builder import EventBuilderFactory
from synapse.handlers.typing import TypingWriterHandler
-from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.rest.admin import register_servlets_for_client_rest_resource
from synapse.rest.client import login, room
from synapse.types import UserID, create_requester
from tests.replication._base import BaseMultiWorkerStreamTestCase
-from tests.server import get_clock
from tests.test_utils import make_awaitable
logger = logging.getLogger(__name__)
@@ -45,25 +41,13 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
room.register_servlets,
]
- def setUp(self) -> None:
- super().setUp()
-
- reactor, _ = get_clock()
- self.matrix_federation_agent = MatrixFederationAgent(
- reactor,
- tls_client_options_factory=None,
- user_agent=b"SynapseInTrialTest/0.0.0",
- ip_allowlist=None,
- ip_blocklist=IPSet(),
- )
-
def test_send_event_single_sender(self) -> None:
"""Test that using a single federation sender worker correctly sends a
new event.
"""
mock_client = Mock(spec=["put_json"])
mock_client.put_json.return_value = make_awaitable({})
- mock_client.agent = self.matrix_federation_agent
+
self.make_worker_hs(
"synapse.app.generic_worker",
{
@@ -94,7 +78,6 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
"""
mock_client1 = Mock(spec=["put_json"])
mock_client1.put_json.return_value = make_awaitable({})
- mock_client1.agent = self.matrix_federation_agent
self.make_worker_hs(
"synapse.app.generic_worker",
{
@@ -109,7 +92,6 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
mock_client2 = Mock(spec=["put_json"])
mock_client2.put_json.return_value = make_awaitable({})
- mock_client2.agent = self.matrix_federation_agent
self.make_worker_hs(
"synapse.app.generic_worker",
{
@@ -163,7 +145,6 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
"""
mock_client1 = Mock(spec=["put_json"])
mock_client1.put_json.return_value = make_awaitable({})
- mock_client1.agent = self.matrix_federation_agent
self.make_worker_hs(
"synapse.app.generic_worker",
{
@@ -178,7 +159,6 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
mock_client2 = Mock(spec=["put_json"])
mock_client2.put_json.return_value = make_awaitable({})
- mock_client2.agent = self.matrix_federation_agent
self.make_worker_hs(
"synapse.app.generic_worker",
{
diff --git a/tests/rest/client/test_presence.py b/tests/rest/client/test_presence.py
index e12098102b..dcbb125a3b 100644
--- a/tests/rest/client/test_presence.py
+++ b/tests/rest/client/test_presence.py
@@ -40,6 +40,7 @@ class PresenceTestCase(unittest.HomeserverTestCase):
hs = self.setup_test_homeserver(
"red",
+ federation_http_client=None,
federation_client=Mock(),
presence_handler=self.presence_handler,
)
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index d013e75d55..f1b4e1ad2f 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -67,6 +67,8 @@ class RoomBase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
self.hs = self.setup_test_homeserver(
"red",
+ federation_http_client=None,
+ federation_client=Mock(),
)
self.hs.get_federation_handler = Mock() # type: ignore[assignment]
diff --git a/tests/storage/test_e2e_room_keys.py b/tests/storage/test_e2e_room_keys.py
index f6df31aba4..9cb326d90a 100644
--- a/tests/storage/test_e2e_room_keys.py
+++ b/tests/storage/test_e2e_room_keys.py
@@ -31,7 +31,7 @@ room_key: RoomKey = {
class E2eRoomKeysHandlerTestCase(unittest.HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver("server")
+ hs = self.setup_test_homeserver("server", federation_http_client=None)
self.store = hs.get_datastores().main
return hs
diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py
index 0282673167..857e2caf2e 100644
--- a/tests/storage/test_purge.py
+++ b/tests/storage/test_purge.py
@@ -27,7 +27,7 @@ class PurgeTests(HomeserverTestCase):
servlets = [room.register_servlets]
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver("server")
+ hs = self.setup_test_homeserver("server", federation_http_client=None)
return hs
def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py
index 809c9f175d..6861d3a6c9 100644
--- a/tests/storage/test_rollback_worker.py
+++ b/tests/storage/test_rollback_worker.py
@@ -45,7 +45,9 @@ def fake_listdir(filepath: str) -> List[str]:
class WorkerSchemaTests(HomeserverTestCase):
def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
+ hs = self.setup_test_homeserver(
+ federation_http_client=None, homeserver_to_use=GenericWorkerServer
+ )
return hs
def default_config(self) -> JsonDict:
diff --git a/tests/test_server.py b/tests/test_server.py
index fe5afebdcd..e266c06a2c 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -38,7 +38,7 @@ from tests.http.server._base import test_disconnect
from tests.server import (
FakeChannel,
FakeSite,
- get_clock,
+ ThreadedMemoryReactorClock,
make_request,
setup_test_homeserver,
)
@@ -46,11 +46,12 @@ from tests.server import (
class JsonResourceTests(unittest.TestCase):
def setUp(self) -> None:
- reactor, clock = get_clock()
- self.reactor = reactor
+ self.reactor = ThreadedMemoryReactorClock()
+ self.hs_clock = Clock(self.reactor)
self.homeserver = setup_test_homeserver(
self.addCleanup,
- clock=clock,
+ federation_http_client=None,
+ clock=self.hs_clock,
reactor=self.reactor,
)
@@ -208,13 +209,7 @@ class JsonResourceTests(unittest.TestCase):
class OptionsResourceTests(unittest.TestCase):
def setUp(self) -> None:
- reactor, clock = get_clock()
- self.reactor = reactor
- self.homeserver = setup_test_homeserver(
- self.addCleanup,
- clock=clock,
- reactor=self.reactor,
- )
+ self.reactor = ThreadedMemoryReactorClock()
class DummyResource(Resource):
isLeaf = True
@@ -247,7 +242,6 @@ class OptionsResourceTests(unittest.TestCase):
"1.0",
max_request_body_size=4096,
reactor=self.reactor,
- federation_agent=self.homeserver.get_federation_http_client().agent,
)
# render the request and return the channel
@@ -350,8 +344,7 @@ class WrapHtmlRequestHandlerTests(unittest.TestCase):
await self.callback(request)
def setUp(self) -> None:
- reactor, _ = get_clock()
- self.reactor = reactor
+ self.reactor = ThreadedMemoryReactorClock()
def test_good_response(self) -> None:
async def callback(request: SynapseRequest) -> None:
@@ -469,9 +462,9 @@ class DirectServeJsonResourceCancellationTests(unittest.TestCase):
"""Tests for `DirectServeJsonResource` cancellation."""
def setUp(self) -> None:
- reactor, clock = get_clock()
- self.reactor = reactor
- self.resource = CancellableDirectServeJsonResource(clock)
+ self.reactor = ThreadedMemoryReactorClock()
+ self.clock = Clock(self.reactor)
+ self.resource = CancellableDirectServeJsonResource(self.clock)
self.site = FakeSite(self.resource, self.reactor)
def test_cancellable_disconnect(self) -> None:
@@ -503,9 +496,9 @@ class DirectServeHtmlResourceCancellationTests(unittest.TestCase):
"""Tests for `DirectServeHtmlResource` cancellation."""
def setUp(self) -> None:
- reactor, clock = get_clock()
- self.reactor = reactor
- self.resource = CancellableDirectServeHtmlResource(clock)
+ self.reactor = ThreadedMemoryReactorClock()
+ self.clock = Clock(self.reactor)
+ self.resource = CancellableDirectServeHtmlResource(self.clock)
self.site = FakeSite(self.resource, self.reactor)
def test_cancellable_disconnect(self) -> None:
diff --git a/tests/unittest.py b/tests/unittest.py
index 334a95a917..c73195b32b 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -358,7 +358,6 @@ class HomeserverTestCase(TestCase):
server_version_string="1",
max_request_body_size=4096,
reactor=self.reactor,
- federation_agent=self.hs.get_federation_http_client().agent,
)
from tests.rest.client.utils import RestHelper
|