diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ccfe75eaf3..e55ca12a36 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -94,7 +94,7 @@ class ConfigModel(BaseModel):
allow_mutation = False
-class InstanceLocationConfig(ConfigModel):
+class InstanceTcpLocationConfig(ConfigModel):
"""The host and port to talk to an instance via HTTP replication."""
host: StrictStr
@@ -110,6 +110,23 @@ class InstanceLocationConfig(ConfigModel):
return f"{self.host}:{self.port}"
+class InstanceUnixLocationConfig(ConfigModel):
+ """The UNIX socket file used to talk to an instance via HTTP replication."""
+
+ path: StrictStr
+
+ def scheme(self) -> str:
+ """Return the scheme for this kind of location, which is always "unix"."""
+ return "unix"
+
+ def netloc(self) -> str:
+ """Return the socket path as the location string for this instance."""
+ return f"{self.path}"
+
+
+InstanceLocationConfig = Union[InstanceTcpLocationConfig, InstanceUnixLocationConfig]
+
+
@attr.s
class WriterLocations:
"""Specifies the instances that write various streams.
@@ -270,9 +287,12 @@ class WorkerConfig(Config):
% MAIN_PROCESS_INSTANCE_MAP_NAME
)
+ # type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently
self.instance_map: Dict[
str, InstanceLocationConfig
- ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig)
+ ] = parse_and_validate_mapping(
+ instance_map, InstanceLocationConfig # type: ignore[arg-type]
+ )
# Map from type of streams to source, c.f. WriterLocations.
writers = config.get("stream_writers") or {}
diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py
index d6ba6f0e57..3ba2f22dfd 100644
--- a/synapse/http/replicationagent.py
+++ b/synapse/http/replicationagent.py
@@ -18,7 +18,11 @@ from typing import Dict, Optional
from zope.interface import implementer
from twisted.internet import defer
-from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.endpoints import (
+ HostnameEndpoint,
+ UNIXClientEndpoint,
+ wrapClientTLS,
+)
from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.python.failure import Failure
from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
@@ -32,7 +36,11 @@ from twisted.web.iweb import (
IResponse,
)
-from synapse.config.workers import InstanceLocationConfig
+from synapse.config.workers import (
+ InstanceLocationConfig,
+ InstanceTcpLocationConfig,
+ InstanceUnixLocationConfig,
+)
from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__)
@@ -40,7 +48,7 @@ logger = logging.getLogger(__name__)
@implementer(IAgentEndpointFactory)
class ReplicationEndpointFactory:
- """Connect to a given TCP socket"""
+ """Connect to a given TCP or UNIX socket"""
def __init__(
self,
@@ -64,24 +72,27 @@ class ReplicationEndpointFactory:
# The given URI has a special scheme and includes the worker name. The
# actual connection details are pulled from the instance map.
worker_name = uri.netloc.decode("utf-8")
- scheme = self.instance_map[worker_name].scheme()
+ location_config = self.instance_map[worker_name]
+ scheme = location_config.scheme()
- if scheme in ("http", "https"):
+ if isinstance(location_config, InstanceTcpLocationConfig):
endpoint = HostnameEndpoint(
self.reactor,
- self.instance_map[worker_name].host,
- self.instance_map[worker_name].port,
+ location_config.host,
+ location_config.port,
)
if scheme == "https":
endpoint = wrapClientTLS(
# The 'port' argument below isn't actually used by the function
self.context_factory.creatorForNetloc(
- self.instance_map[worker_name].host.encode("utf-8"),
- self.instance_map[worker_name].port,
+ location_config.host.encode("utf-8"),
+ location_config.port,
),
endpoint,
)
return endpoint
+ elif isinstance(location_config, InstanceUnixLocationConfig):
+ return UNIXClientEndpoint(self.reactor, location_config.path)
else:
raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
@@ -138,13 +149,16 @@ class ReplicationAgent(_AgentBase):
An existing connection from the connection pool may be used or a new
one may be created.
- Currently, HTTP and HTTPS schemes are supported in uri.
+ Currently, HTTP, HTTPS and UNIX schemes are supported in uri.
This is copied from twisted.web.client.Agent, except:
- * It uses a different pool key (combining the host & port).
- * It does not call _ensureValidURI(...) since it breaks on some
- UNIX paths.
+ * It uses a different pool key (combining the scheme with either host & port or
+ socket path).
+ * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not
+ required when using a worker's name as a 'hostname' for Synapse HTTP
+ Replication machinery. Specifically, this allows a range of ascii characters
+ such as '+' and '_' in hostnames/worker's names.
See: twisted.web.iweb.IAgent.request
"""
@@ -154,9 +168,12 @@ class ReplicationAgent(_AgentBase):
except SchemeNotSupported:
return defer.fail(Failure())
+ worker_name = parsedURI.netloc.decode("utf-8")
+ key_scheme = self._endpointFactory.instance_map[worker_name].scheme()
+ key_netloc = self._endpointFactory.instance_map[worker_name].netloc()
# This sets the Pool key to be:
- # (http(s), <host:ip>)
- key = (parsedURI.scheme, parsedURI.netloc)
+ # (http(s), <host:port>) or (unix, <socket_path>)
+ key = (key_scheme, key_netloc)
# _requestWithEndpoint comes from _AgentBase class
return self._requestWithEndpoint(
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 75217e3f45..be910128aa 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -1070,7 +1070,7 @@ def trace_servlet(
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
- tags.PEER_HOST_IPV6: request.getClientAddress().host,
+ tags.PEER_HOST_IPV6: request.get_client_ip_if_available(),
}
request_name = request.request_metrics.name
@@ -1091,9 +1091,11 @@ def trace_servlet(
# with JsonResource).
scope.span.set_operation_name(request.request_metrics.name)
+ # Mypy thinks start_context.tag below can be Optional[str], but that does
+ # not appear to happen in practice, so the assignment error is ignored.
request_tags[
SynapseTags.REQUEST_TAG
- ] = request.request_metrics.start_context.tag
+ ] = request.request_metrics.start_context.tag # type: ignore[assignment]
# set the tags *after* the servlet completes, in case it decided to
# prioritise the span (tags will get dropped on unprioritised spans)
|