diff --git a/changelog.d/15708.feature b/changelog.d/15708.feature
new file mode 100644
index 0000000000..06a6c959ab
--- /dev/null
+++ b/changelog.d/15708.feature
@@ -0,0 +1 @@
+Add Unix Socket support for HTTP Replication Listeners. Document and provide usage instructions for utilizing Unix sockets in Synapse. Contributed by Jason Little.
diff --git a/docker/conf-workers/nginx.conf.j2 b/docker/conf-workers/nginx.conf.j2
index 967fc65e79..d1e02af723 100644
--- a/docker/conf-workers/nginx.conf.j2
+++ b/docker/conf-workers/nginx.conf.j2
@@ -35,7 +35,11 @@ server {
# Send all other traffic to the main process
location ~* ^(\\/_matrix|\\/_synapse) {
+{% if using_unix_sockets %}
+ proxy_pass http://unix:/run/main_public.sock;
+{% else %}
proxy_pass http://localhost:8080;
+{% endif %}
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header Host $host;
diff --git a/docker/conf-workers/shared.yaml.j2 b/docker/conf-workers/shared.yaml.j2
index 92d25386dc..1dfc60ad11 100644
--- a/docker/conf-workers/shared.yaml.j2
+++ b/docker/conf-workers/shared.yaml.j2
@@ -6,6 +6,9 @@
{% if enable_redis %}
redis:
enabled: true
+ {% if using_unix_sockets %}
+ path: /tmp/redis.sock
+ {% endif %}
{% endif %}
{% if appservice_registrations is not none %}
diff --git a/docker/conf-workers/supervisord.conf.j2 b/docker/conf-workers/supervisord.conf.j2
index 9f1e03cfc0..da93358051 100644
--- a/docker/conf-workers/supervisord.conf.j2
+++ b/docker/conf-workers/supervisord.conf.j2
@@ -19,7 +19,11 @@ username=www-data
autorestart=true
[program:redis]
+{% if using_unix_sockets %}
+command=/usr/local/bin/prefix-log /usr/local/bin/redis-server --unixsocket /tmp/redis.sock
+{% else %}
command=/usr/local/bin/prefix-log /usr/local/bin/redis-server
+{% endif %}
priority=1
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
diff --git a/docker/conf-workers/worker.yaml.j2 b/docker/conf-workers/worker.yaml.j2
index 44c6e413cf..29ec74b4ea 100644
--- a/docker/conf-workers/worker.yaml.j2
+++ b/docker/conf-workers/worker.yaml.j2
@@ -8,7 +8,11 @@ worker_name: "{{ name }}"
worker_listeners:
- type: http
+{% if using_unix_sockets %}
+ path: "/run/worker.{{ port }}"
+{% else %}
port: {{ port }}
+{% endif %}
{% if listener_resources %}
resources:
- names:
diff --git a/docker/conf/homeserver.yaml b/docker/conf/homeserver.yaml
index f10f78a48c..c46b955d63 100644
--- a/docker/conf/homeserver.yaml
+++ b/docker/conf/homeserver.yaml
@@ -36,12 +36,17 @@ listeners:
# Allow configuring in case we want to reverse proxy 8008
# using another process in the same container
+{% if SYNAPSE_USE_UNIX_SOCKET %}
+ # Unix sockets don't care about TLS or IP addresses or ports
+ - path: '/run/main_public.sock'
+ type: http
+{% else %}
- port: {{ SYNAPSE_HTTP_PORT or 8008 }}
tls: false
bind_addresses: ['::']
type: http
x_forwarded: false
-
+{% endif %}
resources:
- names: [client]
compress: true
@@ -57,8 +62,11 @@ database:
user: "{{ POSTGRES_USER or "synapse" }}"
password: "{{ POSTGRES_PASSWORD }}"
database: "{{ POSTGRES_DB or "synapse" }}"
+{% if not SYNAPSE_USE_UNIX_SOCKET %}
+{# Synapse will use a default unix socket for Postgres when host/port is not specified (behavior from `psycopg2`). #}
host: "{{ POSTGRES_HOST or "db" }}"
port: "{{ POSTGRES_PORT or "5432" }}"
+{% endif %}
cp_min: 5
cp_max: 10
{% else %}
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 62fb88daab..dc824038b5 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -74,6 +74,9 @@ MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
MAIN_PROCESS_INSTANCE_NAME = "main"
MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
MAIN_PROCESS_REPLICATION_PORT = 9093
+# Obviously, these would only be used with the UNIX socket option
+MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
+MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
# during processing with the name of the worker.
@@ -407,11 +410,15 @@ def add_worker_roles_to_shared_config(
)
# Map of stream writer instance names to host/ports combos
- instance_map[worker_name] = {
- "host": "localhost",
- "port": worker_port,
- }
-
+ if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
+ instance_map[worker_name] = {
+ "path": f"/run/worker.{worker_port}",
+ }
+ else:
+ instance_map[worker_name] = {
+ "host": "localhost",
+ "port": worker_port,
+ }
# Update the list of stream writers. It's convenient that the name of the worker
# type is the same as the stream to write. Iterate over the whole list in case there
# is more than one.
@@ -423,10 +430,15 @@ def add_worker_roles_to_shared_config(
# Map of stream writer instance names to host/ports combos
# For now, all stream writers need http replication ports
- instance_map[worker_name] = {
- "host": "localhost",
- "port": worker_port,
- }
+ if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
+ instance_map[worker_name] = {
+ "path": f"/run/worker.{worker_port}",
+ }
+ else:
+ instance_map[worker_name] = {
+ "host": "localhost",
+ "port": worker_port,
+ }
def merge_worker_template_configs(
@@ -718,17 +730,29 @@ def generate_worker_files(
# Note that yaml cares about indentation, so care should be taken to insert lines
# into files at the correct indentation below.
+ # Convenience helper for if using unix sockets instead of host:port
+ using_unix_sockets = environ.get("SYNAPSE_USE_UNIX_SOCKET", False)
# First read the original config file and extract the listeners block. Then we'll
# add another listener for replication. Later we'll write out the result to the
# shared config file.
- listeners = [
- {
- "port": MAIN_PROCESS_REPLICATION_PORT,
- "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
- "type": "http",
- "resources": [{"names": ["replication"]}],
- }
- ]
+ listeners: List[Any]
+ if using_unix_sockets:
+ listeners = [
+ {
+ "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
+ "type": "http",
+ "resources": [{"names": ["replication"]}],
+ }
+ ]
+ else:
+ listeners = [
+ {
+ "port": MAIN_PROCESS_REPLICATION_PORT,
+ "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
+ "type": "http",
+ "resources": [{"names": ["replication"]}],
+ }
+ ]
with open(config_path) as file_stream:
original_config = yaml.safe_load(file_stream)
original_listeners = original_config.get("listeners")
@@ -769,7 +793,17 @@ def generate_worker_files(
# A list of internal endpoints to healthcheck, starting with the main process
# which exists even if no workers do.
- healthcheck_urls = ["http://localhost:8080/health"]
+ # This list ends up being part of the command line to curl, (curl added support for
+ # Unix sockets in version 7.40).
+ if using_unix_sockets:
+ healthcheck_urls = [
+ f"--unix-socket {MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH} "
+ # The scheme and hostname from the following URL are ignored.
+ # The only thing that matters is the path `/health`
+ "http://localhost/health"
+ ]
+ else:
+ healthcheck_urls = ["http://localhost:8080/health"]
# Get the set of all worker types that we have configured
all_worker_types_in_use = set(chain(*requested_worker_types.values()))
@@ -806,8 +840,12 @@ def generate_worker_files(
# given worker_type needs to stay assigned and not be replaced.
worker_config["shared_extra_conf"].update(shared_config)
shared_config = worker_config["shared_extra_conf"]
-
- healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
+ if using_unix_sockets:
+ healthcheck_urls.append(
+ f"--unix-socket /run/worker.{worker_port} http://localhost/health"
+ )
+ else:
+ healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
@@ -826,6 +864,7 @@ def generate_worker_files(
"/conf/workers/{name}.yaml".format(name=worker_name),
**worker_config,
worker_log_config_filepath=log_config_filepath,
+ using_unix_sockets=using_unix_sockets,
)
# Save this worker's port number to the correct nginx upstreams
@@ -846,8 +885,13 @@ def generate_worker_files(
nginx_upstream_config = ""
for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
body = ""
- for port in upstream_worker_ports:
- body += f" server localhost:{port};\n"
+ if using_unix_sockets:
+ for port in upstream_worker_ports:
+ body += f" server unix:/run/worker.{port};\n"
+
+ else:
+ for port in upstream_worker_ports:
+ body += f" server localhost:{port};\n"
# Add to the list of configured upstreams
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
@@ -877,10 +921,15 @@ def generate_worker_files(
# If there are workers, add the main process to the instance_map too.
if workers_in_use:
instance_map = shared_config.setdefault("instance_map", {})
- instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
- "host": MAIN_PROCESS_LOCALHOST_ADDRESS,
- "port": MAIN_PROCESS_REPLICATION_PORT,
- }
+ if using_unix_sockets:
+ instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
+ "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
+ }
+ else:
+ instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
+ "host": MAIN_PROCESS_LOCALHOST_ADDRESS,
+ "port": MAIN_PROCESS_REPLICATION_PORT,
+ }
# Shared homeserver config
convert(
@@ -890,6 +939,7 @@ def generate_worker_files(
appservice_registrations=appservice_registrations,
enable_redis=workers_in_use,
workers_in_use=workers_in_use,
+ using_unix_sockets=using_unix_sockets,
)
# Nginx config
@@ -900,6 +950,7 @@ def generate_worker_files(
upstream_directives=nginx_upstream_config,
tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
+ using_unix_sockets=using_unix_sockets,
)
# Supervisord config
@@ -909,6 +960,7 @@ def generate_worker_files(
"/etc/supervisor/supervisord.conf",
main_config_path=config_path,
enable_redis=workers_in_use,
+ using_unix_sockets=using_unix_sockets,
)
convert(
diff --git a/docs/development/contributing_guide.md b/docs/development/contributing_guide.md
index e9210b1776..698687b91f 100644
--- a/docs/development/contributing_guide.md
+++ b/docs/development/contributing_guide.md
@@ -370,6 +370,7 @@ The above will run a monolithic (single-process) Synapse with SQLite as the data
See the [worker documentation](../workers.md) for additional information on workers.
- Passing `ASYNCIO_REACTOR=1` as an environment variable to use the Twisted asyncio reactor instead of the default one.
- Passing `PODMAN=1` will use the [podman](https://podman.io/) container runtime, instead of docker.
+- Passing `UNIX_SOCKETS=1` will utilise Unix socket functionality for Synapse, Redis, and Postgres(when applicable).
To increase the log level for the tests, set `SYNAPSE_TEST_LOG_LEVEL`, e.g:
```sh
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index ff59cbccc1..d9286e83bc 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -462,6 +462,20 @@ See the docs [request log format](../administration/request_log.md).
* `additional_resources`: Only valid for an 'http' listener. A map of
additional endpoints which should be loaded via dynamic modules.
+Unix socket support (_Added in Synapse 1.88.0_):
+* `path`: A path and filename for a Unix socket. Make sure it is located in a
+ directory with read and write permissions, and that it already exists (the directory
+ will not be created). Defaults to `None`.
+ * **Note**: The use of both `path` and `port` options for the same `listener` is not
+ compatible.
+ * The `x_forwarded` option defaults to true when using Unix sockets and can be omitted.
+ * Other options that would not make sense to use with a UNIX socket, such as
+ `bind_addresses` and `tls` will be ignored and can be removed.
+* `mode`: The file permissions to set on the UNIX socket. Defaults to `666`
+* **Note:** Must be set as `type: http` (does not support `metrics` and `manhole`).
+ Also make sure that `metrics` is not included in `resources` -> `names`
+
+
Valid resource names are:
* `client`: the client-server API (/_matrix/client), and the synapse admin API (/_synapse/admin). Also implies `media` and `static`.
@@ -474,7 +488,7 @@ Valid resource names are:
* `media`: the media API (/_matrix/media).
-* `metrics`: the metrics interface. See [here](../../metrics-howto.md).
+* `metrics`: the metrics interface. See [here](../../metrics-howto.md). (Not compatible with Unix sockets)
* `openid`: OpenID authentication. See [here](../../openid.md).
@@ -533,6 +547,22 @@ listeners:
bind_addresses: ['::1', '127.0.0.1']
type: manhole
```
+Example configuration #3:
+```yaml
+listeners:
+ # Unix socket listener: Ideal for Synapse deployments behind a reverse proxy, offering
+ # lightweight interprocess communication without TCP/IP overhead, avoid port
+ # conflicts, and providing enhanced security through system file permissions.
+ #
+ # Note that x_forwarded will default to true, when using a UNIX socket. Please see
+ # https://matrix-org.github.io/synapse/latest/reverse_proxy.html.
+ #
+ - path: /var/run/synapse/main_public.sock
+ type: http
+ resources:
+ - names: [client, federation]
+```
+
---
### `manhole_settings`
@@ -3949,6 +3979,14 @@ instance_map:
host: localhost
port: 8034
```
+Example configuration(#2, for UNIX sockets):
+```yaml
+instance_map:
+ main:
+ path: /var/run/synapse/main_replication.sock
+ worker1:
+ path: /var/run/synapse/worker1_replication.sock
+```
---
### `stream_writers`
@@ -4108,6 +4146,18 @@ worker_listeners:
resources:
- names: [client, federation]
```
+Example configuration(#2, using UNIX sockets with a `replication` listener):
+```yaml
+worker_listeners:
+ - type: http
+ path: /var/run/synapse/worker_public.sock
+ resources:
+ - names: [client, federation]
+ - type: http
+ path: /var/run/synapse/worker_replication.sock
+ resources:
+ - names: [replication]
+```
---
### `worker_manhole`
diff --git a/docs/workers.md b/docs/workers.md
index 828f082e75..735cd3f18d 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -95,9 +95,12 @@ for the main process
* Secondly, you need to enable
[redis-based replication](usage/configuration/config_documentation.md#redis)
* You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map)
-with the `main` process defined, as well as the relevant connection information from
-it's HTTP `replication` listener (defined in step 1 above). Note that the `host` defined
-is the address the worker needs to look for the `main` process at, not necessarily the same address that is bound to.
+with the `main` process defined, as well as the relevant connection information from
+it's HTTP `replication` listener (defined in step 1 above).
+ * Note that the `host` defined is the address the worker needs to look for the `main`
+ process at, not necessarily the same address that is bound to.
+ * If you are using Unix sockets for the `replication` resource, make sure to
+ use a `path` to the socket file instead of a `port`.
* Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret)
can be used to authenticate HTTP traffic between workers. For example:
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index 24b83cfeb6..fea76cb5af 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -253,6 +253,10 @@ if [[ -n "$ASYNCIO_REACTOR" ]]; then
export PASS_SYNAPSE_COMPLEMENT_USE_ASYNCIO_REACTOR=true
fi
+if [[ -n "$UNIX_SOCKETS" ]]; then
+ # Enable full on Unix socket mode for Synapse, Redis and Postgresql
+ export PASS_SYNAPSE_USE_UNIX_SOCKET=1
+fi
if [[ -n "$SYNAPSE_TEST_LOG_LEVEL" ]]; then
# Set the log level to what is desired
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ccfe75eaf3..e55ca12a36 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -94,7 +94,7 @@ class ConfigModel(BaseModel):
allow_mutation = False
-class InstanceLocationConfig(ConfigModel):
+class InstanceTcpLocationConfig(ConfigModel):
"""The host and port to talk to an instance via HTTP replication."""
host: StrictStr
@@ -110,6 +110,23 @@ class InstanceLocationConfig(ConfigModel):
return f"{self.host}:{self.port}"
+class InstanceUnixLocationConfig(ConfigModel):
+ """The socket file to talk to an instance via HTTP replication."""
+
+ path: StrictStr
+
+ def scheme(self) -> str:
+ """Hardcode a retrievable scheme"""
+ return "unix"
+
+ def netloc(self) -> str:
+ """Nicely format the address location data"""
+ return f"{self.path}"
+
+
+InstanceLocationConfig = Union[InstanceTcpLocationConfig, InstanceUnixLocationConfig]
+
+
@attr.s
class WriterLocations:
"""Specifies the instances that write various streams.
@@ -270,9 +287,12 @@ class WorkerConfig(Config):
% MAIN_PROCESS_INSTANCE_MAP_NAME
)
+ # type-ignore: the expression `Union[A, B]` is not a Type[Union[A, B]] currently
self.instance_map: Dict[
str, InstanceLocationConfig
- ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig)
+ ] = parse_and_validate_mapping(
+ instance_map, InstanceLocationConfig # type: ignore[arg-type]
+ )
# Map from type of streams to source, c.f. WriterLocations.
writers = config.get("stream_writers") or {}
diff --git a/synapse/http/replicationagent.py b/synapse/http/replicationagent.py
index d6ba6f0e57..3ba2f22dfd 100644
--- a/synapse/http/replicationagent.py
+++ b/synapse/http/replicationagent.py
@@ -18,7 +18,11 @@ from typing import Dict, Optional
from zope.interface import implementer
from twisted.internet import defer
-from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
+from twisted.internet.endpoints import (
+ HostnameEndpoint,
+ UNIXClientEndpoint,
+ wrapClientTLS,
+)
from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.python.failure import Failure
from twisted.web.client import URI, HTTPConnectionPool, _AgentBase
@@ -32,7 +36,11 @@ from twisted.web.iweb import (
IResponse,
)
-from synapse.config.workers import InstanceLocationConfig
+from synapse.config.workers import (
+ InstanceLocationConfig,
+ InstanceTcpLocationConfig,
+ InstanceUnixLocationConfig,
+)
from synapse.types import ISynapseReactor
logger = logging.getLogger(__name__)
@@ -40,7 +48,7 @@ logger = logging.getLogger(__name__)
@implementer(IAgentEndpointFactory)
class ReplicationEndpointFactory:
- """Connect to a given TCP socket"""
+ """Connect to a given TCP or UNIX socket"""
def __init__(
self,
@@ -64,24 +72,27 @@ class ReplicationEndpointFactory:
# The given URI has a special scheme and includes the worker name. The
# actual connection details are pulled from the instance map.
worker_name = uri.netloc.decode("utf-8")
- scheme = self.instance_map[worker_name].scheme()
+ location_config = self.instance_map[worker_name]
+ scheme = location_config.scheme()
- if scheme in ("http", "https"):
+ if isinstance(location_config, InstanceTcpLocationConfig):
endpoint = HostnameEndpoint(
self.reactor,
- self.instance_map[worker_name].host,
- self.instance_map[worker_name].port,
+ location_config.host,
+ location_config.port,
)
if scheme == "https":
endpoint = wrapClientTLS(
# The 'port' argument below isn't actually used by the function
self.context_factory.creatorForNetloc(
- self.instance_map[worker_name].host.encode("utf-8"),
- self.instance_map[worker_name].port,
+ location_config.host.encode("utf-8"),
+ location_config.port,
),
endpoint,
)
return endpoint
+ elif isinstance(location_config, InstanceUnixLocationConfig):
+ return UNIXClientEndpoint(self.reactor, location_config.path)
else:
raise SchemeNotSupported(f"Unsupported scheme: {scheme}")
@@ -138,13 +149,16 @@ class ReplicationAgent(_AgentBase):
An existing connection from the connection pool may be used or a new
one may be created.
- Currently, HTTP and HTTPS schemes are supported in uri.
+ Currently, HTTP, HTTPS and UNIX schemes are supported in uri.
This is copied from twisted.web.client.Agent, except:
- * It uses a different pool key (combining the host & port).
- * It does not call _ensureValidURI(...) since it breaks on some
- UNIX paths.
+ * It uses a different pool key (combining the scheme with either host & port or
+ socket path).
+ * It does not call _ensureValidURI(...) as the strictness of IDNA2008 is not
+ required when using a worker's name as a 'hostname' for Synapse HTTP
+ Replication machinery. Specifically, this allows a range of ascii characters
+ such as '+' and '_' in hostnames/worker's names.
See: twisted.web.iweb.IAgent.request
"""
@@ -154,9 +168,12 @@ class ReplicationAgent(_AgentBase):
except SchemeNotSupported:
return defer.fail(Failure())
+ worker_name = parsedURI.netloc.decode("utf-8")
+ key_scheme = self._endpointFactory.instance_map[worker_name].scheme()
+ key_netloc = self._endpointFactory.instance_map[worker_name].netloc()
# This sets the Pool key to be:
- # (http(s), <host:ip>)
- key = (parsedURI.scheme, parsedURI.netloc)
+ # (http(s), <host:port>) or (unix, <socket_path>)
+ key = (key_scheme, key_netloc)
# _requestWithEndpoint comes from _AgentBase class
return self._requestWithEndpoint(
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 75217e3f45..be910128aa 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -1070,7 +1070,7 @@ def trace_servlet(
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
tags.HTTP_METHOD: request.get_method(),
tags.HTTP_URL: request.get_redacted_uri(),
- tags.PEER_HOST_IPV6: request.getClientAddress().host,
+ tags.PEER_HOST_IPV6: request.get_client_ip_if_available(),
}
request_name = request.request_metrics.name
@@ -1091,9 +1091,11 @@ def trace_servlet(
# with JsonResource).
scope.span.set_operation_name(request.request_metrics.name)
+ # Mypy seems to think that start_context.tag below can be Optional[str], but
+ # that doesn't appear to be correct and works in practice.
request_tags[
SynapseTags.REQUEST_TAG
- ] = request.request_metrics.start_context.tag
+ ] = request.request_metrics.start_context.tag # type: ignore[assignment]
# set the tags *after* the servlet completes, in case it decided to
# prioritise the span (tags will get dropped on unprioritised spans)
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index eb9b1f1cd9..39aadb9ed5 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -22,6 +22,7 @@ from twisted.test.proto_helpers import MemoryReactor
from twisted.web.resource import Resource
from synapse.app.generic_worker import GenericWorkerServer
+from synapse.config.workers import InstanceTcpLocationConfig, InstanceUnixLocationConfig
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.replication.http import ReplicationRestResource
from synapse.replication.tcp.client import ReplicationDataHandler
@@ -339,7 +340,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
# `_handle_http_replication_attempt` like we do with the master HS.
instance_name = worker_hs.get_instance_name()
instance_loc = worker_hs.config.worker.instance_map.get(instance_name)
- if instance_loc:
+ if instance_loc and isinstance(instance_loc, InstanceTcpLocationConfig):
# Ensure the host is one that has a fake DNS entry.
if instance_loc.host not in self.reactor.lookups:
raise Exception(
@@ -360,6 +361,10 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
instance_loc.port,
lambda: self._handle_http_replication_attempt(worker_hs, port),
)
+ elif instance_loc and isinstance(instance_loc, InstanceUnixLocationConfig):
+ raise Exception(
+ "Unix sockets are not supported for unit tests at this time."
+ )
store = worker_hs.get_datastores().main
store.db_pool._db_pool = self.database_pool._db_pool
diff --git a/tests/server.py b/tests/server.py
index a12c3e3b9a..c84a524e8c 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -53,6 +53,7 @@ from twisted.internet.interfaces import (
IConnector,
IConsumer,
IHostnameResolver,
+ IListeningPort,
IProducer,
IProtocol,
IPullProducer,
@@ -62,7 +63,7 @@ from twisted.internet.interfaces import (
IResolverSimple,
ITransport,
)
-from twisted.internet.protocol import ClientFactory, DatagramProtocol
+from twisted.internet.protocol import ClientFactory, DatagramProtocol, Factory
from twisted.python import threadpool
from twisted.python.failure import Failure
from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
@@ -523,6 +524,35 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
"""
self._tcp_callbacks[(host, port)] = callback
+ def connectUNIX(
+ self,
+ address: str,
+ factory: ClientFactory,
+ timeout: float = 30,
+ checkPID: int = 0,
+ ) -> IConnector:
+ """
+ Unix sockets aren't supported for unit tests yet. Make it obvious to any
+ developer trying it out that they will need to do some work before being able
+ to use it in tests.
+ """
+ raise Exception("Unix sockets are not implemented for tests yet, sorry.")
+
+ def listenUNIX(
+ self,
+ address: str,
+ factory: Factory,
+ backlog: int = 50,
+ mode: int = 0o666,
+ wantPID: int = 0,
+ ) -> IListeningPort:
+ """
+ Unix sockets aren't supported for unit tests yet. Make it obvious to any
+ developer trying it out that they will need to do some work before being able
+ to use it in tests.
+ """
+ raise Exception("Unix sockets are not implemented for tests, sorry")
+
def connectTCP(
self,
host: str,
|