diff options
-rw-r--r-- | changelog.d/14921.misc | 1 | ||||
-rwxr-xr-x | docker/complement/conf/start_for_complement.sh | 6 | ||||
-rwxr-xr-x | docker/configure_workers_and_start.py | 610 |
3 files changed, 507 insertions, 110 deletions
diff --git a/changelog.d/14921.misc b/changelog.d/14921.misc new file mode 100644 index 0000000000..599e23eb0c --- /dev/null +++ b/changelog.d/14921.misc @@ -0,0 +1 @@ +Add additional functionality to declaring worker types when starting Complement in worker mode. diff --git a/docker/complement/conf/start_for_complement.sh b/docker/complement/conf/start_for_complement.sh index af13209c54..5560ab8b95 100755 --- a/docker/complement/conf/start_for_complement.sh +++ b/docker/complement/conf/start_for_complement.sh @@ -51,8 +51,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then # -z True if the length of string is zero. if [[ -z "$SYNAPSE_WORKER_TYPES" ]]; then export SYNAPSE_WORKER_TYPES="\ - event_persister, \ - event_persister, \ + event_persister:2, \ background_worker, \ frontend_proxy, \ event_creator, \ @@ -64,7 +63,8 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then synchrotron, \ client_reader, \ appservice, \ - pusher" + pusher, \ + stream_writers=account_data+presence+receipts+to_device+typing" fi log "Workers requested: $SYNAPSE_WORKER_TYPES" diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index add8bb1ff6..f2312ea04b 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -19,8 +19,15 @@ # The environment variables it reads are: # * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver. # * SYNAPSE_REPORT_STATS: Whether to report stats. -# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG -# below. Leave empty for no workers. +# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG +# below. Leave empty for no workers. Add a ':' and a number at the end to +# multiply that worker. Append multiple worker types with '+' to merge the +# worker types into a single worker. Add a name and a '=' to the front of a +# worker type to give this instance a name in logs and nginx. +# Examples: +# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader' +# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader' +# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing' # * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files # will be treated as Application Service registration files. # * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format. @@ -42,14 +49,29 @@ import os import platform import subprocess import sys +from collections import defaultdict from pathlib import Path -from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set +from typing import ( + Any, + Dict, + List, + Mapping, + MutableMapping, + NoReturn, + Optional, + Set, + SupportsIndex, +) import yaml from jinja2 import Environment, FileSystemLoader MAIN_PROCESS_HTTP_LISTENER_PORT = 8080 +# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced +# during processing with the name of the worker. +WORKER_PLACEHOLDER_NAME = "placeholder_name" + # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources # Watching /_matrix/client needs a "client" listener # Watching /_matrix/federation needs a "federation" listener @@ -70,11 +92,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "endpoint_patterns": [ "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" ], - "shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"}, + "shared_extra_conf": { + "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME + }, "worker_extra_conf": "", }, "media_repository": { - "app": "synapse.app.media_repository", + "app": "synapse.app.generic_worker", "listener_resources": ["media"], "endpoint_patterns": [ "^/_matrix/media/", @@ -87,7 +111,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { # The first configured media worker will run the media background jobs "shared_extra_conf": { "enable_media_repo": False, - "media_instance_running_background_jobs": "media_repository1", + "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME, }, "worker_extra_conf": "enable_media_repo: true", }, @@ -95,7 +119,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], - "shared_extra_conf": {"notify_appservices_from_worker": "appservice1"}, + "shared_extra_conf": { + "notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME + }, "worker_extra_conf": "", }, "federation_sender": { @@ -192,9 +218,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "app": "synapse.app.generic_worker", "listener_resources": [], "endpoint_patterns": [], - # This worker cannot be sharded. Therefore there should only ever be one background - # worker, and it should be named background_worker1 - "shared_extra_conf": {"run_background_tasks_on": "background_worker1"}, + # This worker cannot be sharded. Therefore, there should only ever be one + # background worker. This is enforced for the safety of your database. + "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME}, "worker_extra_conf": "", }, "event_creator": { @@ -275,7 +301,7 @@ NGINX_LOCATION_CONFIG_BLOCK = """ """ NGINX_UPSTREAM_CONFIG_BLOCK = """ -upstream {upstream_worker_type} {{ +upstream {upstream_worker_base_name} {{ {body} }} """ @@ -326,7 +352,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None: def add_worker_roles_to_shared_config( shared_config: dict, - worker_type: str, + worker_types_set: Set[str], worker_name: str, worker_port: int, ) -> None: @@ -334,22 +360,36 @@ def add_worker_roles_to_shared_config( append appropriate worker information to it for the current worker_type instance. Args: - shared_config: The config dict that all worker instances share (after being converted to YAML) - worker_type: The type of worker (one of those defined in WORKERS_CONFIG). + shared_config: The config dict that all worker instances share (after being + converted to YAML) + worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG). + This list can be a single worker type or multiple. worker_name: The name of the worker instance. worker_port: The HTTP replication port that the worker instance is listening on. """ - # The instance_map config field marks the workers that write to various replication streams + # The instance_map config field marks the workers that write to various replication + # streams instance_map = shared_config.setdefault("instance_map", {}) - # Worker-type specific sharding config - if worker_type == "pusher": + # This is a list of the stream_writers that there can be only one of. Events can be + # sharded, and therefore doesn't belong here. + singular_stream_writers = [ + "account_data", + "presence", + "receipts", + "to_device", + "typing", + ] + + # Worker-type specific sharding config. Now a single worker can fulfill multiple + # roles, check each. + if "pusher" in worker_types_set: shared_config.setdefault("pusher_instances", []).append(worker_name) - elif worker_type == "federation_sender": + if "federation_sender" in worker_types_set: shared_config.setdefault("federation_sender_instances", []).append(worker_name) - elif worker_type == "event_persister": + if "event_persister" in worker_types_set: # Event persisters write to the events stream, so we need to update # the list of event stream writers shared_config.setdefault("stream_writers", {}).setdefault("events", []).append( @@ -362,19 +402,154 @@ def add_worker_roles_to_shared_config( "port": worker_port, } - elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]: - # Update the list of stream writers - # It's convenient that the name of the worker type is the same as the stream to write - shared_config.setdefault("stream_writers", {}).setdefault( - worker_type, [] - ).append(worker_name) + # Update the list of stream writers. It's convenient that the name of the worker + # type is the same as the stream to write. Iterate over the whole list in case there + # is more than one. + for worker in worker_types_set: + if worker in singular_stream_writers: + shared_config.setdefault("stream_writers", {}).setdefault( + worker, [] + ).append(worker_name) + + # Map of stream writer instance names to host/ports combos + # For now, all stream writers need http replication ports + instance_map[worker_name] = { + "host": "localhost", + "port": worker_port, + } + + +def merge_worker_template_configs( + existing_dict: Dict[str, Any] | None, + to_be_merged_dict: Dict[str, Any], +) -> Dict[str, Any]: + """When given an existing dict of worker template configuration consisting with both + dicts and lists, merge new template data from WORKERS_CONFIG(or create) and + return new dict. - # Map of stream writer instance names to host/ports combos - # For now, all stream writers need http replication ports - instance_map[worker_name] = { - "host": "localhost", - "port": worker_port, - } + Args: + existing_dict: Either an existing worker template or a fresh blank one. + to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into + existing_dict. + Returns: The newly merged together dict values. + """ + new_dict: Dict[str, Any] = {} + if not existing_dict: + # It doesn't exist yet, just use the new dict(but take a copy not a reference) + new_dict = to_be_merged_dict.copy() + else: + for i in to_be_merged_dict.keys(): + if (i == "endpoint_patterns") or (i == "listener_resources"): + # merge the two lists, remove duplicates + new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i])) + elif i == "shared_extra_conf": + # merge dictionary's, the worker name will be replaced later + new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]} + elif i == "worker_extra_conf": + # There is only one worker type that has a 'worker_extra_conf' and it is + # the media_repo. Since duplicate worker types on the same worker don't + # work, this is fine. + new_dict[i] = existing_dict[i] + to_be_merged_dict[i] + else: + # Everything else should be identical, like "app", which only works + # because all apps are now generic_workers. + new_dict[i] = to_be_merged_dict[i] + return new_dict + + +def insert_worker_name_for_worker_config( + existing_dict: Dict[str, Any], worker_name: str +) -> Dict[str, Any]: + """Insert a given worker name into the worker's configuration dict. + + Args: + existing_dict: The worker_config dict that is imported into shared_config. + worker_name: The name of the worker to insert. + Returns: Copy of the dict with newly inserted worker name + """ + dict_to_edit = existing_dict.copy() + for k, v in dict_to_edit["shared_extra_conf"].items(): + # Only proceed if it's the placeholder name string + if v == WORKER_PLACEHOLDER_NAME: + dict_to_edit["shared_extra_conf"][k] = worker_name + return dict_to_edit + + +def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]: + """ + Apply multiplier(if found) by returning a new expanded list with some basic error + checking. + + Args: + worker_types: The unprocessed List of requested workers + Returns: + A new list with all requested workers expanded. + """ + # Checking performed: + # 1. if worker:2 or more is declared, it will create additional workers up to number + # 2. if worker:1, it will create a single copy of this worker as if no number was + # given + # 3. if worker:0 is declared, this worker will be ignored. This is to allow for + # scripting and automated expansion and is intended behaviour. + # 4. if worker:NaN or is a negative number, it will error and log it. + new_worker_types = [] + for worker_type in worker_types: + if ":" in worker_type: + worker_type_components = split_and_strip_string(worker_type, ":", 1) + worker_count = 0 + # Should only be 2 components, a type of worker(s) and an integer as a + # string. Cast the number as an int then it can be used as a counter. + try: + worker_count = int(worker_type_components[1]) + except ValueError: + error( + f"Bad number in worker count for '{worker_type}': " + f"'{worker_type_components[1]}' is not an integer" + ) + + # As long as there are more than 0, we add one to the list to make below. + for _ in range(worker_count): + new_worker_types.append(worker_type_components[0]) + + else: + # If it's not a real worker_type, it will error out later. + new_worker_types.append(worker_type) + return new_worker_types + + +def is_sharding_allowed_for_worker_type(worker_type: str) -> bool: + """Helper to check to make sure worker types that cannot have multiples do not. + + Args: + worker_type: The type of worker to check against. + Returns: True if allowed, False if not + """ + return worker_type not in [ + "background_worker", + "account_data", + "presence", + "receipts", + "typing", + "to_device", + ] + + +def split_and_strip_string( + given_string: str, split_char: str, max_split: SupportsIndex = -1 +) -> List[str]: + """ + Helper to split a string on split_char and strip whitespace from each end of each + element. + Args: + given_string: The string to split + split_char: The character to split the string on + max_split: kwarg for split() to limit how many times the split() happens + Returns: + A List of strings + """ + # Removes whitespace from ends of result strings before adding to list. Allow for + # overriding 'maxsplit' kwarg, default being -1 to signify no maximum. + return [x.strip() for x in given_string.split(split_char, maxsplit=max_split)] def generate_base_homeserver_config() -> None: @@ -389,29 +564,218 @@ def generate_base_homeserver_config() -> None: subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True) +def parse_worker_types( + requested_worker_types: List[str], +) -> Dict[str, Dict[str, Any]]: + """Read the desired list of requested workers and prepare the data for use in + generating worker config files while also checking for potential gotchas. + + Args: + requested_worker_types: The list formed from the split environment variable + containing the unprocessed requests for workers. + + Returns: A dict of all information needed to generate worker files. Format: + {'worker_name': + {'worker_base_name': 'base_name'}, + + {'worker_types_set': + Set{'worker_type', 'other_worker_type'} + } + } + """ + # Checking performed: + # 1. If a requested name contains a space + # 2. If a requested name contains either kind of quote mark + # 3. If a requested name ends with a digit + + # A counter of worker_type -> int. Used for determining the name for a given + # worker type(s) when generating its config file, as each worker's name is just + # worker_(name|type(s)) + instance # + worker_type_counter: Dict[str, int] = defaultdict(int) + + # Similar to above, but more finely grained. This is used to determine we don't have + # more than a single worker for cases where multiples would be bad(e.g. presence). + worker_type_shard_counter: Dict[str, int] = defaultdict(int) + + # Dict of worker_base_name's -> worker_type. The worker_type(s) in this case is a + # sorted set turned into a string. Use this as a reference that a given + # worker_base_name is used only for this worker_type(s). Preventing silly typos. + # e.g. + # 'bob=user_dir+event_creator, bob=event_creator+user_dir' would be fine and resolve + # correctly + # 'bob=user_dir+event_creator, bob=user_dir+frontend_proxy' would not be fine. + # Follows the pattern: + # ["worker_base_name": "worker_type(s)"] + worker_name_checklist: Dict[str, str] = {} + + # The final result of all this processing + dict_to_return: Dict[str, Any] = {} + + # Handle any multipliers requested for a given worker. + multiple_processed_worker_types = apply_requested_multiplier_for_worker( + requested_worker_types + ) + + # Check each requested worker_type_string for a user-requested name and parse it + # out if present. + for worker_type_string in multiple_processed_worker_types: + requested_worker_name = "" + new_worker_type_string = worker_type_string + # First, check if a name is requested + if "=" in worker_type_string: + # Split on "=", remove extra whitespace from ends then make list + worker_type_split = split_and_strip_string(worker_type_string, "=") + if len(worker_type_split) > 2: + error( + "To many worker names requested for a single worker, or to many " + f"'='. Please fix: {worker_type_string}" + ) + # if there was no name given, this will still be an empty string + requested_worker_name = worker_type_split[0] + + # Check the last character of a requested name is not a number. This can + # cause an error that comes across during startup as an exception in + # 'startListening' and ends with 'Address already in use' for the port. + # This only seems to occur when requesting more than 10 of a given + # worker_type, otherwise it would be ok. + if requested_worker_name[-1].isdigit(): + error( + "Found a number at the end of the requested worker name: " + f"{requested_worker_name}. This is not allowed as it will cause " + "exceptions with 'Address already in use'. Recommend appending an " + "underscore after the number if this what you really want to do." + ) + + # Reassign the worker_type string with no name on it. + new_worker_type_string = worker_type_split[1] + + # At this point, we have: + # requested_worker_name which might be an empty string + # new_worker_type_string which might still be what it was when it came in + + # Split the worker_type_string on "+", remove whitespace from ends then make + # the list a set so it's deduplicated. + worker_types_list = split_and_strip_string(new_worker_type_string, "+") + worker_types_set: Set[str] = set(worker_types_list) + + worker_base_name = new_worker_type_string + if requested_worker_name: + worker_base_name = requested_worker_name + # It'll be useful to have this in the log in case it's a complex of many + # workers merged together. Note for Complement: it would only be seen in the + # logs for blueprint construction(which are not collected). + log( + f"Worker name request found: '{requested_worker_name}'" + f", for: {worker_types_set}" + ) + + else: + # The worker name will be the worker_type, however if spaces exist + # between concatenated worker_types and the "+" because of readability, + # it will error on startup. Recombine worker_types without spaces and log. + # Allows for human readability while declaring a complex worker type, e.g. + # 'event_persister + federation_reader + federation_sender + pusher' + if (len(worker_types_set) > 1) and (" " in worker_base_name): + worker_base_name = "+".join(sorted(worker_types_set)) + log( + "Default worker name would have contained spaces, which is not " + f"allowed: '{worker_type_string}'. Reformed name to not contain " + f"spaces: '{worker_base_name}'" + ) + + # At this point, we have: + # worker_base_name which might be identical to + # new_worker_type_string which might still be what it was when it came in + # worker_types_set which is a Set of what worker_types are requested + + # Uncommon mistake that will cause problems. Name string containing quotes + # or spaces will do Bad Things to filenames and probably nginx too. + if ( + (" " in worker_base_name) + or ('"' in worker_base_name) + or ("'" in worker_base_name) + ): + error( + "Requesting a worker name containing a quote mark or a space is " + "not allowed, as it would raise a FileNotFoundError. Please fix: " + f"{worker_base_name}" + ) + + # This counter is used for naming workers with an incrementing number. Use the + # worker_base_name for the index + worker_type_counter[worker_base_name] += 1 + + # Name workers by their type or requested name concatenated with an + # incrementing number. e.g. federation_reader1 or event_creator+event_persister1 + worker_name = worker_base_name + str(worker_type_counter[worker_base_name]) + + # Now that the worker name is settled, check this worker_base_name isn't used + # for a different worker_type. Make sure the worker types being checked are + # deterministic. + deterministic_worker_type_string = "+".join(sorted(worker_types_set)) + check_worker_type = worker_name_checklist.get(worker_base_name) + # Either this doesn't exist yet, or it matches with a twin + if (check_worker_type is None) or ( + check_worker_type == deterministic_worker_type_string + ): + # This is a no-op if it exists, which is expected to avoid the else block + worker_name_checklist.setdefault( + worker_base_name, deterministic_worker_type_string + ) + + else: + error( + f"Can not use worker_name: '{worker_name}' for worker_type(s): " + f"'{deterministic_worker_type_string}'. It is already in use by " + f"worker_type(s): '{check_worker_type}'" + ) + + # Make sure we don't allow sharding for a worker type that doesn't support it. + # Will error and stop if it is a problem, e.g. 'background_worker'. + for worker_type in worker_types_set: + if worker_type in worker_type_shard_counter: + if not is_sharding_allowed_for_worker_type(worker_type): + error( + f"There can be only a single worker with {worker_type} " + "type. Please recount and remove." + ) + # Not in shard counter, must not have seen it yet, add it. + worker_type_shard_counter[worker_type] += 1 + + # The worker has survived the gauntlet of why it can't exist. Add it to the pile + dict_to_return.setdefault(worker_name, {}).setdefault( + "worker_base_name", worker_base_name + ) + dict_to_return.setdefault(worker_name, {}).setdefault( + "worker_types_set", set() + ).update(worker_types_set) + + return dict_to_return + + def generate_worker_files( - environ: Mapping[str, str], config_path: str, data_dir: str + environ: Mapping[str, str], + config_path: str, + data_dir: str, + requested_worker_types: Dict[str, Any], ) -> None: - """Read the desired list of workers from environment variables and generate - shared homeserver, nginx and supervisord configs. + """Read the desired workers(if any) that is passed in and generate shared + homeserver, nginx and supervisord configs. Args: environ: os.environ instance. config_path: The location of the generated Synapse main worker config file. data_dir: The location of the synapse data directory. Where log and user-facing config files live. + requested_worker_types: A Dict containing requested workers in the format of + {'worker_name1': {'worker_type', ...}} """ # Note that yaml cares about indentation, so care should be taken to insert lines # into files at the correct indentation below. - # shared_config is the contents of a Synapse config file that will be shared amongst - # the main Synapse process as well as all workers. - # It is intended mainly for disabling functionality when certain workers are spun up, - # and adding a replication listener. - - # First read the original config file and extract the listeners block. Then we'll add - # another listener for replication. Later we'll write out the result to the shared - # config file. + # First read the original config file and extract the listeners block. Then we'll + # add another listener for replication. Later we'll write out the result to the + # shared config file. listeners = [ { "port": 9093, @@ -427,9 +791,9 @@ def generate_worker_files( listeners += original_listeners # The shared homeserver config. The contents of which will be inserted into the - # base shared worker jinja2 template. - # - # This config file will be passed to all workers, included Synapse's main process. + # base shared worker jinja2 template. This config file will be passed to all + # workers, included Synapse's main process. It is intended mainly for disabling + # functionality when certain workers are spun up, and adding a replication listener. shared_config: Dict[str, Any] = {"listeners": listeners} # List of dicts that describe workers. @@ -437,31 +801,26 @@ def generate_worker_files( # program blocks. worker_descriptors: List[Dict[str, Any]] = [] - # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the - # ports of each worker. For example: + # Upstreams for load-balancing purposes. This dict takes the form of the base worker + # name to the ports of each worker. For example: # { - # worker_type: {1234, 1235, ...}} + # worker_base_name: {1234, 1235, ...}} # } # and will be used to construct 'upstream' nginx directives. nginx_upstreams: Dict[str, Set[int]] = {} - # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be - # placed after the proxy_pass directive. The main benefit to representing this data as a - # dict over a str is that we can easily deduplicate endpoints across multiple instances - # of the same worker. - # - # An nginx site config that will be amended to depending on the workers that are - # spun up. To be placed in /etc/nginx/conf.d. - nginx_locations = {} - - # Read the desired worker configuration from the environment - worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip() - if not worker_types_env: - # No workers, just the main process - worker_types = [] - else: - # Split type names by comma, ignoring whitespace. - worker_types = [x.strip() for x in worker_types_env.split(",")] + # A map that will collect port data for load-balancing upstreams before being + # reprocessed into nginx_locations. Unfortunately, we cannot just use + # nginx_locations as there is a typing clash. + # Format: {"endpoint": {1234, 1235, ...}} + nginx_preprocessed_locations: Dict[str, Set[int]] = {} + + # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what + # will be placed after the proxy_pass directive. The main benefit to representing + # this data as a dict over a str is that we can easily deduplicate endpoints + # across multiple instances of the same worker. The final rendering will be combined + # with nginx_upstreams and placed in /etc/nginx/conf.d. + nginx_locations: Dict[str, str] = {} # Create the worker configuration directory if it doesn't already exist os.makedirs("/conf/workers", exist_ok=True) @@ -469,44 +828,55 @@ def generate_worker_files( # Start worker ports from this arbitrary port worker_port = 18009 - # A counter of worker_type -> int. Used for determining the name for a given - # worker type when generating its config file, as each worker's name is just - # worker_type + instance # - worker_type_counter: Dict[str, int] = {} - # A list of internal endpoints to healthcheck, starting with the main process # which exists even if no workers do. healthcheck_urls = ["http://localhost:8080/health"] - # For each worker type specified by the user, create config values - for worker_type in worker_types: - worker_config = WORKERS_CONFIG.get(worker_type) - if worker_config: - worker_config = worker_config.copy() - else: - error(worker_type + " is an unknown worker type! Please fix!") + # For each worker type specified by the user, create config values and write it's + # yaml config file + for worker_name, worker_type_data in requested_worker_types.items(): + worker_types_set = worker_type_data.get("worker_types_set") + + # The collected and processed data will live here. + worker_config: Dict[str, Any] = {} + + # Merge all worker config templates for this worker into a single config + for worker_type in worker_types_set: + # Verify this is a real defined worker type. If it's not, stop everything so + # it can be fixed. + copy_of_template_config = WORKERS_CONFIG.get(worker_type) + if copy_of_template_config: + # So it's not a reference pointer + copy_of_template_config = copy_of_template_config.copy() + else: + error( + f"{worker_type} is an unknown worker type! Was found in " + f"{worker_types_set}. Please fix!" + ) - new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1 - worker_type_counter[worker_type] = new_worker_count + # Merge worker type template configuration data. It's a combination of lists + # and dicts, so use this helper. + worker_config = merge_worker_template_configs( + worker_config, copy_of_template_config + ) + + # Replace placeholder names in the config template with the actual worker name. + worker_config = insert_worker_name_for_worker_config(worker_config, worker_name) - # Name workers by their type concatenated with an incrementing number - # e.g. federation_reader1 - worker_name = worker_type + str(new_worker_count) worker_config.update( {"name": worker_name, "port": str(worker_port), "config_path": config_path} ) - # Update the shared config with any worker-type specific options - shared_config.update(worker_config["shared_extra_conf"]) + # Update the shared config with any worker_type specific options. The first of a + # given worker_type needs to stay assigned and not be replaced. + worker_config["shared_extra_conf"].update(shared_config) + shared_config = worker_config["shared_extra_conf"] healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) - # Check if more than one instance of this worker type has been specified - worker_type_total_count = worker_types.count(worker_type) - # Update the shared config with sharding-related options if necessary add_worker_roles_to_shared_config( - shared_config, worker_type, worker_name, worker_port + shared_config, worker_types_set, worker_name, worker_port ) # Enable the worker in supervisord @@ -514,21 +884,11 @@ def generate_worker_files( # Add nginx location blocks for this worker's endpoints (if any are defined) for pattern in worker_config["endpoint_patterns"]: - # Determine whether we need to load-balance this worker - if worker_type_total_count > 1: - # Create or add to a load-balanced upstream for this worker - nginx_upstreams.setdefault(worker_type, set()).add(worker_port) - - # Upstreams are named after the worker_type - upstream = "http://" + worker_type - else: - upstream = "http://localhost:%d" % (worker_port,) - - # Note that this endpoint should proxy to this upstream - nginx_locations[pattern] = upstream + # Need more data to determine whether we need to load-balance this worker. + # Collect all the port numbers for a given endpoint + nginx_preprocessed_locations.setdefault(pattern, set()).add(worker_port) # Write out the worker's logging config file - log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir) # Then a worker config file @@ -541,6 +901,29 @@ def generate_worker_files( worker_port += 1 + # Re process all nginx upstream data. Worker_descriptors contains all the port data, + # cross-reference that with the worker_base_name in requested_worker_types. + for pattern, port_set in nginx_preprocessed_locations.items(): + upstream_name: Set[str] = set() + for worker in worker_descriptors: + # Find the port we want + if int(worker["port"]) in port_set: + # Capture the name. We want the base name as they will be grouped + # together. + upstream_name.add( + requested_worker_types[worker["name"]].get("worker_base_name") + ) + + # Join it all up nice and pretty with a double underscore + upstream = "__".join(sorted(upstream_name)) + upstream_location = "http://" + upstream + + # Save the upstream location to it's associated pattern + nginx_locations[pattern] = upstream_location + + # And save the port numbers for writing out below + nginx_upstreams[upstream] = port_set + # Build the nginx location config blocks nginx_location_config = "" for endpoint, upstream in nginx_locations.items(): @@ -552,14 +935,14 @@ def generate_worker_files( # Determine the load-balancing upstreams to configure nginx_upstream_config = "" - for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items(): + for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items(): body = "" for port in upstream_worker_ports: body += " server localhost:%d;\n" % (port,) # Add to the list of configured upstreams nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format( - upstream_worker_type=upstream_worker_type, + upstream_worker_base_name=upstream_worker_base_name, body=body, ) @@ -580,7 +963,7 @@ def generate_worker_files( if reg_path.suffix.lower() in (".yaml", ".yml") ] - workers_in_use = len(worker_types) > 0 + workers_in_use = len(requested_worker_types) > 0 # Shared homeserver config convert( @@ -678,13 +1061,26 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: generate_base_homeserver_config() else: log("Base homeserver config exists—not regenerating") - # This script may be run multiple times (mostly by Complement, see note at top of file). - # Don't re-configure workers in this instance. + # This script may be run multiple times (mostly by Complement, see note at top of + # file). Don't re-configure workers in this instance. mark_filepath = "/conf/workers_have_been_configured" if not os.path.exists(mark_filepath): + # Collect and validate worker_type requests + # Read the desired worker configuration from the environment + worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip() + # Only process worker_types if they exist + if not worker_types_env: + # No workers, just the main process + worker_types = [] + requested_worker_types: Dict[str, Any] = {} + else: + # Split type names by comma, ignoring whitespace. + worker_types = split_and_strip_string(worker_types_env, ",") + requested_worker_types = parse_worker_types(worker_types) + # Always regenerate all other config files log("Generating worker config files") - generate_worker_files(environ, config_path, data_dir) + generate_worker_files(environ, config_path, data_dir, requested_worker_types) # Mark workers as being configured with open(mark_filepath, "w") as f: |