diff options
-rw-r--r-- | changelog.d/14921.misc | 1 | ||||
-rwxr-xr-x | docker/complement/conf/start_for_complement.sh | 6 | ||||
-rwxr-xr-x | docker/configure_workers_and_start.py | 521 |
3 files changed, 413 insertions, 115 deletions
diff --git a/changelog.d/14921.misc b/changelog.d/14921.misc new file mode 100644 index 0000000000..599e23eb0c --- /dev/null +++ b/changelog.d/14921.misc @@ -0,0 +1 @@ +Add additional functionality to declaring worker types when starting Complement in worker mode. diff --git a/docker/complement/conf/start_for_complement.sh b/docker/complement/conf/start_for_complement.sh index af13209c54..5560ab8b95 100755 --- a/docker/complement/conf/start_for_complement.sh +++ b/docker/complement/conf/start_for_complement.sh @@ -51,8 +51,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then # -z True if the length of string is zero. if [[ -z "$SYNAPSE_WORKER_TYPES" ]]; then export SYNAPSE_WORKER_TYPES="\ - event_persister, \ - event_persister, \ + event_persister:2, \ background_worker, \ frontend_proxy, \ event_creator, \ @@ -64,7 +63,8 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then synchrotron, \ client_reader, \ appservice, \ - pusher" + pusher, \ + stream_writers=account_data+presence+receipts+to_device+typing" fi log "Workers requested: $SYNAPSE_WORKER_TYPES" diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index add8bb1ff6..cfb16c2e22 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -19,8 +19,15 @@ # The environment variables it reads are: # * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver. # * SYNAPSE_REPORT_STATS: Whether to report stats. -# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG -# below. Leave empty for no workers. +# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG +# below. Leave empty for no workers. Add a ':' and a number at the end to +# multiply that worker. Append multiple worker types with '+' to merge the +# worker types into a single worker. Add a name and a '=' to the front of a +# worker type to give this instance a name in logs and nginx. +# Examples: +# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader' +# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader' +# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing' # * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files # will be treated as Application Service registration files. # * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format. @@ -40,16 +47,33 @@ import os import platform +import re import subprocess import sys +from collections import defaultdict +from itertools import chain from pathlib import Path -from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set +from typing import ( + Any, + Dict, + List, + Mapping, + MutableMapping, + NoReturn, + Optional, + Set, + SupportsIndex, +) import yaml from jinja2 import Environment, FileSystemLoader MAIN_PROCESS_HTTP_LISTENER_PORT = 8080 +# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced +# during processing with the name of the worker. +WORKER_PLACEHOLDER_NAME = "placeholder_name" + # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources # Watching /_matrix/client needs a "client" listener # Watching /_matrix/federation needs a "federation" listener @@ -70,11 +94,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "endpoint_patterns": [ "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" ], - "shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"}, + "shared_extra_conf": { + "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME + }, "worker_extra_conf": "", }, "media_repository": { - "app": "synapse.app.media_repository", + "app": "synapse.app.generic_worker", "listener_resources": ["media"], "endpoint_patterns": [ "^/_matrix/media/", @@ -87,7 +113,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { # The first configured media worker will run the media background jobs "shared_extra_conf": { "enable_media_repo": False, - "media_instance_running_background_jobs": "media_repository1", + "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME, }, "worker_extra_conf": "enable_media_repo: true", }, @@ -95,7 +121,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], - "shared_extra_conf": {"notify_appservices_from_worker": "appservice1"}, + "shared_extra_conf": { + "notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME + }, "worker_extra_conf": "", }, "federation_sender": { @@ -192,9 +220,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], - # This worker cannot be sharded. Therefore there should only ever be one background - # worker, and it should be named background_worker1 - "shared_extra_conf": {"run_background_tasks_on": "background_worker1"}, + # This worker cannot be sharded. Therefore, there should only ever be one + # background worker. This is enforced for the safety of your database. + "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME}, "worker_extra_conf": "", }, "event_creator": { @@ -275,7 +303,7 @@ NGINX_LOCATION_CONFIG_BLOCK = """ """ NGINX_UPSTREAM_CONFIG_BLOCK = """ -upstream {upstream_worker_type} {{ +upstream {upstream_worker_base_name} {{ {body} }} """ @@ -326,7 +354,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None: def add_worker_roles_to_shared_config( shared_config: dict, - worker_type: str, + worker_types_set: Set[str], worker_name: str, worker_port: int, ) -> None: @@ -334,22 +362,36 @@ def add_worker_roles_to_shared_config( append appropriate worker information to it for the current worker_type instance. Args: - shared_config: The config dict that all worker instances share (after being converted to YAML) - worker_type: The type of worker (one of those defined in WORKERS_CONFIG). + shared_config: The config dict that all worker instances share (after being + converted to YAML) + worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG). + This list can be a single worker type or multiple. worker_name: The name of the worker instance. worker_port: The HTTP replication port that the worker instance is listening on. """ - # The instance_map config field marks the workers that write to various replication streams + # The instance_map config field marks the workers that write to various replication + # streams instance_map = shared_config.setdefault("instance_map", {}) - # Worker-type specific sharding config - if worker_type == "pusher": + # This is a list of the stream_writers that there can be only one of. Events can be + # sharded, and therefore doesn't belong here. + singular_stream_writers = [ + "account_data", + "presence", + "receipts", + "to_device", + "typing", + ] + + # Worker-type specific sharding config. Now a single worker can fulfill multiple + # roles, check each. + if "pusher" in worker_types_set: shared_config.setdefault("pusher_instances", []).append(worker_name) - elif worker_type == "federation_sender": + if "federation_sender" in worker_types_set: shared_config.setdefault("federation_sender_instances", []).append(worker_name) - elif worker_type == "event_persister": + if "event_persister" in worker_types_set: # Event persisters write to the events stream, so we need to update # the list of event stream writers shared_config.setdefault("stream_writers", {}).setdefault("events", []).append( @@ -362,19 +404,154 @@ def add_worker_roles_to_shared_config( "port": worker_port, } - elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]: - # Update the list of stream writers - # It's convenient that the name of the worker type is the same as the stream to write - shared_config.setdefault("stream_writers", {}).setdefault( - worker_type, [] - ).append(worker_name) + # Update the list of stream writers. It's convenient that the name of the worker + # type is the same as the stream to write. Iterate over the whole list in case there + # is more than one. + for worker in worker_types_set: + if worker in singular_stream_writers: + shared_config.setdefault("stream_writers", {}).setdefault( + worker, [] + ).append(worker_name) + + # Map of stream writer instance names to host/ports combos + # For now, all stream writers need http replication ports + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } + + +def merge_worker_template_configs( + existing_dict: Dict[str, Any] | None, + to_be_merged_dict: Dict[str, Any], +) -> Dict[str, Any]: + """When given an existing dict of worker template configuration consisting with both + dicts and lists, merge new template data from WORKERS_CONFIG(or create) and + return new dict. - # Map of stream writer instance names to host/ports combos - # For now, all stream writers need http replication ports - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } + Args: + existing_dict: Either an existing worker template or a fresh blank one. + to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into + existing_dict. + Returns: The newly merged together dict values. + """ + new_dict: Dict[str, Any] = {} + if not existing_dict: + # It doesn't exist yet, just use the new dict(but take a copy not a reference) + new_dict = to_be_merged_dict.copy() + else: + for i in to_be_merged_dict.keys(): + if (i == "endpoint_patterns") or (i == "listener_resources"): + # merge the two lists, remove duplicates + new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i])) + elif i == "shared_extra_conf": + # merge dictionary's, the worker name will be replaced later + new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]} + elif i == "worker_extra_conf": + # There is only one worker type that has a 'worker_extra_conf' and it is + # the media_repo. Since duplicate worker types on the same worker don't + # work, this is fine. + new_dict[i] = existing_dict[i] + to_be_merged_dict[i] + else: + # Everything else should be identical, like "app", which only works + # because all apps are now generic_workers. + new_dict[i] = to_be_merged_dict[i] + return new_dict + + +def insert_worker_name_for_worker_config( + existing_dict: Dict[str, Any], worker_name: str +) -> Dict[str, Any]: + """Insert a given worker name into the worker's configuration dict. + + Args: + existing_dict: The worker_config dict that is imported into shared_config. + worker_name: The name of the worker to insert. + Returns: Copy of the dict with newly inserted worker name + """ + dict_to_edit = existing_dict.copy() + for k, v in dict_to_edit["shared_extra_conf"].items(): + # Only proceed if it's the placeholder name string + if v == WORKER_PLACEHOLDER_NAME: + dict_to_edit["shared_extra_conf"][k] = worker_name + return dict_to_edit + + +def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]: + """ + Apply multiplier(if found) by returning a new expanded list with some basic error + checking. + + Args: + worker_types: The unprocessed List of requested workers + Returns: + A new list with all requested workers expanded. + """ + # Checking performed: + # 1. if worker:2 or more is declared, it will create additional workers up to number + # 2. if worker:1, it will create a single copy of this worker as if no number was + # given + # 3. if worker:0 is declared, this worker will be ignored. This is to allow for + # scripting and automated expansion and is intended behaviour. + # 4. if worker:NaN or is a negative number, it will error and log it. + new_worker_types = [] + for worker_type in worker_types: + if ":" in worker_type: + worker_type_components = split_and_strip_string(worker_type, ":", 1) + worker_count = 0 + # Should only be 2 components, a type of worker(s) and an integer as a + # string. Cast the number as an int then it can be used as a counter. + try: + worker_count = int(worker_type_components[1]) + except ValueError: + error( + f"Bad number in worker count for '{worker_type}': " + f"'{worker_type_components[1]}' is not an integer" + ) + + # As long as there are more than 0, we add one to the list to make below. + for _ in range(worker_count): + new_worker_types.append(worker_type_components[0]) + + else: + # If it's not a real worker_type, it will error out later. + new_worker_types.append(worker_type) + return new_worker_types + + +def is_sharding_allowed_for_worker_type(worker_type: str) -> bool: + """Helper to check to make sure worker types that cannot have multiples do not. + + Args: + worker_type: The type of worker to check against. + Returns: True if allowed, False if not + """ + return worker_type not in [ + "background_worker", + "account_data", + "presence", + "receipts", + "typing", + "to_device", + ] + + +def split_and_strip_string( + given_string: str, split_char: str, max_split: SupportsIndex = -1 +) -> List[str]: + """ + Helper to split a string on split_char and strip whitespace from each end of each + element. + Args: + given_string: The string to split + split_char: The character to split the string on + max_split: kwarg for split() to limit how many times the split() happens + Returns: + A List of strings + """ + # Removes whitespace from ends of result strings before adding to list. Allow for + # overriding 'maxsplit' kwarg, default being -1 to signify no maximum. + return [x.strip() for x in given_string.split(split_char, maxsplit=max_split)] def generate_base_homeserver_config() -> None: @@ -389,29 +566,153 @@ def generate_base_homeserver_config() -> None: subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True) +def parse_worker_types( + requested_worker_types: List[str], +) -> Dict[str, Set[str]]: + """Read the desired list of requested workers and prepare the data for use in + generating worker config files while also checking for potential gotchas. + + Args: + requested_worker_types: The list formed from the split environment variable + containing the unprocessed requests for workers. + + Returns: A dict of worker names to set of worker types. Format: + {'worker_name': + {'worker_type', 'worker_type2'} + } + """ + # A counter of worker_base_name -> int. Used for determining the name for a given + # worker when generating its config file, as each worker's name is just + # worker_base_name followed by instance number + worker_base_name_counter: Dict[str, int] = defaultdict(int) + + # Similar to above, but more finely grained. This is used to determine we don't have + # more than a single worker for cases where multiples would be bad(e.g. presence). + worker_type_shard_counter: Dict[str, int] = defaultdict(int) + + # The final result of all this processing + dict_to_return: Dict[str, Set[str]] = {} + + # Handle any multipliers requested for given workers. + multiple_processed_worker_types = apply_requested_multiplier_for_worker( + requested_worker_types + ) + + # Process each worker_type_string + # Examples of expected formats: + # - requested_name=type1+type2+type3 + # - synchrotron + # - event_creator+event_persister + for worker_type_string in multiple_processed_worker_types: + # First, if a name is requested, use that — otherwise generate one. + worker_base_name: str = "" + if "=" in worker_type_string: + # Split on "=", remove extra whitespace from ends then make list + worker_type_split = split_and_strip_string(worker_type_string, "=") + if len(worker_type_split) > 2: + error( + "There should only be one '=' in the worker type string. " + f"Please fix: {worker_type_string}" + ) + + # Assign the name + worker_base_name = worker_type_split[0] + + if not re.match(r"^[a-zA-Z0-9_+-]*[a-zA-Z_+-]$", worker_base_name): + # Apply a fairly narrow regex to the worker names. Some characters + # aren't safe for use in file paths or nginx configurations. + # Don't allow to end with a number because we'll add a number + # ourselves in a moment. + error( + "Invalid worker name; please choose a name consisting of " + "alphanumeric letters, _ + -, but not ending with a digit: " + f"{worker_base_name!r}" + ) + + # Continue processing the remainder of the worker_type string + # with the name override removed. + worker_type_string = worker_type_split[1] + + # Split the worker_type_string on "+", remove whitespace from ends then make + # the list a set so it's deduplicated. + worker_types_set: Set[str] = set( + split_and_strip_string(worker_type_string, "+") + ) + + if not worker_base_name: + # No base name specified: generate one deterministically from set of + # types + worker_base_name = "+".join(sorted(worker_types_set)) + + # At this point, we have: + # worker_base_name which is the name for the worker, without counter. + # worker_types_set which is the set of worker types for this worker. + + # Validate worker_type and make sure we don't allow sharding for a worker type + # that doesn't support it. Will error and stop if it is a problem, + # e.g. 'background_worker'. + for worker_type in worker_types_set: + # Verify this is a real defined worker type. If it's not, stop everything so + # it can be fixed. + if worker_type not in WORKERS_CONFIG: + error( + f"{worker_type} is an unknown worker type! Was found in " + f"'{worker_type_string}'. Please fix!" + ) + + if worker_type in worker_type_shard_counter: + if not is_sharding_allowed_for_worker_type(worker_type): + error( + f"There can be only a single worker with {worker_type} " + "type. Please recount and remove." + ) + # Not in shard counter, must not have seen it yet, add it. + worker_type_shard_counter[worker_type] += 1 + + # Generate the number for the worker using incrementing counter + worker_base_name_counter[worker_base_name] += 1 + worker_number = worker_base_name_counter[worker_base_name] + worker_name = f"{worker_base_name}{worker_number}" + + if worker_number > 1: + # If this isn't the first worker, check that we don't have a confusing + # mixture of worker types with the same base name. + first_worker_with_base_name = dict_to_return[f"{worker_base_name}1"] + if first_worker_with_base_name != worker_types_set: + error( + f"Can not use worker_name: '{worker_name}' for worker_type(s): " + f"{worker_types_set!r}. It is already in use by " + f"worker_type(s): {first_worker_with_base_name!r}" + ) + + dict_to_return[worker_name] = worker_types_set + + return dict_to_return + + def generate_worker_files( - environ: Mapping[str, str], config_path: str, data_dir: str + environ: Mapping[str, str], + config_path: str, + data_dir: str, + requested_worker_types: Dict[str, Set[str]], ) -> None: - """Read the desired list of workers from environment variables and generate - shared homeserver, nginx and supervisord configs. + """Read the desired workers(if any) that is passed in and generate shared + homeserver, nginx and supervisord configs. Args: environ: os.environ instance. config_path: The location of the generated Synapse main worker config file. data_dir: The location of the synapse data directory. Where log and user-facing config files live. + requested_worker_types: A Dict containing requested workers in the format of + {'worker_name1': {'worker_type', ...}} """ # Note that yaml cares about indentation, so care should be taken to insert lines # into files at the correct indentation below. - # shared_config is the contents of a Synapse config file that will be shared amongst - # the main Synapse process as well as all workers. - # It is intended mainly for disabling functionality when certain workers are spun up, - # and adding a replication listener. - - # First read the original config file and extract the listeners block. Then we'll add - # another listener for replication. Later we'll write out the result to the shared - # config file. + # First read the original config file and extract the listeners block. Then we'll + # add another listener for replication. Later we'll write out the result to the + # shared config file. listeners = [ { "port": 9093, @@ -427,9 +728,9 @@ def generate_worker_files( listeners += original_listeners # The shared homeserver config. The contents of which will be inserted into the - # base shared worker jinja2 template. - # - # This config file will be passed to all workers, included Synapse's main process. + # base shared worker jinja2 template. This config file will be passed to all + # workers, included Synapse's main process. It is intended mainly for disabling + # functionality when certain workers are spun up, and adding a replication listener. shared_config: Dict[str, Any] = {"listeners": listeners} # List of dicts that describe workers. @@ -437,31 +738,20 @@ def generate_worker_files( # program blocks. worker_descriptors: List[Dict[str, Any]] = [] - # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the - # ports of each worker. For example: + # Upstreams for load-balancing purposes. This dict takes the form of the worker + # type to the ports of each worker. For example: # { # worker_type: {1234, 1235, ...}} # } # and will be used to construct 'upstream' nginx directives. nginx_upstreams: Dict[str, Set[int]] = {} - # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be - # placed after the proxy_pass directive. The main benefit to representing this data as a - # dict over a str is that we can easily deduplicate endpoints across multiple instances - # of the same worker. - # - # An nginx site config that will be amended to depending on the workers that are - # spun up. To be placed in /etc/nginx/conf.d. - nginx_locations = {} - - # Read the desired worker configuration from the environment - worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip() - if not worker_types_env: - # No workers, just the main process - worker_types = [] - else: - # Split type names by comma, ignoring whitespace. - worker_types = [x.strip() for x in worker_types_env.split(",")] + # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what + # will be placed after the proxy_pass directive. The main benefit to representing + # this data as a dict over a str is that we can easily deduplicate endpoints + # across multiple instances of the same worker. The final rendering will be combined + # with nginx_upstreams and placed in /etc/nginx/conf.d. + nginx_locations: Dict[str, str] = {} # Create the worker configuration directory if it doesn't already exist os.makedirs("/conf/workers", exist_ok=True) @@ -469,66 +759,57 @@ def generate_worker_files( # Start worker ports from this arbitrary port worker_port = 18009 - # A counter of worker_type -> int. Used for determining the name for a given - # worker type when generating its config file, as each worker's name is just - # worker_type + instance # - worker_type_counter: Dict[str, int] = {} - # A list of internal endpoints to healthcheck, starting with the main process # which exists even if no workers do. healthcheck_urls = ["http://localhost:8080/health"] - # For each worker type specified by the user, create config values - for worker_type in worker_types: - worker_config = WORKERS_CONFIG.get(worker_type) - if worker_config: - worker_config = worker_config.copy() - else: - error(worker_type + " is an unknown worker type! Please fix!") + # Get the set of all worker types that we have configured + all_worker_types_in_use = set(chain(*requested_worker_types.values())) + # Map locations to upstreams (corresponding to worker types) in Nginx + # but only if we use the appropriate worker type + for worker_type in all_worker_types_in_use: + for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]: + nginx_locations[endpoint_pattern] = f"http://{worker_type}" + + # For each worker type specified by the user, create config values and write it's + # yaml config file + for worker_name, worker_types_set in requested_worker_types.items(): + # The collected and processed data will live here. + worker_config: Dict[str, Any] = {} + + # Merge all worker config templates for this worker into a single config + for worker_type in worker_types_set: + copy_of_template_config = WORKERS_CONFIG[worker_type].copy() + + # Merge worker type template configuration data. It's a combination of lists + # and dicts, so use this helper. + worker_config = merge_worker_template_configs( + worker_config, copy_of_template_config + ) + + # Replace placeholder names in the config template with the actual worker name. + worker_config = insert_worker_name_for_worker_config(worker_config, worker_name) - new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1 - worker_type_counter[worker_type] = new_worker_count - - # Name workers by their type concatenated with an incrementing number - # e.g. federation_reader1 - worker_name = worker_type + str(new_worker_count) worker_config.update( {"name": worker_name, "port": str(worker_port), "config_path": config_path} ) - # Update the shared config with any worker-type specific options - shared_config.update(worker_config["shared_extra_conf"]) + # Update the shared config with any worker_type specific options. The first of a + # given worker_type needs to stay assigned and not be replaced. + worker_config["shared_extra_conf"].update(shared_config) + shared_config = worker_config["shared_extra_conf"] healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) - # Check if more than one instance of this worker type has been specified - worker_type_total_count = worker_types.count(worker_type) - # Update the shared config with sharding-related options if necessary add_worker_roles_to_shared_config( - shared_config, worker_type, worker_name, worker_port + shared_config, worker_types_set, worker_name, worker_port ) # Enable the worker in supervisord worker_descriptors.append(worker_config) - # Add nginx location blocks for this worker's endpoints (if any are defined) - for pattern in worker_config["endpoint_patterns"]: - # Determine whether we need to load-balance this worker - if worker_type_total_count > 1: - # Create or add to a load-balanced upstream for this worker - nginx_upstreams.setdefault(worker_type, set()).add(worker_port) - - # Upstreams are named after the worker_type - upstream = "http://" + worker_type - else: - upstream = "http://localhost:%d" % (worker_port,) - - # Note that this endpoint should proxy to this upstream - nginx_locations[pattern] = upstream - # Write out the worker's logging config file - log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir) # Then a worker config file @@ -539,6 +820,10 @@ def generate_worker_files( worker_log_config_filepath=log_config_filepath, ) + # Save this worker's port number to the correct nginx upstreams + for worker_type in worker_types_set: + nginx_upstreams.setdefault(worker_type, set()).add(worker_port) + worker_port += 1 # Build the nginx location config blocks @@ -551,15 +836,14 @@ def generate_worker_files( # Determine the load-balancing upstreams to configure nginx_upstream_config = "" - - for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items(): + for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items(): body = "" for port in upstream_worker_ports: - body += " server localhost:%d;\n" % (port,) + body += f" server localhost:{port};\n" # Add to the list of configured upstreams nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format( - upstream_worker_type=upstream_worker_type, + upstream_worker_base_name=upstream_worker_base_name, body=body, ) @@ -580,7 +864,7 @@ def generate_worker_files( if reg_path.suffix.lower() in (".yaml", ".yml") ] - workers_in_use = len(worker_types) > 0 + workers_in_use = len(requested_worker_types) > 0 # Shared homeserver config convert( @@ -678,13 +962,26 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: generate_base_homeserver_config() else: log("Base homeserver config exists—not regenerating") - # This script may be run multiple times (mostly by Complement, see note at top of file). - # Don't re-configure workers in this instance. + # This script may be run multiple times (mostly by Complement, see note at top of + # file). Don't re-configure workers in this instance. mark_filepath = "/conf/workers_have_been_configured" if not os.path.exists(mark_filepath): + # Collect and validate worker_type requests + # Read the desired worker configuration from the environment + worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip() + # Only process worker_types if they exist + if not worker_types_env: + # No workers, just the main process + worker_types = [] + requested_worker_types: Dict[str, Any] = {} + else: + # Split type names by comma, ignoring whitespace. + worker_types = split_and_strip_string(worker_types_env, ",") + requested_worker_types = parse_worker_types(worker_types) + # Always regenerate all other config files log("Generating worker config files") - generate_worker_files(environ, config_path, data_dir) + generate_worker_files(environ, config_path, data_dir, requested_worker_types) # Mark workers as being configured with open(mark_filepath, "w") as f: |