diff options
author | Jason Little <realtyem@gmail.com> | 2023-07-11 13:08:06 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-07-11 13:08:06 -0500 |
commit | 224ef0b669fdd85925d66deb38ba1b51c5aaa1bd (patch) | |
tree | dcd4ecbaf195472a9ffb6319bd25cc5e60e2b59e /synapse | |
parent | Add + as an allowed character for Matrix IDs (MSC4009) (#15911) (diff) | |
download | synapse-224ef0b669fdd85925d66deb38ba1b51c5aaa1bd.tar.xz |
Unix Sockets for HTTP Replication (#15708)
Unix socket support for `federation` and `client` Listeners has existed now for a little while(since [1.81.0](https://github.com/matrix-org/synapse/pull/15353)), but there was one last hold out before it could be complete: HTTP Replication communication. This should finish it up. The Listeners would have always worked, but would have had no way to be talked to/at. --------- Co-authored-by: Eric Eastwood <madlittlemods@gmail.com> Co-authored-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org> Co-authored-by: Eric Eastwood <erice@element.io>
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/workers.py | 24 | ||||
-rw-r--r-- | synapse/http/replicationagent.py | 47 | ||||
-rw-r--r-- | synapse/logging/opentracing.py | 6 |
3 files changed, 58 insertions, 19 deletions
diff --git a/synapse/config/workers.py b/synapse/config/workers.py index ccfe75eaf3..e55ca12a36 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -94,7 +94,7 @@ class ConfigModel(BaseModel): allow_mutation = False -class InstanceLocationConfig(ConfigModel): +class InstanceTcpLocationConfig(ConfigModel): """The host and port to talk to an instance via HTTP replication.""" host: StrictStr @@ -110,6 +110,23 @@ class InstanceLocationConfig(ConfigModel): return f"{self.host}:{self.port}" +class InstanceUnixLocationConfig(ConfigModel): + """The socket file to talk to an instance via HTTP replication.""" + + path: StrictStr + + def scheme(self) -> str: + """Hardcode a retrievable scheme""" + return "unix" + + def netloc(self) -> str: + """Nicely format the address location data""" + return f"{self.path}" + + +InstanceLocationConfig = Union[InstanceTcpLocationConfig, InstanceUnixLocationConfig] + + @attr.s class WriterLocations: """Specifies the instances that write various streams. @@ -270,9 +287,12 @@ class WorkerConfig(Config): % MAIN_PROCESS_INSTANCE_MAP_NAME ) + # type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently self.instance_map: Dict[ str, InstanceLocationConfig - ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig) + ] = parse_and_validate_mapping( + instance_map, InstanceLocationConfig # type: ignore[arg-type] + ) # Map from type of streams to source, c.f. WriterLocations. writers = config.get("stream_writers") or {} diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py index d6ba6f0e57..3ba2f22dfd 100644 --- a/synapse/http/replicationagent.py +++ b/synapse/http/replicationagent.py @@ -18,7 +18,11 @@ from typing import Dict, Optional from zope.interface import implementer from twisted.internet import defer -from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS +from twisted.internet.endpoints import ( + HostnameEndpoint, + UNIXClientEndpoint, + wrapClientTLS, +) from twisted.internet.interfaces import IStreamClientEndpoint from twisted.python.failure import Failure from twisted.web.client import URI, HTTPConnectionPool, _AgentBase @@ -32,7 +36,11 @@ from twisted.web.iweb import ( IResponse, ) -from synapse.config.workers import InstanceLocationConfig +from synapse.config.workers import ( + InstanceLocationConfig, + InstanceTcpLocationConfig, + InstanceUnixLocationConfig, +) from synapse.types import ISynapseReactor logger = logging.getLogger(__name__) @@ -40,7 +48,7 @@ logger = logging.getLogger(__name__) @implementer(IAgentEndpointFactory) class ReplicationEndpointFactory: - """Connect to a given TCP socket""" + """Connect to a given TCP or UNIX socket""" def __init__( self, @@ -64,24 +72,27 @@ class ReplicationEndpointFactory: # The given URI has a special scheme and includes the worker name. The # actual connection details are pulled from the instance map. worker_name = uri.netloc.decode("utf-8") - scheme = self.instance_map[worker_name].scheme() + location_config = self.instance_map[worker_name] + scheme = location_config.scheme() - if scheme in ("http", "https"): + if isinstance(location_config, InstanceTcpLocationConfig): endpoint = HostnameEndpoint( self.reactor, - self.instance_map[worker_name].host, - self.instance_map[worker_name].port, + location_config.host, + location_config.port, ) if scheme == "https": endpoint = wrapClientTLS( # The 'port' argument below isn't actually used by the function self.context_factory.creatorForNetloc( - self.instance_map[worker_name].host.encode("utf-8"), - self.instance_map[worker_name].port, + location_config.host.encode("utf-8"), + location_config.port, ), endpoint, ) return endpoint + elif isinstance(location_config, InstanceUnixLocationConfig): + return UNIXClientEndpoint(self.reactor, location_config.path) else: raise SchemeNotSupported(f"Unsupported scheme: {scheme}") @@ -138,13 +149,16 @@ class ReplicationAgent(_AgentBase): An existing connection from the connection pool may be used or a new one may be created. - Currently, HTTP and HTTPS schemes are supported in uri. + Currently, HTTP, HTTPS and UNIX schemes are supported in uri. This is copied from twisted.web.client.Agent, except: - * It uses a different pool key (combining the host & port). - * It does not call _ensureValidURI(...) since it breaks on some - UNIX paths. + * It uses a different pool key (combining the scheme with either host & port or + socket path). + * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not + required when using a worker's name as a 'hostname' for Synapse HTTP + Replication machinery. Specifically, this allows a range of ascii characters + such as '+' and '_' in hostnames/worker's names. See: twisted.web.iweb.IAgent.request """ @@ -154,9 +168,12 @@ class ReplicationAgent(_AgentBase): except SchemeNotSupported: return defer.fail(Failure()) + worker_name = parsedURI.netloc.decode("utf-8") + key_scheme = self._endpointFactory.instance_map[worker_name].scheme() + key_netloc = self._endpointFactory.instance_map[worker_name].netloc() # This sets the Pool key to be: - # (http(s), <host:ip>) - key = (parsedURI.scheme, parsedURI.netloc) + # (http(s), <host:port>) or (unix, <socket_path>) + key = (key_scheme, key_netloc) # _requestWithEndpoint comes from _AgentBase class return self._requestWithEndpoint( diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 75217e3f45..be910128aa 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -1070,7 +1070,7 @@ def trace_servlet( tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER, tags.HTTP_METHOD: request.get_method(), tags.HTTP_URL: request.get_redacted_uri(), - tags.PEER_HOST_IPV6: request.getClientAddress().host, + tags.PEER_HOST_IPV6: request.get_client_ip_if_available(), } request_name = request.request_metrics.name @@ -1091,9 +1091,11 @@ def trace_servlet( # with JsonResource). scope.span.set_operation_name(request.request_metrics.name) + # Mypy seems to think that start_context.tag below can be Optional[str], but + # that doesn't appear to be correct and works in practice. request_tags[ SynapseTags.REQUEST_TAG - ] = request.request_metrics.start_context.tag + ] = request.request_metrics.start_context.tag # type: ignore[assignment] # set the tags *after* the servlet completes, in case it decided to # prioritise the span (tags will get dropped on unprioritised spans) |