summary refs log tree commit diff
path: root/docker/configure_workers_and_start.py
diff options
context:
space:
mode:
Diffstat (limited to 'docker/configure_workers_and_start.py')
-rwxr-xr-xdocker/configure_workers_and_start.py104
1 files changed, 78 insertions, 26 deletions
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 62fb88daab..dc824038b5 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -74,6 +74,9 @@ MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
 MAIN_PROCESS_INSTANCE_NAME = "main"
 MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1"
 MAIN_PROCESS_REPLICATION_PORT = 9093
+# Obviously, these would only be used with the UNIX socket option
+MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock"
+MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
 
 # A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
 # during processing with the name of the worker.
@@ -407,11 +410,15 @@ def add_worker_roles_to_shared_config(
         )
 
         # Map of stream writer instance names to host/ports combos
-        instance_map[worker_name] = {
-            "host": "localhost",
-            "port": worker_port,
-        }
-
+        if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
+            instance_map[worker_name] = {
+                "path": f"/run/worker.{worker_port}",
+            }
+        else:
+            instance_map[worker_name] = {
+                "host": "localhost",
+                "port": worker_port,
+            }
     # Update the list of stream writers. It's convenient that the name of the worker
     # type is the same as the stream to write. Iterate over the whole list in case there
     # is more than one.
@@ -423,10 +430,15 @@ def add_worker_roles_to_shared_config(
 
             # Map of stream writer instance names to host/ports combos
             # For now, all stream writers need http replication ports
-            instance_map[worker_name] = {
-                "host": "localhost",
-                "port": worker_port,
-            }
+            if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False):
+                instance_map[worker_name] = {
+                    "path": f"/run/worker.{worker_port}",
+                }
+            else:
+                instance_map[worker_name] = {
+                    "host": "localhost",
+                    "port": worker_port,
+                }
 
 
 def merge_worker_template_configs(
@@ -718,17 +730,29 @@ def generate_worker_files(
     # Note that yaml cares about indentation, so care should be taken to insert lines
     # into files at the correct indentation below.
 
+    # Convenience helper for if using unix sockets instead of host:port
+    using_unix_sockets = environ.get("SYNAPSE_USE_UNIX_SOCKET", False)
     # First read the original config file and extract the listeners block. Then we'll
     # add another listener for replication. Later we'll write out the result to the
     # shared config file.
-    listeners = [
-        {
-            "port": MAIN_PROCESS_REPLICATION_PORT,
-            "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
-            "type": "http",
-            "resources": [{"names": ["replication"]}],
-        }
-    ]
+    listeners: List[Any]
+    if using_unix_sockets:
+        listeners = [
+            {
+                "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
+                "type": "http",
+                "resources": [{"names": ["replication"]}],
+            }
+        ]
+    else:
+        listeners = [
+            {
+                "port": MAIN_PROCESS_REPLICATION_PORT,
+                "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS,
+                "type": "http",
+                "resources": [{"names": ["replication"]}],
+            }
+        ]
     with open(config_path) as file_stream:
         original_config = yaml.safe_load(file_stream)
         original_listeners = original_config.get("listeners")
@@ -769,7 +793,17 @@ def generate_worker_files(
 
     # A list of internal endpoints to healthcheck, starting with the main process
     # which exists even if no workers do.
-    healthcheck_urls = ["http://localhost:8080/health"]
+    # This list ends up being part of the command line to curl, (curl added support for
+    # Unix sockets in version 7.40).
+    if using_unix_sockets:
+        healthcheck_urls = [
+            f"--unix-socket {MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH} "
+            # The scheme and hostname from the following URL are ignored.
+            # The only thing that matters is the path `/health`
+            "http://localhost/health"
+        ]
+    else:
+        healthcheck_urls = ["http://localhost:8080/health"]
 
     # Get the set of all worker types that we have configured
     all_worker_types_in_use = set(chain(*requested_worker_types.values()))
@@ -806,8 +840,12 @@ def generate_worker_files(
         # given worker_type needs to stay assigned and not be replaced.
         worker_config["shared_extra_conf"].update(shared_config)
         shared_config = worker_config["shared_extra_conf"]
-
-        healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
+        if using_unix_sockets:
+            healthcheck_urls.append(
+                f"--unix-socket /run/worker.{worker_port} http://localhost/health"
+            )
+        else:
+            healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
 
         # Update the shared config with sharding-related options if necessary
         add_worker_roles_to_shared_config(
@@ -826,6 +864,7 @@ def generate_worker_files(
             "/conf/workers/{name}.yaml".format(name=worker_name),
             **worker_config,
             worker_log_config_filepath=log_config_filepath,
+            using_unix_sockets=using_unix_sockets,
         )
 
         # Save this worker's port number to the correct nginx upstreams
@@ -846,8 +885,13 @@ def generate_worker_files(
     nginx_upstream_config = ""
     for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
         body = ""
-        for port in upstream_worker_ports:
-            body += f"    server localhost:{port};\n"
+        if using_unix_sockets:
+            for port in upstream_worker_ports:
+                body += f"    server unix:/run/worker.{port};\n"
+
+        else:
+            for port in upstream_worker_ports:
+                body += f"    server localhost:{port};\n"
 
         # Add to the list of configured upstreams
         nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
@@ -877,10 +921,15 @@ def generate_worker_files(
     # If there are workers, add the main process to the instance_map too.
     if workers_in_use:
         instance_map = shared_config.setdefault("instance_map", {})
-        instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
-            "host": MAIN_PROCESS_LOCALHOST_ADDRESS,
-            "port": MAIN_PROCESS_REPLICATION_PORT,
-        }
+        if using_unix_sockets:
+            instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
+                "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH,
+            }
+        else:
+            instance_map[MAIN_PROCESS_INSTANCE_NAME] = {
+                "host": MAIN_PROCESS_LOCALHOST_ADDRESS,
+                "port": MAIN_PROCESS_REPLICATION_PORT,
+            }
 
     # Shared homeserver config
     convert(
@@ -890,6 +939,7 @@ def generate_worker_files(
         appservice_registrations=appservice_registrations,
         enable_redis=workers_in_use,
         workers_in_use=workers_in_use,
+        using_unix_sockets=using_unix_sockets,
     )
 
     # Nginx config
@@ -900,6 +950,7 @@ def generate_worker_files(
         upstream_directives=nginx_upstream_config,
         tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"),
         tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"),
+        using_unix_sockets=using_unix_sockets,
     )
 
     # Supervisord config
@@ -909,6 +960,7 @@ def generate_worker_files(
         "/etc/supervisor/supervisord.conf",
         main_config_path=config_path,
         enable_redis=workers_in_use,
+        using_unix_sockets=using_unix_sockets,
     )
 
     convert(