summary refs log tree commit diff
path: root/synapse/http/replicationagent.py
diff options
context:
space:
mode:
authorJason Little <realtyem@gmail.com>2023-07-11 13:08:06 -0500
committerGitHub <noreply@github.com>2023-07-11 13:08:06 -0500
commit224ef0b669fdd85925d66deb38ba1b51c5aaa1bd (patch)
treedcd4ecbaf195472a9ffb6319bd25cc5e60e2b59e /synapse/http/replicationagent.py
parentAdd + as an allowed character for Matrix IDs (MSC4009) (#15911) (diff)
downloadsynapse-224ef0b669fdd85925d66deb38ba1b51c5aaa1bd.tar.xz
Unix Sockets for HTTP Replication (#15708)
Unix socket support for `federation` and `client` Listeners has existed now for a little while(since [1.81.0](https://github.com/matrix-org/synapse/pull/15353)), but there was one last hold out before it could be complete: HTTP Replication communication. This should finish it up. The Listeners would have always worked, but would have had no way to be talked to/at.

---------

Co-authored-by: Eric Eastwood <madlittlemods@gmail.com>
Co-authored-by: Olivier Wilkinson (reivilibre) <oliverw@matrix.org>
Co-authored-by: Eric Eastwood <erice@element.io>
Diffstat (limited to 'synapse/http/replicationagent.py')
-rw-r--r--synapse/http/replicationagent.py47
1 files changed, 32 insertions, 15 deletions
diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py
index d6ba6f0e57..3ba2f22dfd 100644
--- a/synapse/http/replicationagent.py
+++ b/synapse/http/replicationagent.py
@@ -18,7 +18,11 @@ from typing import Dict, Optional
 from zope.interface import implementer
 
 from twisted.internet import defer
-from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.endpoints import (
+    HostnameEndpoint,
+    UNIXClientEndpoint,
+    wrapClientTLS,
+)
 from twisted.internet.interfaces import IStreamClientEndpoint
 from twisted.python.failure import Failure
 from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
@@ -32,7 +36,11 @@ from twisted.web.iweb import (
     IResponse,
 )
 
-from synapse.config.workers import InstanceLocationConfig
+from synapse.config.workers import (
+    InstanceLocationConfig,
+    InstanceTcpLocationConfig,
+    InstanceUnixLocationConfig,
+)
 from synapse.types import ISynapseReactor
 
 logger = logging.getLogger(__name__)
@@ -40,7 +48,7 @@ logger = logging.getLogger(__name__)
 
 @implementer(IAgentEndpointFactory)
 class ReplicationEndpointFactory:
-    """Connect to a given TCP socket"""
+    """Connect to a given TCP or UNIX socket"""
 
     def __init__(
         self,
@@ -64,24 +72,27 @@ class ReplicationEndpointFactory:
         # The given URI has a special scheme and includes the worker name. The
         # actual connection details are pulled from the instance map.
         worker_name = uri.netloc.decode("utf-8")
-        scheme = self.instance_map[worker_name].scheme()
+        location_config = self.instance_map[worker_name]
+        scheme = location_config.scheme()
 
-        if scheme in ("http", "https"):
+        if isinstance(location_config, InstanceTcpLocationConfig):
             endpoint = HostnameEndpoint(
                 self.reactor,
-                self.instance_map[worker_name].host,
-                self.instance_map[worker_name].port,
+                location_config.host,
+                location_config.port,
             )
             if scheme == "https":
                 endpoint = wrapClientTLS(
                     # The 'port' argument below isn't actually used by the function
                     self.context_factory.creatorForNetloc(
-                        self.instance_map[worker_name].host.encode("utf-8"),
-                        self.instance_map[worker_name].port,
+                        location_config.host.encode("utf-8"),
+                        location_config.port,
                     ),
                     endpoint,
                 )
             return endpoint
+        elif isinstance(location_config, InstanceUnixLocationConfig):
+            return UNIXClientEndpoint(self.reactor, location_config.path)
         else:
             raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
 
@@ -138,13 +149,16 @@ class ReplicationAgent(_AgentBase):
         An existing connection from the connection pool may be used or a new
         one may be created.
 
-        Currently, HTTP and HTTPS schemes are supported in uri.
+        Currently, HTTP, HTTPS and UNIX schemes are supported in uri.
 
         This is copied from twisted.web.client.Agent, except:
 
-        * It uses a different pool key (combining the host & port).
-        * It does not call _ensureValidURI(...) since it breaks on some
-          UNIX paths.
+        * It uses a different pool key (combining the scheme with either host & port or
+          socket path).
+        * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not
+          required when using a worker's name as a 'hostname' for Synapse HTTP
+          Replication machinery. Specifically, this allows a range of ascii characters
+          such as '+' and '_' in hostnames/worker's names.
 
         See: twisted.web.iweb.IAgent.request
         """
@@ -154,9 +168,12 @@ class ReplicationAgent(_AgentBase):
         except SchemeNotSupported:
             return defer.fail(Failure())
 
+        worker_name = parsedURI.netloc.decode("utf-8")
+        key_scheme = self._endpointFactory.instance_map[worker_name].scheme()
+        key_netloc = self._endpointFactory.instance_map[worker_name].netloc()
         # This sets the Pool key to be:
-        #  (http(s), <host:ip>)
-        key = (parsedURI.scheme, parsedURI.netloc)
+        #  (http(s), <host:port>) or (unix, <socket_path>)
+        key = (key_scheme, key_netloc)
 
         # _requestWithEndpoint comes from _AgentBase class
         return self._requestWithEndpoint(