summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/15708.feature1
-rw-r--r--docker/conf-workers/nginx.conf.j24
-rw-r--r--docker/conf-workers/shared.yaml.j23
-rw-r--r--docker/conf-workers/supervisord.conf.j24
-rw-r--r--docker/conf-workers/worker.yaml.j24
-rw-r--r--docker/conf/homeserver.yaml10
-rwxr-xr-xdocker/configure_workers_and_start.py104
-rw-r--r--docs/development/contributing_guide.md1
-rw-r--r--docs/usage/configuration/config_documentation.md52
-rw-r--r--docs/workers.md9
-rwxr-xr-xscripts-dev/complement.sh4
-rw-r--r--synapse/config/workers.py24
-rw-r--r--synapse/http/replicationagent.py47
-rw-r--r--synapse/logging/opentracing.py6
-rw-r--r--tests/replication/_base.py7
-rw-r--r--tests/server.py32
16 files changed, 260 insertions, 52 deletions
diff --git a/changelog.d/15708.feature b/changelog.d/15708.feature
new file mode 100644
index 0000000000..06a6c959ab
--- /dev/null
+++ b/changelog.d/15708.feature
@@ -0,0 +1 @@
+Add Unix Socket support for HTTP Replication Listeners. Document and provide usage instructions for utilizing Unix sockets in Synapse. Contributed by Jason Little.
diff --git a/docker/conf-workers/nginx.conf.j2 b/docker/conf-workers/nginx.conf.j2
index 967fc65e79..d1e02af723 100644
--- a/docker/conf-workers/nginx.conf.j2
+++ b/docker/conf-workers/nginx.conf.j2
@@ -35,7 +35,11 @@ server {
 
     # Send all other traffic to the main process
     location ~* ^(\\/_matrix|\\/_synapse) {
+{% if using_unix_sockets %}
+        proxy_pass http://unix:/run/main_public.sock;
+{% else %}
         proxy_pass http://localhost:8080;
+{% endif %}
         proxy_set_header X-Forwarded-For $remote_addr;
         proxy_set_header X-Forwarded-Proto $scheme;
         proxy_set_header Host $host;
diff --git a/docker/conf-workers/shared.yaml.j2 b/docker/conf-workers/shared.yaml.j2
index 92d25386dc..1dfc60ad11 100644
--- a/docker/conf-workers/shared.yaml.j2
+++ b/docker/conf-workers/shared.yaml.j2
@@ -6,6 +6,9 @@
 {% if enable_redis %}
 redis:
     enabled: true
+    {% if using_unix_sockets %}
+    path: /tmp/redis.sock
+    {% endif %}
 {% endif %}
 
 {% if appservice_registrations is not none %}
diff --git a/docker/conf-workers/supervisord.conf.j2 b/docker/conf-workers/supervisord.conf.j2
index 9f1e03cfc0..da93358051 100644
--- a/docker/conf-workers/supervisord.conf.j2
+++ b/docker/conf-workers/supervisord.conf.j2
@@ -19,7 +19,11 @@ username=www-data
 autorestart=true
 
 [program:redis]
+{% if using_unix_sockets %}
+command=/usr/local/bin/prefix-log /usr/local/bin/redis-server --unixsocket /tmp/redis.sock
+{% else %}
 command=/usr/local/bin/prefix-log /usr/local/bin/redis-server
+{% endif %}
 priority=1
 stdout_logfile=/dev/stdout
 stdout_logfile_maxbytes=0
diff --git a/docker/conf-workers/worker.yaml.j2 b/docker/conf-workers/worker.yaml.j2
index 44c6e413cf..29ec74b4ea 100644
--- a/docker/conf-workers/worker.yaml.j2
+++ b/docker/conf-workers/worker.yaml.j2
@@ -8,7 +8,11 @@ worker_name: "{{ name }}"
 
 worker_listeners:
   - type: http
+{% if using_unix_sockets %}
+    path: "/run/worker.{{ port }}"
+{% else %}
     port: {{ port }}
+{% endif %}
 {% if listener_resources %}
     resources:
       - names:
diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml
index f10f78a48c..c46b955d63 100644
--- a/docker/conf/homeserver.yaml
+++ b/docker/conf/homeserver.yaml
@@ -36,12 +36,17 @@ listeners:
 
   # Allow configuring in case we want to reverse proxy 8008
   # using another process in the same container
+{% if SYNAPSE_USE_UNIX_SOCKET %}
+  # Unix sockets don't care about TLS or IP addresses or ports
+  - path: '/run/main_public.sock'
+    type: http
+{% else %}
   - port: {{ SYNAPSE_HTTP_PORT or 8008 }}
     tls: false
     bind_addresses: ['::']
     type: http
     x_forwarded: false
-
+{% endif %}
     resources:
       - names: [client]
         compress: true
@@ -57,8 +62,11 @@ database:
     user: "{{ POSTGRES_USER or "synapse" }}"
     password: "{{ POSTGRES_PASSWORD }}"
     database: "{{ POSTGRES_DB or "synapse" }}"
+{% if not SYNAPSE_USE_UNIX_SOCKET %}
+{# Synapse will use a default unix socket for Postgres when host/port is not specified (behavior from `psycopg2`). #}
     host: "{{ POSTGRES_HOST or "db" }}"
     port: "{{ POSTGRES_PORT or "5432" }}"
+{% endif %}
     cp_min: 5
     cp_max: 10
 {% else %}
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 62fb88daab..dc824038b5 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -74,6 +74,9 @@ MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
 MAIN_PROCESS_INSTANCE_NAME = "main"
 MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
 MAIN_PROCESS_REPLICATION_PORT = 9093
+# Obviously, these would only be used with the UNIX socket option
+MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
+MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
 
 # A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
 # during processing with the name of the worker.
@@ -407,11 +410,15 @@ def add_worker_roles_to_shared_config(
         )
 
         # Map of stream writer instance names to host/ports combos
-        instance_map[worker_name] = {
-            "host": "localhost",
-            "port": worker_port,
-        }
-
+        if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
+            instance_map[worker_name] = {
+                "path": f"/run/worker.{worker_port}",
+            }
+        else:
+            instance_map[worker_name] = {
+                "host": "localhost",
+                "port": worker_port,
+            }
     # Update the list of stream writers. It's convenient that the name of the worker
     # type is the same as the stream to write. Iterate over the whole list in case there
     # is more than one.
@@ -423,10 +430,15 @@ def add_worker_roles_to_shared_config(
 
             # Map of stream writer instance names to host/ports combos
             # For now, all stream writers need http replication ports
-            instance_map[worker_name] = {
-                "host": "localhost",
-                "port": worker_port,
-            }
+            if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
+                instance_map[worker_name] = {
+                    "path": f"/run/worker.{worker_port}",
+                }
+            else:
+                instance_map[worker_name] = {
+                    "host": "localhost",
+                    "port": worker_port,
+                }
 
 
 def merge_worker_template_configs(
@@ -718,17 +730,29 @@ def generate_worker_files(
     # Note that yaml cares about indentation, so care should be taken to insert lines
     # into files at the correct indentation below.
 
+    # Convenience helper for if using unix sockets instead of host:port
+    using_unix_sockets = environ.get("SYNAPSE_USE_UNIX_SOCKET", False)
     # First read the original config file and extract the listeners block. Then we'll
     # add another listener for replication. Later we'll write out the result to the
     # shared config file.
-    listeners = [
-        {
-            "port": MAIN_PROCESS_REPLICATION_PORT,
-            "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
-            "type": "http",
-            "resources": [{"names": ["replication"]}],
-        }
-    ]
+    listeners: List[Any]
+    if using_unix_sockets:
+        listeners = [
+            {
+                "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
+                "type": "http",
+                "resources": [{"names": ["replication"]}],
+            }
+        ]
+    else:
+        listeners = [
+            {
+                "port": MAIN_PROCESS_REPLICATION_PORT,
+                "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
+                "type": "http",
+                "resources": [{"names": ["replication"]}],
+            }
+        ]
     with open(config_path) as file_stream:
         original_config = yaml.safe_load(file_stream)
         original_listeners = original_config.get("listeners")
@@ -769,7 +793,17 @@ def generate_worker_files(
 
     # A list of internal endpoints to healthcheck, starting with the main process
     # which exists even if no workers do.
-    healthcheck_urls = ["http://localhost:8080/health"]
+    # This list ends up being part of the command line to curl, (curl added support for
+    # Unix sockets in version 7.40).
+    if using_unix_sockets:
+        healthcheck_urls = [
+            f"--unix-socket {MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH} "
+            # The scheme and hostname from the following URL are ignored.
+            # The only thing that matters is the path `/health`
+            "http://localhost/health"
+        ]
+    else:
+        healthcheck_urls = ["http://localhost:8080/health"]
 
     # Get the set of all worker types that we have configured
     all_worker_types_in_use = set(chain(*requested_worker_types.values()))
@@ -806,8 +840,12 @@ def generate_worker_files(
         # given worker_type needs to stay assigned and not be replaced.
         worker_config["shared_extra_conf"].update(shared_config)
         shared_config = worker_config["shared_extra_conf"]
-
-        healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
+        if using_unix_sockets:
+            healthcheck_urls.append(
+                f"--unix-socket /run/worker.{worker_port} http://localhost/health"
+            )
+        else:
+            healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
 
         # Update the shared config with sharding-related options if necessary
         add_worker_roles_to_shared_config(
@@ -826,6 +864,7 @@ def generate_worker_files(
             "/conf/workers/{name}.yaml".format(name=worker_name),
             **worker_config,
             worker_log_config_filepath=log_config_filepath,
+            using_unix_sockets=using_unix_sockets,
         )
 
         # Save this worker's port number to the correct nginx upstreams
@@ -846,8 +885,13 @@ def generate_worker_files(
     nginx_upstream_config = ""
     for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
         body = ""
-        for port in upstream_worker_ports:
-            body += f"    server localhost:{port};\n"
+        if using_unix_sockets:
+            for port in upstream_worker_ports:
+                body += f"    server unix:/run/worker.{port};\n"
+
+        else:
+            for port in upstream_worker_ports:
+                body += f"    server localhost:{port};\n"
 
         # Add to the list of configured upstreams
         nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
@@ -877,10 +921,15 @@ def generate_worker_files(
     # If there are workers, add the main process to the instance_map too.
     if workers_in_use:
         instance_map = shared_config.setdefault("instance_map", {})
-        instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
-            "host": MAIN_PROCESS_LOCALHOST_ADDRESS,
-            "port": MAIN_PROCESS_REPLICATION_PORT,
-        }
+        if using_unix_sockets:
+            instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
+                "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
+            }
+        else:
+            instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
+                "host": MAIN_PROCESS_LOCALHOST_ADDRESS,
+                "port": MAIN_PROCESS_REPLICATION_PORT,
+            }
 
     # Shared homeserver config
     convert(
@@ -890,6 +939,7 @@ def generate_worker_files(
         appservice_registrations=appservice_registrations,
         enable_redis=workers_in_use,
         workers_in_use=workers_in_use,
+        using_unix_sockets=using_unix_sockets,
     )
 
     # Nginx config
@@ -900,6 +950,7 @@ def generate_worker_files(
         upstream_directives=nginx_upstream_config,
         tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
         tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
+        using_unix_sockets=using_unix_sockets,
     )
 
     # Supervisord config
@@ -909,6 +960,7 @@ def generate_worker_files(
         "/etc/supervisor/supervisord.conf",
         main_config_path=config_path,
         enable_redis=workers_in_use,
+        using_unix_sockets=using_unix_sockets,
     )
 
     convert(
diff --git a/docs/development/contributing_guide.md b/docs/development/contributing_guide.md
index e9210b1776..698687b91f 100644
--- a/docs/development/contributing_guide.md
+++ b/docs/development/contributing_guide.md
@@ -370,6 +370,7 @@ The above will run a monolithic (single-process) Synapse with SQLite as the data
     See the [worker documentation](../workers.md) for additional information on workers.
 - Passing `ASYNCIO_REACTOR=1` as an environment variable to use the Twisted asyncio reactor instead of the default one.
 - Passing `PODMAN=1` will use the [podman](https://podman.io/) container runtime, instead of docker.
+- Passing `UNIX_SOCKETS=1` will utilise Unix socket functionality for Synapse, Redis, and Postgres(when applicable).
 
 To increase the log level for the tests, set `SYNAPSE_TEST_LOG_LEVEL`, e.g:
 ```sh
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index ff59cbccc1..d9286e83bc 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -462,6 +462,20 @@ See the docs [request log format](../administration/request_log.md).
 * `additional_resources`: Only valid for an 'http' listener. A map of
    additional endpoints which should be loaded via dynamic modules.
 
+Unix socket support (_Added in Synapse 1.88.0_):
+* `path`: A path and filename for a Unix socket. Make sure it is located in a
+  directory with read and write permissions, and that it already exists (the directory
+  will not be created). Defaults to `None`.
+  * **Note**: The use of both `path` and `port` options for the same `listener` is not
+    compatible.
+  * The `x_forwarded` option defaults to true  when using Unix sockets and can be omitted.
+  * Other options that would not make sense to use with a UNIX socket, such as 
+    `bind_addresses` and `tls` will be ignored and can be removed.
+* `mode`: The file permissions to set on the UNIX socket. Defaults to `666`
+* **Note:** Must be set as `type: http` (does not support `metrics` and `manhole`). 
+  Also make sure that `metrics` is not included in `resources` -> `names`
+
+
 Valid resource names are:
 
 * `client`: the client-server API (/_matrix/client), and the synapse admin API (/_synapse/admin). Also implies `media` and `static`.
@@ -474,7 +488,7 @@ Valid resource names are:
 
 * `media`: the media API (/_matrix/media).
 
-* `metrics`: the metrics interface. See [here](../../metrics-howto.md).
+* `metrics`: the metrics interface. See [here](../../metrics-howto.md). (Not compatible with Unix sockets)
 
 * `openid`: OpenID authentication. See [here](../../openid.md).
 
@@ -533,6 +547,22 @@ listeners:
     bind_addresses: ['::1', '127.0.0.1']
     type: manhole
 ```
+Example configuration #3:
+```yaml
+listeners:
+  # Unix socket listener: Ideal for Synapse deployments behind a reverse proxy, offering
+  # lightweight interprocess communication without TCP/IP overhead, avoid port
+  # conflicts, and providing enhanced security through system file permissions.
+  #
+  # Note that x_forwarded will default to true, when using a UNIX socket. Please see
+  # https://matrix-org.github.io/synapse/latest/reverse_proxy.html.
+  #
+  - path: /var/run/synapse/main_public.sock
+    type: http
+    resources:
+      - names: [client, federation]
+```
+
 ---
 ### `manhole_settings`
 
@@ -3949,6 +3979,14 @@ instance_map:
     host: localhost
     port: 8034
 ```
+Example configuration(#2, for UNIX sockets):
+```yaml
+instance_map:
+  main:
+    path: /var/run/synapse/main_replication.sock
+  worker1:
+    path: /var/run/synapse/worker1_replication.sock
+```
 ---
 ### `stream_writers`
 
@@ -4108,6 +4146,18 @@ worker_listeners:
     resources:
       - names: [client, federation]
 ```
+Example configuration(#2, using UNIX sockets with a `replication` listener):
+```yaml
+worker_listeners:
+  - type: http
+    path: /var/run/synapse/worker_public.sock
+    resources:
+      - names: [client, federation]
+  - type: http
+    path: /var/run/synapse/worker_replication.sock
+    resources:
+      - names: [replication]
+```
 ---
 ### `worker_manhole`
 
diff --git a/docs/workers.md b/docs/workers.md
index 828f082e75..735cd3f18d 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -95,9 +95,12 @@ for the main process
 * Secondly, you need to enable
 [redis-based replication](usage/configuration/config_documentation.md#redis)
 * You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map) 
-with the `main` process defined, as well as the relevant connection information from 
-it's HTTP `replication` listener (defined in step 1 above). Note that the `host` defined 
-is the address the worker needs to look for the `main` process at, not necessarily the same address that is bound to.
+with the `main` process defined, as well as the relevant connection information from
+it's HTTP `replication` listener (defined in step 1 above).
+  * Note that the `host` defined is the address the worker needs to look for the `main`
+  process at, not necessarily the same address that is bound to.
+  * If you are using Unix sockets for the `replication` resource, make sure to
+  use a `path` to the socket file instead of a `port`.
 * Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret)
 can be used to authenticate HTTP traffic between workers. For example:
 
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index 24b83cfeb6..fea76cb5af 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -253,6 +253,10 @@ if [[ -n "$ASYNCIO_REACTOR" ]]; then
   export PASS_SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR=true
 fi
 
+if [[ -n "$UNIX_SOCKETS" ]]; then
+  # Enable full on Unix socket mode for Synapse, Redis and Postgresql
+  export PASS_SYNAPSE_USE_UNIX_SOCKET=1
+fi
 
 if [[ -n "$SYNAPSE_TEST_LOG_LEVEL" ]]; then
   # Set the log level to what is desired
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ccfe75eaf3..e55ca12a36 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -94,7 +94,7 @@ class ConfigModel(BaseModel):
         allow_mutation = False
 
 
-class InstanceLocationConfig(ConfigModel):
+class InstanceTcpLocationConfig(ConfigModel):
     """The host and port to talk to an instance via HTTP replication."""
 
     host: StrictStr
@@ -110,6 +110,23 @@ class InstanceLocationConfig(ConfigModel):
         return f"{self.host}:{self.port}"
 
 
+class InstanceUnixLocationConfig(ConfigModel):
+    """The socket file to talk to an instance via HTTP replication."""
+
+    path: StrictStr
+
+    def scheme(self) -> str:
+        """Hardcode a retrievable scheme"""
+        return "unix"
+
+    def netloc(self) -> str:
+        """Nicely format the address location data"""
+        return f"{self.path}"
+
+
+InstanceLocationConfig = Union[InstanceTcpLocationConfig, InstanceUnixLocationConfig]
+
+
 @attr.s
 class WriterLocations:
     """Specifies the instances that write various streams.
@@ -270,9 +287,12 @@ class WorkerConfig(Config):
                     % MAIN_PROCESS_INSTANCE_MAP_NAME
                 )
 
+        # type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently
         self.instance_map: Dict[
             str, InstanceLocationConfig
-        ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig)
+        ] = parse_and_validate_mapping(
+            instance_map, InstanceLocationConfig  # type: ignore[arg-type]
+        )
 
         # Map from type of streams to source, c.f. WriterLocations.
         writers = config.get("stream_writers") or {}
diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py
index d6ba6f0e57..3ba2f22dfd 100644
--- a/synapse/http/replicationagent.py
+++ b/synapse/http/replicationagent.py
@@ -18,7 +18,11 @@ from typing import Dict, Optional
 from zope.interface import implementer
 
 from twisted.internet import defer
-from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.endpoints import (
+    HostnameEndpoint,
+    UNIXClientEndpoint,
+    wrapClientTLS,
+)
 from twisted.internet.interfaces import IStreamClientEndpoint
 from twisted.python.failure import Failure
 from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
@@ -32,7 +36,11 @@ from twisted.web.iweb import (
     IResponse,
 )
 
-from synapse.config.workers import InstanceLocationConfig
+from synapse.config.workers import (
+    InstanceLocationConfig,
+    InstanceTcpLocationConfig,
+    InstanceUnixLocationConfig,
+)
 from synapse.types import ISynapseReactor
 
 logger = logging.getLogger(__name__)
@@ -40,7 +48,7 @@ logger = logging.getLogger(__name__)
 
 @implementer(IAgentEndpointFactory)
 class ReplicationEndpointFactory:
-    """Connect to a given TCP socket"""
+    """Connect to a given TCP or UNIX socket"""
 
     def __init__(
         self,
@@ -64,24 +72,27 @@ class ReplicationEndpointFactory:
         # The given URI has a special scheme and includes the worker name. The
         # actual connection details are pulled from the instance map.
         worker_name = uri.netloc.decode("utf-8")
-        scheme = self.instance_map[worker_name].scheme()
+        location_config = self.instance_map[worker_name]
+        scheme = location_config.scheme()
 
-        if scheme in ("http", "https"):
+        if isinstance(location_config, InstanceTcpLocationConfig):
             endpoint = HostnameEndpoint(
                 self.reactor,
-                self.instance_map[worker_name].host,
-                self.instance_map[worker_name].port,
+                location_config.host,
+                location_config.port,
             )
             if scheme == "https":
                 endpoint = wrapClientTLS(
                     # The 'port' argument below isn't actually used by the function
                     self.context_factory.creatorForNetloc(
-                        self.instance_map[worker_name].host.encode("utf-8"),
-                        self.instance_map[worker_name].port,
+                        location_config.host.encode("utf-8"),
+                        location_config.port,
                     ),
                     endpoint,
                 )
             return endpoint
+        elif isinstance(location_config, InstanceUnixLocationConfig):
+            return UNIXClientEndpoint(self.reactor, location_config.path)
         else:
             raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
 
@@ -138,13 +149,16 @@ class ReplicationAgent(_AgentBase):
         An existing connection from the connection pool may be used or a new
         one may be created.
 
-        Currently, HTTP and HTTPS schemes are supported in uri.
+        Currently, HTTP, HTTPS and UNIX schemes are supported in uri.
 
         This is copied from twisted.web.client.Agent, except:
 
-        * It uses a different pool key (combining the host & port).
-        * It does not call _ensureValidURI(...) since it breaks on some
-          UNIX paths.
+        * It uses a different pool key (combining the scheme with either host & port or
+          socket path).
+        * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not
+          required when using a worker's name as a 'hostname' for Synapse HTTP
+          Replication machinery. Specifically, this allows a range of ascii characters
+          such as '+' and '_' in hostnames/worker's names.
 
         See: twisted.web.iweb.IAgent.request
         """
@@ -154,9 +168,12 @@ class ReplicationAgent(_AgentBase):
         except SchemeNotSupported:
             return defer.fail(Failure())
 
+        worker_name = parsedURI.netloc.decode("utf-8")
+        key_scheme = self._endpointFactory.instance_map[worker_name].scheme()
+        key_netloc = self._endpointFactory.instance_map[worker_name].netloc()
         # This sets the Pool key to be:
-        #  (http(s), <host:ip>)
-        key = (parsedURI.scheme, parsedURI.netloc)
+        #  (http(s), <host:port>) or (unix, <socket_path>)
+        key = (key_scheme, key_netloc)
 
         # _requestWithEndpoint comes from _AgentBase class
         return self._requestWithEndpoint(
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 75217e3f45..be910128aa 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -1070,7 +1070,7 @@ def trace_servlet(
         tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
         tags.HTTP_METHOD: request.get_method(),
         tags.HTTP_URL: request.get_redacted_uri(),
-        tags.PEER_HOST_IPV6: request.getClientAddress().host,
+        tags.PEER_HOST_IPV6: request.get_client_ip_if_available(),
     }
 
     request_name = request.request_metrics.name
@@ -1091,9 +1091,11 @@ def trace_servlet(
             # with JsonResource).
             scope.span.set_operation_name(request.request_metrics.name)
 
+            # Mypy seems to think that start_context.tag below can be Optional[str], but
+            # that doesn't appear to be correct and works in practice.
             request_tags[
                 SynapseTags.REQUEST_TAG
-            ] = request.request_metrics.start_context.tag
+            ] = request.request_metrics.start_context.tag  # type: ignore[assignment]
 
             # set the tags *after* the servlet completes, in case it decided to
             # prioritise the span (tags will get dropped on unprioritised spans)
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index eb9b1f1cd9..39aadb9ed5 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -22,6 +22,7 @@ from twisted.test.proto_helpers import MemoryReactor
 from twisted.web.resource import Resource
 
 from synapse.app.generic_worker import GenericWorkerServer
+from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocationConfig
 from synapse.http.site import SynapseRequest, SynapseSite
 from synapse.replication.http import ReplicationRestResource
 from synapse.replication.tcp.client import ReplicationDataHandler
@@ -339,7 +340,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
         # `_handle_http_replication_attempt` like we do with the master HS.
         instance_name = worker_hs.get_instance_name()
         instance_loc = worker_hs.config.worker.instance_map.get(instance_name)
-        if instance_loc:
+        if instance_loc and isinstance(instance_loc, InstanceTcpLocationConfig):
             # Ensure the host is one that has a fake DNS entry.
             if instance_loc.host not in self.reactor.lookups:
                 raise Exception(
@@ -360,6 +361,10 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
                 instance_loc.port,
                 lambda: self._handle_http_replication_attempt(worker_hs, port),
             )
+        elif instance_loc and isinstance(instance_loc, InstanceUnixLocationConfig):
+            raise Exception(
+                "Unix sockets are not supported for unit tests at this time."
+            )
 
         store = worker_hs.get_datastores().main
         store.db_pool._db_pool = self.database_pool._db_pool
diff --git a/tests/server.py b/tests/server.py
index a12c3e3b9a..c84a524e8c 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -53,6 +53,7 @@ from twisted.internet.interfaces import (
     IConnector,
     IConsumer,
     IHostnameResolver,
+    IListeningPort,
     IProducer,
     IProtocol,
     IPullProducer,
@@ -62,7 +63,7 @@ from twisted.internet.interfaces import (
     IResolverSimple,
     ITransport,
 )
-from twisted.internet.protocol import ClientFactory, DatagramProtocol
+from twisted.internet.protocol import ClientFactory, DatagramProtocol, Factory
 from twisted.python import threadpool
 from twisted.python.failure import Failure
 from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
@@ -523,6 +524,35 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
         """
         self._tcp_callbacks[(host, port)] = callback
 
+    def connectUNIX(
+        self,
+        address: str,
+        factory: ClientFactory,
+        timeout: float = 30,
+        checkPID: int = 0,
+    ) -> IConnector:
+        """
+        Unix sockets aren't supported for unit tests yet. Make it obvious to any
+        developer trying it out that they will need to do some work before being able
+        to use it in tests.
+        """
+        raise Exception("Unix sockets are not implemented for tests yet, sorry.")
+
+    def listenUNIX(
+        self,
+        address: str,
+        factory: Factory,
+        backlog: int = 50,
+        mode: int = 0o666,
+        wantPID: int = 0,
+    ) -> IListeningPort:
+        """
+        Unix sockets aren't supported for unit tests yet. Make it obvious to any
+        developer trying it out that they will need to do some work before being able
+        to use it in tests.
+        """
+        raise Exception("Unix sockets are not implemented for tests, sorry")
+
     def connectTCP(
         self,
         host: str,