From 224ef0b669fdd85925d66deb38ba1b51c5aaa1bd Mon Sep 17 00:00:00 2001 From: Jason Little Date: Tue, 11 Jul 2023 13:08:06 -0500 Subject: Unix Sockets for HTTP Replication (#15708) Unix socket support for `federation` and `client` Listeners has existed now for a little while(since [1.81.0](https://github.com/matrix-org/synapse/pull/15353)), but there was one last hold out before it could be complete: HTTP Replication communication. This should finish it up. The Listeners would have always worked, but would have had no way to be talked to/at. --------- Co-authored-by: Eric Eastwood Co-authored-by: Olivier Wilkinson (reivilibre) Co-authored-by: Eric Eastwood --- docker/configure_workers_and_start.py | 104 +++++++++++++++++++++++++--------- 1 file changed, 78 insertions(+), 26 deletions(-) (limited to 'docker/configure_workers_and_start.py') diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 62fb88daab..dc824038b5 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -74,6 +74,9 @@ MAIN_PROCESS_HTTP_LISTENER_PORT = 8080 MAIN_PROCESS_INSTANCE_NAME = "main" MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1" MAIN_PROCESS_REPLICATION_PORT = 9093 +# Obviously, these would only be used with the UNIX socket option +MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH = "/run/main_public.sock" +MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock" # A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced # during processing with the name of the worker. @@ -407,11 +410,15 @@ def add_worker_roles_to_shared_config( ) # Map of stream writer instance names to host/ports combos - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } - + if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): + instance_map[worker_name] = { + "path": f"/run/worker.{worker_port}", + } + else: + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } # Update the list of stream writers. It's convenient that the name of the worker # type is the same as the stream to write. Iterate over the whole list in case there # is more than one. @@ -423,10 +430,15 @@ def add_worker_roles_to_shared_config( # Map of stream writer instance names to host/ports combos # For now, all stream writers need http replication ports - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } + if os.environ.get("SYNAPSE_USE_UNIX_SOCKET", False): + instance_map[worker_name] = { + "path": f"/run/worker.{worker_port}", + } + else: + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } def merge_worker_template_configs( @@ -718,17 +730,29 @@ def generate_worker_files( # Note that yaml cares about indentation, so care should be taken to insert lines # into files at the correct indentation below. + # Convenience helper for if using unix sockets instead of host:port + using_unix_sockets = environ.get("SYNAPSE_USE_UNIX_SOCKET", False) # First read the original config file and extract the listeners block. Then we'll # add another listener for replication. Later we'll write out the result to the # shared config file. - listeners = [ - { - "port": MAIN_PROCESS_REPLICATION_PORT, - "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS, - "type": "http", - "resources": [{"names": ["replication"]}], - } - ] + listeners: List[Any] + if using_unix_sockets: + listeners = [ + { + "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH, + "type": "http", + "resources": [{"names": ["replication"]}], + } + ] + else: + listeners = [ + { + "port": MAIN_PROCESS_REPLICATION_PORT, + "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS, + "type": "http", + "resources": [{"names": ["replication"]}], + } + ] with open(config_path) as file_stream: original_config = yaml.safe_load(file_stream) original_listeners = original_config.get("listeners") @@ -769,7 +793,17 @@ def generate_worker_files( # A list of internal endpoints to healthcheck, starting with the main process # which exists even if no workers do. - healthcheck_urls = ["http://localhost:8080/health"] + # This list ends up being part of the command line to curl, (curl added support for + # Unix sockets in version 7.40). + if using_unix_sockets: + healthcheck_urls = [ + f"--unix-socket {MAIN_PROCESS_UNIX_SOCKET_PUBLIC_PATH} " + # The scheme and hostname from the following URL are ignored. + # The only thing that matters is the path `/health` + "http://localhost/health" + ] + else: + healthcheck_urls = ["http://localhost:8080/health"] # Get the set of all worker types that we have configured all_worker_types_in_use = set(chain(*requested_worker_types.values())) @@ -806,8 +840,12 @@ def generate_worker_files( # given worker_type needs to stay assigned and not be replaced. worker_config["shared_extra_conf"].update(shared_config) shared_config = worker_config["shared_extra_conf"] - - healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) + if using_unix_sockets: + healthcheck_urls.append( + f"--unix-socket /run/worker.{worker_port} http://localhost/health" + ) + else: + healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) # Update the shared config with sharding-related options if necessary add_worker_roles_to_shared_config( @@ -826,6 +864,7 @@ def generate_worker_files( "/conf/workers/{name}.yaml".format(name=worker_name), **worker_config, worker_log_config_filepath=log_config_filepath, + using_unix_sockets=using_unix_sockets, ) # Save this worker's port number to the correct nginx upstreams @@ -846,8 +885,13 @@ def generate_worker_files( nginx_upstream_config = "" for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items(): body = "" - for port in upstream_worker_ports: - body += f" server localhost:{port};\n" + if using_unix_sockets: + for port in upstream_worker_ports: + body += f" server unix:/run/worker.{port};\n" + + else: + for port in upstream_worker_ports: + body += f" server localhost:{port};\n" # Add to the list of configured upstreams nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format( @@ -877,10 +921,15 @@ def generate_worker_files( # If there are workers, add the main process to the instance_map too. if workers_in_use: instance_map = shared_config.setdefault("instance_map", {}) - instance_map[MAIN_PROCESS_INSTANCE_NAME] = { - "host": MAIN_PROCESS_LOCALHOST_ADDRESS, - "port": MAIN_PROCESS_REPLICATION_PORT, - } + if using_unix_sockets: + instance_map[MAIN_PROCESS_INSTANCE_NAME] = { + "path": MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH, + } + else: + instance_map[MAIN_PROCESS_INSTANCE_NAME] = { + "host": MAIN_PROCESS_LOCALHOST_ADDRESS, + "port": MAIN_PROCESS_REPLICATION_PORT, + } # Shared homeserver config convert( @@ -890,6 +939,7 @@ def generate_worker_files( appservice_registrations=appservice_registrations, enable_redis=workers_in_use, workers_in_use=workers_in_use, + using_unix_sockets=using_unix_sockets, ) # Nginx config @@ -900,6 +950,7 @@ def generate_worker_files( upstream_directives=nginx_upstream_config, tls_cert_path=os.environ.get("SYNAPSE_TLS_CERT"), tls_key_path=os.environ.get("SYNAPSE_TLS_KEY"), + using_unix_sockets=using_unix_sockets, ) # Supervisord config @@ -909,6 +960,7 @@ def generate_worker_files( "/etc/supervisor/supervisord.conf", main_config_path=config_path, enable_redis=workers_in_use, + using_unix_sockets=using_unix_sockets, ) convert( -- cgit 1.4.1