diff options
author | Jason Little <realtyem@gmail.com> | 2023-03-03 05:49:24 -0600 |
---|---|---|
committer | Jason Little <realtyem@gmail.com> | 2023-03-03 05:49:24 -0600 |
commit | 47a965f72054fd4f0d298fca858b0311738d87b3 (patch) | |
tree | b50f54a1b18a02e2e09b0426a400801a9270ea54 | |
parent | Merge branch 'develop' into comp-worker-shorthand (diff) | |
download | synapse-47a965f72054fd4f0d298fca858b0311738d87b3.tar.xz |
Apply changes from review.
1. Factor processing of all worker types from the environment out and away from generating config files. 2. Fix up early templating of 'worker_config' to remove boilerplate. 3. Wrangle nginx upstream processing to accommodate odd combinations for overlapping workers. e.g. 'user_dir, user_dir+presence' should be able to handle the(in this case single) endpoint over either worker. I think this was promised in a previous commit, consider it delivered. 4. Update a bunch of comments, and adjust some pre-existing to fit inside the line length count of 88(aka, make the green squiggles go away) 5. Move processing of unusable characters for worker names to a later position so it will check the name produced if one was not requested as well.
-rwxr-xr-x | docker/configure_workers_and_start.py | 616 |
1 files changed, 352 insertions, 264 deletions
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index dbc1361fb2..006e5fd8da 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -350,7 +350,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None: def add_worker_roles_to_shared_config( shared_config: dict, - worker_type_list: List[str], + worker_type_set: Set[str], worker_name: str, worker_port: int, ) -> None: @@ -360,12 +360,13 @@ def add_worker_roles_to_shared_config( Args: shared_config: The config dict that all worker instances share (after being converted to YAML) - worker_type_list: The type of worker (one of those defined in WORKERS_CONFIG). + worker_type_set: The type of worker (one of those defined in WORKERS_CONFIG). This list can be a single worker type or multiple. worker_name: The name of the worker instance. worker_port: The HTTP replication port that the worker instance is listening on. """ - # The instance_map config field marks the workers that write to various replication streams + # The instance_map config field marks the workers that write to various replication + # streams instance_map = shared_config.setdefault("instance_map", {}) # This is a list of the stream_writers that there can be only one of. Events can be @@ -380,13 +381,13 @@ def add_worker_roles_to_shared_config( # Worker-type specific sharding config. Now a single worker can fulfill multiple # roles, check each. - if "pusher" in worker_type_list: + if "pusher" in worker_type_set: shared_config.setdefault("pusher_instances", []).append(worker_name) - if "federation_sender" in worker_type_list: + if "federation_sender" in worker_type_set: shared_config.setdefault("federation_sender_instances", []).append(worker_name) - if "event_persister" in worker_type_list: + if "event_persister" in worker_type_set: # Event persisters write to the events stream, so we need to update # the list of event stream writers shared_config.setdefault("stream_writers", {}).setdefault("events", []).append( @@ -402,7 +403,7 @@ def add_worker_roles_to_shared_config( # Update the list of stream writers. It's convenient that the name of the worker # type is the same as the stream to write. Iterate over the whole list in case there # is more than one. - for worker in worker_type_list: + for worker in worker_type_set: if worker in singular_stream_writers: shared_config.setdefault("stream_writers", {}).setdefault( worker, [] @@ -417,11 +418,12 @@ def add_worker_roles_to_shared_config( def merge_worker_template_configs( - existing_dict: Dict[str, Any], + existing_dict: Dict[str, Any] | None, to_be_merged_dict: Dict[str, Any], ) -> Dict[str, Any]: - """When given an existing dict of worker template configuration, merge new template - data from WORKERS_CONFIG and return new dict. + """When given an existing dict of worker template configuration consisting with both + dicts and lists, merge new template data from WORKERS_CONFIG(or create) and + return new dict. Args: existing_dict: Either an existing worker template or a fresh blank one. @@ -430,22 +432,26 @@ def merge_worker_template_configs( Returns: The newly merged together dict values. """ new_dict: Dict[str, Any] = {} - for i in to_be_merged_dict.keys(): - if (i == "endpoint_patterns") or (i == "listener_resources"): - # merge the two lists, remove duplicates - new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i])) - elif i == "shared_extra_conf": - # merge dictionary's, the worker name will be replaced below after counting - new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]} - elif i == "worker_extra_conf": - # There is only one worker type that has a 'worker_extra_conf' and it is - # the media_repo. Since duplicate worker types on the same worker don't - # work, this is fine. - new_dict[i] = existing_dict[i] + to_be_merged_dict[i] - else: - # Everything else should be identical, like "app", which only works because - # all apps are now generic_workers. - new_dict[i] = to_be_merged_dict[i] + if not existing_dict: + # It doesn't exist yet, just use the new dict(but take a copy not a reference) + new_dict = to_be_merged_dict.copy() + else: + for i in to_be_merged_dict.keys(): + if (i == "endpoint_patterns") or (i == "listener_resources"): + # merge the two lists, remove duplicates + new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i])) + elif i == "shared_extra_conf": + # merge dictionary's, the worker name will be replaced later + new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]} + elif i == "worker_extra_conf": + # There is only one worker type that has a 'worker_extra_conf' and it is + # the media_repo. Since duplicate worker types on the same worker don't + # work, this is fine. + new_dict[i] = existing_dict[i] + to_be_merged_dict[i] + else: + # Everything else should be identical, like "app", which only works + # because all apps are now generic_workers. + new_dict[i] = to_be_merged_dict[i] return new_dict @@ -467,6 +473,48 @@ def insert_worker_name_for_worker_config( return dict_to_edit +def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]: + """ + Apply multiplier(if found) by returning a new expanded list with some basic error + checking. + + Args: + worker_types: The unprocessed List of requested workers + Returns: + A new list with all requested workers expanded. + """ + # Checking performed: + # 1. if worker:2 or more is declared, it will create additional workers up to number + # 2. if worker:1, it will create a single copy of this worker as if no number was + # given + # 3. if worker:0 is declared, this worker will be ignored. This is to allow for + # scripting and automated expansion and is intended behaviour. + # 4. if worker:NaN or is a negative number, it will error and log it. + new_worker_types = [] + for worker_type in worker_types: + if ":" in worker_type: + worker_type_components = split_and_strip_string(worker_type, ":", 1) + worker_count = 0 + # Should only be 2 components, a type of worker(s) and an integer as a + # string. Cast the number as an int then it can be used as a counter. + try: + worker_count = int(worker_type_components[1]) + except ValueError: + error( + f"Bad number in worker count for '{worker_type}': " + f"'{worker_type_components[1]}' is not an integer" + ) + + # As long as there are more than 0, we add one to the list to make below. + for _ in range(worker_count): + new_worker_types.append(worker_type_components[0]) + + else: + # If it's not a real worker_type, it will error out later. + new_worker_types.append(worker_type) + return new_worker_types + + def is_sharding_allowed_for_worker_type(worker_type: str) -> bool: """Helper to check to make sure worker types that cannot have multiples do not. @@ -504,29 +552,220 @@ def generate_base_homeserver_config() -> None: subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True) +def process_worker_types( + requested_worker_types: List[str], +) -> Dict[str, Dict[str, Any]]: + """Read the desired list of worker from environment variables and prepare the data + for use in generating worker config files while also checking for potential gotchas. + + Args: + requested_worker_types: The string pulled from the environment containing + the worker requested data. + + Returns: A tuple of a complex dict of all information needed to generate worker + files and the total count of a given set of worker types. Format: + + {'worker_name': + {'worker_base_name': 'base_name'}, + {'worker_roles_set': + Set{'worker_type', 'other_worker_type'} + } + } + """ + # Checking performed: + # 1. If a requested name contains a space + # 2. If a requested name contains either kind of quote mark + # 3. If a requested name ends with a digit + + # A counter of worker_type -> int. Used for determining the name for a given + # worker type(s) when generating its config file, as each worker's name is just + # worker_(name|type(s)) + instance # + worker_type_counter: Dict[str, int] = defaultdict(int) + + # Similar to above, but more finely grained. This is used to determine we don't have + # more than a single worker for cases where multiples would be bad(e.g. presence). + worker_type_shard_counter: Dict[str, int] = defaultdict(int) + + # Dict of worker name's. This is used to check that a name requested doesn't clash + # with an existing name in the context of a differing worker_type, as it will error + # with 'Address in use'(e.g. "to_device, to_device=typing" would not work). + # Follows the pattern: + # ["worker_name": "worker_type(s)"] + worker_name_checklist: Dict[str, str] = {} + + # The final result of all this processing + dict_to_return: Dict[str, Any] = {} + + # Handle any multipliers requested for a given worker. + multiple_processed_worker_types = apply_requested_multiplier_for_worker( + requested_worker_types + ) + + # Check each requested worker for a requested name + for worker_type_string in multiple_processed_worker_types: + # Shortcut these to avoid processing and skip an 'else' block when no worker + # name is actually requested. + requested_worker_name = "" + new_worker_type_string = worker_type_string + # First, check if a name is requested + if "=" in worker_type_string: + # Split on "=", remove extra whitespace from ends then make list + worker_type_split = split_and_strip_string(worker_type_string, "=") + if len(worker_type_split) > 2: + error( + "To many worker names requested for a single worker, or to many " + f"'='. Please fix: {worker_type_string}" + ) + # if there was no name given, this will still be an empty string + requested_worker_name = worker_type_split[0] + + # Check the last character of a requested name is not a number. This can + # cause an error that comes across during startup as an exception in + # 'startListening' and ends with 'Address already in use' for the port. + # This only seems to occur when requesting more than 10 of a given + # worker_type, otherwise it would be ok. + if requested_worker_name[-1].isdigit(): + error( + "Found a number at the end of the requested worker name: " + f"{requested_worker_name}. This is not allowed as it will cause " + "exceptions with 'Address already in use'. Recommend appending an " + "underscore after the number if this what you really want to do." + ) + + # Reassign the worker_type string with no name on it. + new_worker_type_string = worker_type_split[1] + + # At this point, we have: + # requested_worker_name which might be an empty string + # new_worker_type_string which might still be what it was when it came in + + # Split the worker_type_string on "+", remove whitespace from ends then make + # the list a set so it's deduplicated. Hopefully no one tries to put 2 + # pushers on the same worker(as it would consolidate into one). + workers_roles_list = split_and_strip_string(new_worker_type_string, "+") + workers_roles_set: Set[str] = set(workers_roles_list) + + # Shortcut this here, then it only has to survive intact(most of the time it + # will just pass right through) + worker_base_name = new_worker_type_string + if requested_worker_name: + worker_base_name = requested_worker_name + # It'll be useful to have this in the log in case it's a complex of many + # workers merged together. Note for Complement: it would only be seen in the + # logs for blueprint construction(which are not collected). + log( + f"Worker name request found: '{requested_worker_name}'" + f", for: {workers_roles_set}" + ) + + else: + # The worker name will be the worker_type, however if spaces exist + # between concatenated worker_types and the "+" because of readability, + # it will error on startup. Recombine worker_types without spaces and log. + # Allows for human readability while declaring a complex worker type, e.g. + # 'event_persister + federation_reader + federation_sender + pusher' + if (len(workers_roles_set) > 1) and (" " in worker_base_name): + worker_base_name = "+".join(sorted(workers_roles_set)) + log( + "Default worker name would have contained spaces, which is not " + f"allowed: '{worker_type_string}'. Reformed name to not contain " + f"spaces: '{worker_base_name}'" + ) + + # At this point, we have: + # worker_base_name which might be identical to + # new_worker_type_string which might still be what it was when it came in + # worker_roles_set which is a Set of what worker_types are requested + + # Uncommon mistake that will cause problems. Name string containing quotes + # or spaces will do Bad Things to filenames and probably nginx too. + if ( + (" " in worker_base_name) + or ('"' in worker_base_name) + or ("'" in worker_base_name) + ): + error( + "Requesting a worker name containing a quote mark or a space is " + "not allowed, as it would raise a FileNotFoundError. Please fix: " + f"{worker_base_name}" + ) + + # This counter is used for naming workers with an incrementing number. Use the + # worker_base_name for the index + worker_type_counter[worker_base_name] += 1 + + # Name workers by their type or requested name concatenated with an + # incrementing number. e.g. federation_reader1 or event_creator+event_persister1 + worker_name = worker_base_name + str(worker_type_counter[worker_base_name]) + + # Now that the worker name is settled, check this name isn't used for a + # different worker_type. If it's not allowed, will error and stop. If no + # issues, it will be added to the counter. This will prevent accidentally + # naming a worker by a worker_type. e.g. 'pusher, pusher=user_dir' + # Make sure the worker types being checked are deterministic. + deterministic_worker_role_string = "+".join(sorted(workers_roles_set)) + check_worker_type = worker_name_checklist.get(worker_base_name) + # Either this doesn't exist yet, or it matches with a twin + if (check_worker_type is None) or ( + check_worker_type == deterministic_worker_role_string + ): + # This is a no-op if it exists, which is expected to avoid the else block + worker_name_checklist.setdefault( + worker_base_name, deterministic_worker_role_string + ) + + else: + error( + f"Can not use {worker_name} for {deterministic_worker_role_string}. It " + f"is already in use by {check_worker_type}" + ) + + # Make sure we don't allow sharding for a worker type that doesn't support it. + # Will error and stop if it is a problem, e.g. 'background_worker'. + for worker_role in workers_roles_set: + if worker_role in worker_type_shard_counter: + if not is_sharding_allowed_for_worker_type(worker_role): + error( + f"There can be only a single worker with {worker_role} " + "type. Please recount and remove." + ) + # Not in shard counter, must not have seen it yet, add it. + worker_type_shard_counter[worker_role] += 1 + + # The worker has survived the gauntlet of why it can't exist. Add it to the pile + dict_to_return.setdefault(worker_name, {}).setdefault( + "worker_base_name", worker_base_name + ) + dict_to_return.setdefault(worker_name, {}).setdefault( + "worker_roles_set", set() + ).update(workers_roles_set) + + return dict_to_return + + def generate_worker_files( - environ: Mapping[str, str], config_path: str, data_dir: str + environ: Mapping[str, str], + config_path: str, + data_dir: str, + requested_worker_types: Dict[str, Any], ) -> None: - """Read the desired list of workers from environment variables and generate - shared homeserver, nginx and supervisord configs. + """Read the desired workers(if any) that is passed in and generate shared + homeserver, nginx and supervisord configs. Args: environ: os.environ instance. config_path: The location of the generated Synapse main worker config file. data_dir: The location of the synapse data directory. Where log and user-facing config files live. + requested_worker_types: A Dict containing requested workers in the format of + {'worker_name1': {'worker_type', ...}} """ # Note that yaml cares about indentation, so care should be taken to insert lines # into files at the correct indentation below. - # shared_config is the contents of a Synapse config file that will be shared amongst - # the main Synapse process as well as all workers. - # It is intended mainly for disabling functionality when certain workers are spun up, - # and adding a replication listener. - - # First read the original config file and extract the listeners block. Then we'll add - # another listener for replication. Later we'll write out the result to the shared - # config file. + # First read the original config file and extract the listeners block. Then we'll + # add another listener for replication. Later we'll write out the result to the + # shared config file. listeners = [ { "port": 9093, @@ -542,9 +781,9 @@ def generate_worker_files( listeners += original_listeners # The shared homeserver config. The contents of which will be inserted into the - # base shared worker jinja2 template. - # - # This config file will be passed to all workers, included Synapse's main process. + # base shared worker jinja2 template. This config file will be passed to all + # workers, included Synapse's main process. It is intended mainly for disabling + # functionality when certain workers are spun up, and adding a replication listener. shared_config: Dict[str, Any] = {"listeners": listeners} # List of dicts that describe workers. @@ -552,31 +791,23 @@ def generate_worker_files( # program blocks. worker_descriptors: List[Dict[str, Any]] = [] - # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the - # ports of each worker. For example: + # Upstreams for load-balancing purposes. This dict takes the form of the base worker + # name to the ports of each worker. For example: # { - # worker_type: {1234, 1235, ...}} + # worker_base_name: {1234, 1235, ...}} # } # and will be used to construct 'upstream' nginx directives. nginx_upstreams: Dict[str, Set[int]] = {} - # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be - # placed after the proxy_pass directive. The main benefit to representing this data as a - # dict over a str is that we can easily deduplicate endpoints across multiple instances - # of the same worker. - # - # An nginx site config that will be amended to depending on the workers that are - # spun up. To be placed in /etc/nginx/conf.d. - nginx_locations = {} - - # Read the desired worker configuration from the environment - worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip() - if not worker_types_env: - # No workers, just the main process - worker_types = [] - else: - # Split type names by comma, ignoring whitespace. - worker_types = [x.strip() for x in worker_types_env.split(",")] + # A temporary location dict that will help assemble port data for load-balancing + nginx_preprocessed_locations: Dict[str, Set[int]] = {} + + # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what + # will be placed after the proxy_pass directive. The main benefit to representing + # this data as a dict over a str is that we can easily deduplicate endpoints + # across multiple instances of the same worker. The final rendering will be combined + # with nginx_upstreams and placed in /etc/nginx/conf.d. + nginx_locations: Dict[str, str] = {} # Create the worker configuration directory if it doesn't already exist os.makedirs("/conf/workers", exist_ok=True) @@ -584,210 +815,38 @@ def generate_worker_files( # Start worker ports from this arbitrary port worker_port = 18009 - # A counter of worker_type -> int. Used for determining the name for a given - # worker type when generating its config file, as each worker's name is just - # worker_type(s) + instance # - worker_type_counter: Dict[str, int] = defaultdict(int) - - # Similar to above, but more finely grained. This is used to determine we don't have - # more than a single worker for cases where multiples would be bad(e.g. presence). - worker_type_shard_counter: Dict[str, int] = defaultdict(int) - - # Similar to above, but for worker name's. This has two uses: - # 1. Can be used to check that worker names for different worker types or - # combinations of types is not used, as it will error with 'Address in - # use'(e.g. "to_device, to_device=typing" would not work). - # 2. Convenient way to get the combination of worker types from worker_name after - # processing and merging. - # Follows the pattern: - # ["worker_name": "worker_type(s)"] - worker_name_type_list: Dict[str, str] = {} - # A list of internal endpoints to healthcheck, starting with the main process # which exists even if no workers do. healthcheck_urls = ["http://localhost:8080/health"] - # Expand worker_type multiples if requested in shorthand(e.g. worker:2). Checking - # for not an actual defined type of worker is done later. - # Checking performed: - # 1. if worker:2 or more is declared, it will create additional workers up to number - # 2. if worker:1, it will create a single copy of this worker as if no number was - # given - # 3. if worker:0 is declared, this worker will be ignored. This is to allow for - # scripting and automated expansion and is intended behaviour. - # 4. if worker:NaN or is a negative number, it will error and log it. - new_worker_types = [] - for worker_type in worker_types: - if ":" in worker_type: - worker_type_components = split_and_strip_string(worker_type, ":", 1) - worker_count = 0 - # Should only be 2 components, a type of worker(s) and an integer as a - # string. Cast the number as an int then it can be used as a counter. - try: - worker_count = int(worker_type_components[1]) - except ValueError: - error( - f"Bad number in worker count for '{worker_type}': " - "'{worker_type_components[1]}' is not an integer" - ) - - # As long as there are more than 0, we add one to the list to make below. - for _ in range(worker_count): - new_worker_types.append(worker_type_components[0]) + # For each worker type specified by the user, create config values and write it's + # yaml config file + for worker_name, worker_type_data in requested_worker_types.items(): + worker_type_set = worker_type_data.get("worker_roles_set") - else: - # If it's not a real worker, it will error out below - new_worker_types.append(worker_type) - - # worker_types is now an expanded list of worker types. - worker_types = new_worker_types - - # For each worker type specified by the user, create config values - for worker_type in worker_types: - # pre-template the worker_config so when updating we don't get a KeyError - worker_config: Dict[str, Any] = { - "app": "", - "listener_resources": [], - "endpoint_patterns": [], - "shared_extra_conf": {}, - "worker_extra_conf": "", - } - - # Peel off any name designated before a '=' to use later. - requested_worker_name = "" - if "=" in worker_type: - # Split on "=", remove extra whitespace from ends then make list - worker_type_split = split_and_strip_string(worker_type, "=") - if len(worker_type_split) > 2: - error( - "To many worker names requested for a single worker, or to many " - "'='. Please fix: " + worker_type - ) - # if there was no name given, this will still be an empty string - requested_worker_name = worker_type_split[0] - # Uncommon mistake that will cause problems. Name string containing quotes - # or spaces. - if ( - (" " in requested_worker_name) - or ('"' in requested_worker_name) - or ("'" in requested_worker_name) - ): - error( - "Requesting a worker name containing a quote mark or a space is " - "not allowed, as it would raise a FileNotFoundError. Please fix: " - f"{requested_worker_name }" - ) - - # Check the last character of a requested name is not a number. This only - # seems to occur when requesting more than 10 of a given worker_type. The - # error comes across as an exception in 'startListening' and ends with - # 'Address already in use' for the port - if requested_worker_name[-1].isdigit(): - error( - "Found a number at the end of the requested worker name: " - + requested_worker_name - + ". This is not allowed as it will cause exceptions with " - "'Address already in use'. Recommend appending an underscore " - "after the number if this what you really want to do." - ) - - # Reassign the worker_type string with no name on it. - worker_type = worker_type_split[1] - - # Split on "+", remove whitespace from ends then make a list. - workers_to_combo_list = split_and_strip_string(worker_type, "+") - - # Check for duplicates in the split worker type list. No advantage in having - # duplicated worker types on the same worker. Two would consolidate into one. - # (e.g. "pusher + pusher" would resolve to a single "pusher" which may not be - # what was intended.) - if len(workers_to_combo_list) != len(set(workers_to_combo_list)): - error("Duplicate worker type found in " + worker_type + "! Please fix.") + # The collected and processed data will live here. + worker_config: Dict[str, Any] = {} # Merge all worker config templates for this worker into a single config - for worker in workers_to_combo_list: + for worker_type in worker_type_set: # Verify this is a real defined worker type. If it's not, stop everything so # it can be fixed. - copy_of_template_config = WORKERS_CONFIG.get(worker) + copy_of_template_config = WORKERS_CONFIG.get(worker_type) if copy_of_template_config: # So it's not a reference pointer copy_of_template_config = copy_of_template_config.copy() else: error( - worker - + " is an unknown worker type! Was found in " - + worker_type - + ". Please fix!" + f"{worker_type} is an unknown worker type! Was found in " + f"{worker_type_set}. Please fix!" ) - # Merge worker type template configuration data. + # Merge worker type template configuration data. It's a combination of lists + # and dicts, so use this helper. worker_config = merge_worker_template_configs( worker_config, copy_of_template_config ) - # Check if there is to many of a worker there can only be one of. - # Will error and stop if it is a problem, e.g. 'background_worker'. - if worker in worker_type_shard_counter: - if not is_sharding_allowed_for_worker_type(worker): - error( - "There can be only a single worker with " - + worker - + " type. Please recount and remove." - ) - # Not in shard counter, add it. - worker_type_shard_counter[worker] += 1 - - # Name workers by their type or requested name concatenated with an - # incrementing number. e.g. federation_reader1 or event_creator+event_persister1 - worker_base_name: str - if requested_worker_name: - worker_base_name = requested_worker_name - # It'll be useful to have this in the log in case it's a complex of many - # workers merged together. Note for Complement: it would only be seen in the - # logs for blueprint construction(which are not collected). - log( - "Worker name request found: " - + requested_worker_name - + ", for: " - + str(workers_to_combo_list) - ) - - else: - # The worker name will be the worker_type, however if spaces exist - # between concatenated worker_types and the "+" because of readability, - # it will error on startup. Recombine worker_types without spaces and log. - if len(workers_to_combo_list) > 1: - worker_base_name = "+".join(workers_to_combo_list) - if worker_base_name != worker_type: - log( - "Default worker name would have contained spaces, which is not " - "allowed(" + worker_type + "). Reformed name to not contain " - "spaces: " + worker_base_name - ) - else: - worker_base_name = worker_type - # This counter is used for names with broader worker type definitions. - worker_type_counter[worker_type] += 1 - worker_name = worker_base_name + str(worker_type_counter[worker_type]) - - # Now that the worker name is settled, check this name isn't used for a - # different worker_type. If it's not allowed, will error and stop. If no issues, - # it will be added to the counter. - check_worker_type = worker_name_type_list.get(worker_base_name) - if (check_worker_type is None) or (check_worker_type == worker_type): - worker_name_type_list.setdefault(worker_base_name, worker_type) - - else: - error( - "Can not use " - + worker_name - + " for " - + worker_type - + ". It is already in use by " - # This is cast as a str because mypy thinks it could be None - + str(check_worker_type) - ) - # Replace placeholder names in the config template with the actual worker name. worker_config = insert_worker_name_for_worker_config(worker_config, worker_name) @@ -802,12 +861,9 @@ def generate_worker_files( healthcheck_urls.append("http://localhost:%d/health" % (worker_port,)) - # Check if more than one instance of this worker type has been specified - worker_type_total_count = worker_types.count(worker_type) - # Update the shared config with sharding-related options if necessary add_worker_roles_to_shared_config( - shared_config, workers_to_combo_list, worker_name, worker_port + shared_config, worker_type_set, worker_name, worker_port ) # Enable the worker in supervisord @@ -815,18 +871,9 @@ def generate_worker_files( # Add nginx location blocks for this worker's endpoints (if any are defined) for pattern in worker_config["endpoint_patterns"]: - # Determine whether we need to load-balance this worker - if worker_type_total_count > 1: - # Create or add to a load-balanced upstream for this worker - nginx_upstreams.setdefault(worker_base_name, set()).add(worker_port) - - # Upstreams are named after the worker_base_name - upstream = "http://" + worker_base_name - else: - upstream = "http://localhost:%d" % (worker_port,) - - # Note that this endpoint should proxy to this upstream - nginx_locations[pattern] = upstream + # Need more data to determine whether we need to load-balance this worker. + # Collect all the port numbers for a given endpoint + nginx_preprocessed_locations.setdefault(pattern, set()).add(worker_port) # Write out the worker's logging config file log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir) @@ -841,6 +888,34 @@ def generate_worker_files( worker_port += 1 + # Re process all nginx upstream data. Worker_descriptors contains all the port data, + # cross-reference that with the worker_base_name in requested_worker_types. + for pattern, port_set in nginx_preprocessed_locations.items(): + if len(port_set) > 1: + # Only process upstreams for multiple port arrangements + upstream_name: Set[str] = set() + for worker in worker_descriptors: + # Find the port we want + if int(worker["port"]) in port_set: + # Capture the name. We want the base name as they will be grouped + # together. + upstream_name.add( + requested_worker_types[worker["name"]].get("worker_base_name") + ) + + # Join it all up nice and pretty with a double underscore + upstream = "__".join(sorted(upstream_name)) + upstream_location = "http://" + upstream + # And save the port numbers for writing out below + nginx_upstreams[upstream] = port_set + + else: + # Only a single port, just use that + (unpacked_port,) = port_set + upstream_location = "http://localhost:%d" % unpacked_port + + nginx_locations[pattern] = upstream_location + # Build the nginx location config blocks nginx_location_config = "" for endpoint, upstream in nginx_locations.items(): @@ -880,7 +955,7 @@ def generate_worker_files( if reg_path.suffix.lower() in (".yaml", ".yml") ] - workers_in_use = len(worker_types) > 0 + workers_in_use = len(requested_worker_types) > 0 # Shared homeserver config convert( @@ -977,12 +1052,25 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: log("Generating base homeserver config") generate_base_homeserver_config() - # This script may be run multiple times (mostly by Complement, see note at top of file). - # Don't re-configure workers in this instance. + # This script may be run multiple times (mostly by Complement, see note at top of + # file). Don't re-configure workers in this instance. mark_filepath = "/conf/workers_have_been_configured" if not os.path.exists(mark_filepath): + # Collect and validate worker_type requests + # Read the desired worker configuration from the environment + worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip() + # Only process worker_types if they exist + if not worker_types_env: + # No workers, just the main process + worker_types = [] + requested_worker_types: Dict[str, Any] = {} + else: + # Split type names by comma, ignoring whitespace. + worker_types = split_and_strip_string(worker_types_env, ",") + requested_worker_types = process_worker_types(worker_types) + # Always regenerate all other config files - generate_worker_files(environ, config_path, data_dir) + generate_worker_files(environ, config_path, data_dir, requested_worker_types) # Mark workers as being configured with open(mark_filepath, "w") as f: |