diff options
author | Jason Little <realtyem@gmail.com> | 2023-05-23 08:05:30 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-05-23 09:05:30 -0400 |
commit | 1df0221bda65cc90ee3a15d210b87e8065bc865f (patch) | |
tree | 8ca59fd66049946e8c0b82ff7c91d8987a21e534 /synapse/http | |
parent | Merge branch 'master' into develop (diff) | |
download | synapse-1df0221bda65cc90ee3a15d210b87e8065bc865f.tar.xz |
Use a custom scheme & the worker name for replication requests. (#15578)
All the information needed is already in the `instance_map`, so use that instead of passing the hostname / IP & port manually for each replication request. This consolidates logic for future improvements of using e.g. UNIX sockets for workers.
Diffstat (limited to 'synapse/http')
-rw-r--r-- | synapse/http/client.py | 1 | ||||
-rw-r--r-- | synapse/http/replicationagent.py | 34 |
2 files changed, 28 insertions, 7 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py index f1ab7a8bc9..09ea93e10d 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -835,6 +835,7 @@ class ReplicationClient(BaseHttpClient): self.agent: IAgent = ReplicationAgent( hs.get_reactor(), + hs.config.worker.instance_map, contextFactory=hs.get_http_client_context_factory(), pool=pool, ) diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py index 5ecd08be0f..800f21873d 100644 --- a/synapse/http/replicationagent.py +++ b/synapse/http/replicationagent.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import Optional +from typing import Dict, Optional from zope.interface import implementer @@ -32,6 +32,7 @@ from twisted.web.iweb import ( IResponse, ) +from synapse.config.workers import InstanceLocationConfig from synapse.types import ISynapseReactor logger = logging.getLogger(__name__) @@ -44,9 +45,11 @@ class ReplicationEndpointFactory: def __init__( self, reactor: ISynapseReactor, + instance_map: Dict[str, InstanceLocationConfig], context_factory: IPolicyForHTTPS, ) -> None: self.reactor = reactor + self.instance_map = instance_map self.context_factory = context_factory def endpointForURI(self, uri: URI) -> IStreamClientEndpoint: @@ -58,15 +61,29 @@ class ReplicationEndpointFactory: Returns: The correct client endpoint object """ - if uri.scheme in (b"http", b"https"): - endpoint = HostnameEndpoint(self.reactor, uri.host, uri.port) - if uri.scheme == b"https": + # The given URI has a special scheme and includes the worker name. The + # actual connection details are pulled from the instance map. + worker_name = uri.netloc.decode("utf-8") + scheme = self.instance_map[worker_name].scheme() + + if scheme in ("http", "https"): + endpoint = HostnameEndpoint( + self.reactor, + self.instance_map[worker_name].host, + self.instance_map[worker_name].port, + ) + if scheme == "https": endpoint = wrapClientTLS( - self.context_factory.creatorForNetloc(uri.host, uri.port), endpoint + # The 'port' argument below isn't actually used by the function + self.context_factory.creatorForNetloc( + self.instance_map[worker_name].host, + self.instance_map[worker_name].port, + ), + endpoint, ) return endpoint else: - raise SchemeNotSupported(f"Unsupported scheme: {uri.scheme!r}") + raise SchemeNotSupported(f"Unsupported scheme: {scheme}") @implementer(IAgent) @@ -80,6 +97,7 @@ class ReplicationAgent(_AgentBase): def __init__( self, reactor: ISynapseReactor, + instance_map: Dict[str, InstanceLocationConfig], contextFactory: IPolicyForHTTPS, connectTimeout: Optional[float] = None, bindAddress: Optional[bytes] = None, @@ -102,7 +120,9 @@ class ReplicationAgent(_AgentBase): created. """ _AgentBase.__init__(self, reactor, pool) - endpoint_factory = ReplicationEndpointFactory(reactor, contextFactory) + endpoint_factory = ReplicationEndpointFactory( + reactor, instance_map, contextFactory + ) self._endpointFactory = endpoint_factory def request( |