summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorJason Little <realtyem@gmail.com>2023-07-11 13:08:06 -0500
committerGitHub <noreply@github.com>2023-07-11 13:08:06 -0500
commit224ef0b669fdd85925d66deb38ba1b51c5aaa1bd (patch)
treedcd4ecbaf195472a9ffb6319bd25cc5e60e2b59e /synapse
parentAdd + as an allowed character for Matrix IDs (MSC4009) (#15911) (diff)
downloadsynapse-224ef0b669fdd85925d66deb38ba1b51c5aaa1bd.tar.xz
Unix Sockets for HTTP Replication (#15708)
Unix socket support for `federation` and `client` Listeners has existed now for a little while(since [1.81.0](https://github.com/matrix-org/synapse/pull/15353)), but there was one last hold out before it could be complete: HTTP Replication communication. This should finish it up. The Listeners would have always worked, but would have had no way to be talked to/at.

---------

Co-authored-by: Eric Eastwood <madlittlemods@gmail.com>
Co-authored-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
Co-authored-by: Eric Eastwood <erice@element.io>
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/workers.py24
-rw-r--r--synapse/http/replicationagent.py47
-rw-r--r--synapse/logging/opentracing.py6
3 files changed, 58 insertions, 19 deletions
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ccfe75eaf3..e55ca12a36 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -94,7 +94,7 @@ class ConfigModel(BaseModel):
         allow_mutation = False
 
 
-class InstanceLocationConfig(ConfigModel):
+class InstanceTcpLocationConfig(ConfigModel):
     """The host and port to talk to an instance via HTTP replication."""
 
     host: StrictStr
@@ -110,6 +110,23 @@ class InstanceLocationConfig(ConfigModel):
         return f"{self.host}:{self.port}"
 
 
+class InstanceUnixLocationConfig(ConfigModel):
+    """The socket file to talk to an instance via HTTP replication."""
+
+    path: StrictStr
+
+    def scheme(self) -> str:
+        """Hardcode a retrievable scheme"""
+        return "unix"
+
+    def netloc(self) -> str:
+        """Nicely format the address location data"""
+        return f"{self.path}"
+
+
+InstanceLocationConfig = Union[InstanceTcpLocationConfig, InstanceUnixLocationConfig]
+
+
 @attr.s
 class WriterLocations:
     """Specifies the instances that write various streams.
@@ -270,9 +287,12 @@ class WorkerConfig(Config):
                     % MAIN_PROCESS_INSTANCE_MAP_NAME
                 )
 
+        # type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently
         self.instance_map: Dict[
             str, InstanceLocationConfig
-        ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig)
+        ] = parse_and_validate_mapping(
+            instance_map, InstanceLocationConfig  # type: ignore[arg-type]
+        )
 
         # Map from type of streams to source, c.f. WriterLocations.
         writers = config.get("stream_writers") or {}
diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py
index d6ba6f0e57..3ba2f22dfd 100644
--- a/synapse/http/replicationagent.py
+++ b/synapse/http/replicationagent.py
@@ -18,7 +18,11 @@ from typing import Dict, Optional
 from zope.interface import implementer
 
 from twisted.internet import defer
-from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.endpoints import (
+    HostnameEndpoint,
+    UNIXClientEndpoint,
+    wrapClientTLS,
+)
 from twisted.internet.interfaces import IStreamClientEndpoint
 from twisted.python.failure import Failure
 from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
@@ -32,7 +36,11 @@ from twisted.web.iweb import (
     IResponse,
 )
 
-from synapse.config.workers import InstanceLocationConfig
+from synapse.config.workers import (
+    InstanceLocationConfig,
+    InstanceTcpLocationConfig,
+    InstanceUnixLocationConfig,
+)
 from synapse.types import ISynapseReactor
 
 logger = logging.getLogger(__name__)
@@ -40,7 +48,7 @@ logger = logging.getLogger(__name__)
 
 @implementer(IAgentEndpointFactory)
 class ReplicationEndpointFactory:
-    """Connect to a given TCP socket"""
+    """Connect to a given TCP or UNIX socket"""
 
     def __init__(
         self,
@@ -64,24 +72,27 @@ class ReplicationEndpointFactory:
         # The given URI has a special scheme and includes the worker name. The
         # actual connection details are pulled from the instance map.
         worker_name = uri.netloc.decode("utf-8")
-        scheme = self.instance_map[worker_name].scheme()
+        location_config = self.instance_map[worker_name]
+        scheme = location_config.scheme()
 
-        if scheme in ("http", "https"):
+        if isinstance(location_config, InstanceTcpLocationConfig):
             endpoint = HostnameEndpoint(
                 self.reactor,
-                self.instance_map[worker_name].host,
-                self.instance_map[worker_name].port,
+                location_config.host,
+                location_config.port,
             )
             if scheme == "https":
                 endpoint = wrapClientTLS(
                     # The 'port' argument below isn't actually used by the function
                     self.context_factory.creatorForNetloc(
-                        self.instance_map[worker_name].host.encode("utf-8"),
-                        self.instance_map[worker_name].port,
+                        location_config.host.encode("utf-8"),
+                        location_config.port,
                     ),
                     endpoint,
                 )
             return endpoint
+        elif isinstance(location_config, InstanceUnixLocationConfig):
+            return UNIXClientEndpoint(self.reactor, location_config.path)
         else:
             raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
 
@@ -138,13 +149,16 @@ class ReplicationAgent(_AgentBase):
         An existing connection from the connection pool may be used or a new
         one may be created.
 
-        Currently, HTTP and HTTPS schemes are supported in uri.
+        Currently, HTTP, HTTPS and UNIX schemes are supported in uri.
 
         This is copied from twisted.web.client.Agent, except:
 
-        * It uses a different pool key (combining the host & port).
-        * It does not call _ensureValidURI(...) since it breaks on some
-          UNIX paths.
+        * It uses a different pool key (combining the scheme with either host & port or
+          socket path).
+        * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not
+          required when using a worker's name as a 'hostname' for Synapse HTTP
+          Replication machinery. Specifically, this allows a range of ascii characters
+          such as '+' and '_' in hostnames/worker's names.
 
         See: twisted.web.iweb.IAgent.request
         """
@@ -154,9 +168,12 @@ class ReplicationAgent(_AgentBase):
         except SchemeNotSupported:
             return defer.fail(Failure())
 
+        worker_name = parsedURI.netloc.decode("utf-8")
+        key_scheme = self._endpointFactory.instance_map[worker_name].scheme()
+        key_netloc = self._endpointFactory.instance_map[worker_name].netloc()
         # This sets the Pool key to be:
-        #  (http(s), <host:ip>)
-        key = (parsedURI.scheme, parsedURI.netloc)
+        #  (http(s), <host:port>) or (unix, <socket_path>)
+        key = (key_scheme, key_netloc)
 
         # _requestWithEndpoint comes from _AgentBase class
         return self._requestWithEndpoint(
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 75217e3f45..be910128aa 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -1070,7 +1070,7 @@ def trace_servlet(
         tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
         tags.HTTP_METHOD: request.get_method(),
         tags.HTTP_URL: request.get_redacted_uri(),
-        tags.PEER_HOST_IPV6: request.getClientAddress().host,
+        tags.PEER_HOST_IPV6: request.get_client_ip_if_available(),
     }
 
     request_name = request.request_metrics.name
@@ -1091,9 +1091,11 @@ def trace_servlet(
             # with JsonResource).
             scope.span.set_operation_name(request.request_metrics.name)
 
+            # Mypy seems to think that start_context.tag below can be Optional[str], but
+            # that doesn't appear to be correct and works in practice.
             request_tags[
                 SynapseTags.REQUEST_TAG
-            ] = request.request_metrics.start_context.tag
+            ] = request.request_metrics.start_context.tag  # type: ignore[assignment]
 
             # set the tags *after* the servlet completes, in case it decided to
             # prioritise the span (tags will get dropped on unprioritised spans)