diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py
index d6ba6f0e57..3ba2f22dfd 100644
--- a/synapse/http/replicationagent.py
+++ b/synapse/http/replicationagent.py
@@ -18,7 +18,11 @@ from typing import Dict, Optional
from zope.interface import implementer
from twisted.internet import defer
-from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.endpoints import (
+ HostnameEndpoint,
+ UNIXClientEndpoint,
+ wrapClientTLS,
+)
from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.python.failure import Failure
from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
@@ -32,7 +36,11 @@ from twisted.web.iweb import (
IResponse,
)
-from synapse.config.workers import InstanceLocationConfig
+from synapse.config.workers import (
+ InstanceLocationConfig,
+ InstanceTcpLocationConfig,
+ InstanceUnixLocationConfig,
+)
from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__)
@@ -40,7 +48,7 @@ logger = logging.getLogger(__name__)
@implementer(IAgentEndpointFactory)
class ReplicationEndpointFactory:
- """Connect to a given TCP socket"""
+ """Connect to a given TCP or UNIX socket"""
def __init__(
self,
@@ -64,24 +72,27 @@ class ReplicationEndpointFactory:
# The given URI has a special scheme and includes the worker name. The
# actual connection details are pulled from the instance map.
worker_name = uri.netloc.decode("utf-8")
- scheme = self.instance_map[worker_name].scheme()
+ location_config = self.instance_map[worker_name]
+ scheme = location_config.scheme()
- if scheme in ("http", "https"):
+ if isinstance(location_config, InstanceTcpLocationConfig):
endpoint = HostnameEndpoint(
self.reactor,
- self.instance_map[worker_name].host,
- self.instance_map[worker_name].port,
+ location_config.host,
+ location_config.port,
)
if scheme == "https":
endpoint = wrapClientTLS(
# The 'port' argument below isn't actually used by the function
self.context_factory.creatorForNetloc(
- self.instance_map[worker_name].host.encode("utf-8"),
- self.instance_map[worker_name].port,
+ location_config.host.encode("utf-8"),
+ location_config.port,
),
endpoint,
)
return endpoint
+ elif isinstance(location_config, InstanceUnixLocationConfig):
+ return UNIXClientEndpoint(self.reactor, location_config.path)
else:
raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
@@ -138,13 +149,16 @@ class ReplicationAgent(_AgentBase):
An existing connection from the connection pool may be used or a new
one may be created.
- Currently, HTTP and HTTPS schemes are supported in uri.
+ Currently, HTTP, HTTPS and UNIX schemes are supported in uri.
This is copied from twisted.web.client.Agent, except:
- * It uses a different pool key (combining the host & port).
- * It does not call _ensureValidURI(...) since it breaks on some
- UNIX paths.
+ * It uses a different pool key (combining the scheme with either host & port or
+ socket path).
+ * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not
+ required when using a worker's name as a 'hostname' for Synapse HTTP
+ Replication machinery. Specifically, this allows a range of ascii characters
+ such as '+' and '_' in hostnames/worker's names.
See: twisted.web.iweb.IAgent.request
"""
@@ -154,9 +168,12 @@ class ReplicationAgent(_AgentBase):
except SchemeNotSupported:
return defer.fail(Failure())
+ worker_name = parsedURI.netloc.decode("utf-8")
+ key_scheme = self._endpointFactory.instance_map[worker_name].scheme()
+ key_netloc = self._endpointFactory.instance_map[worker_name].netloc()
# This sets the Pool key to be:
- # (http(s), <host:ip>)
- key = (parsedURI.scheme, parsedURI.netloc)
+ # (http(s), <host:port>) or (unix, <socket_path>)
+ key = (key_scheme, key_netloc)
# _requestWithEndpoint comes from _AgentBase class
return self._requestWithEndpoint(
|