summary refs log tree commit diff
diff options
context:
space:
mode:
authorJason Little <realtyem@gmail.com>2023-03-03 05:49:24 -0600
committerJason Little <realtyem@gmail.com>2023-03-03 05:49:24 -0600
commit47a965f72054fd4f0d298fca858b0311738d87b3 (patch)
treeb50f54a1b18a02e2e09b0426a400801a9270ea54
parentMerge branch 'develop' into comp-worker-shorthand (diff)
downloadsynapse-47a965f72054fd4f0d298fca858b0311738d87b3.tar.xz
Apply changes from review.
1. Factor processing of all worker types from the environment out and away from
    generating config files.
2. Fix up early templating of 'worker_config' to remove boilerplate.
3. Wrangle nginx upstream processing to accommodate odd combinations for overlapping
    workers. e.g. 'user_dir, user_dir+presence' should be able to handle the(in this case
    single) endpoint over either worker. I think this was promised in a previous commit,
    consider it delivered.
4. Update a bunch of comments, and adjust some pre-existing to fit inside the line
    length count of 88(aka, make the green squiggles go away)
5. Move processing of unusable characters for worker names to a later position so it
    will check the name produced if one was not requested as well.
-rwxr-xr-xdocker/configure_workers_and_start.py616
1 files changed, 352 insertions, 264 deletions
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index dbc1361fb2..006e5fd8da 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -350,7 +350,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
 
 def add_worker_roles_to_shared_config(
     shared_config: dict,
-    worker_type_list: List[str],
+    worker_type_set: Set[str],
     worker_name: str,
     worker_port: int,
 ) -> None:
@@ -360,12 +360,13 @@ def add_worker_roles_to_shared_config(
     Args:
         shared_config: The config dict that all worker instances share (after being
             converted to YAML)
-        worker_type_list: The type of worker (one of those defined in WORKERS_CONFIG).
+        worker_type_set: The type of worker (one of those defined in WORKERS_CONFIG).
             This list can be a single worker type or multiple.
         worker_name: The name of the worker instance.
         worker_port: The HTTP replication port that the worker instance is listening on.
     """
-    # The instance_map config field marks the workers that write to various replication streams
+    # The instance_map config field marks the workers that write to various replication
+    # streams
     instance_map = shared_config.setdefault("instance_map", {})
 
     # This is a list of the stream_writers that there can be only one of. Events can be
@@ -380,13 +381,13 @@ def add_worker_roles_to_shared_config(
 
     # Worker-type specific sharding config. Now a single worker can fulfill multiple
     # roles, check each.
-    if "pusher" in worker_type_list:
+    if "pusher" in worker_type_set:
         shared_config.setdefault("pusher_instances", []).append(worker_name)
 
-    if "federation_sender" in worker_type_list:
+    if "federation_sender" in worker_type_set:
         shared_config.setdefault("federation_sender_instances", []).append(worker_name)
 
-    if "event_persister" in worker_type_list:
+    if "event_persister" in worker_type_set:
         # Event persisters write to the events stream, so we need to update
         # the list of event stream writers
         shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
@@ -402,7 +403,7 @@ def add_worker_roles_to_shared_config(
     # Update the list of stream writers. It's convenient that the name of the worker
     # type is the same as the stream to write. Iterate over the whole list in case there
     # is more than one.
-    for worker in worker_type_list:
+    for worker in worker_type_set:
         if worker in singular_stream_writers:
             shared_config.setdefault("stream_writers", {}).setdefault(
                 worker, []
@@ -417,11 +418,12 @@ def add_worker_roles_to_shared_config(
 
 
 def merge_worker_template_configs(
-    existing_dict: Dict[str, Any],
+    existing_dict: Dict[str, Any] | None,
     to_be_merged_dict: Dict[str, Any],
 ) -> Dict[str, Any]:
-    """When given an existing dict of worker template configuration, merge new template
-        data from WORKERS_CONFIG and return new dict.
+    """When given an existing dict of worker template configuration consisting with both
+        dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
+        return new dict.
 
     Args:
         existing_dict: Either an existing worker template or a fresh blank one.
@@ -430,22 +432,26 @@ def merge_worker_template_configs(
     Returns: The newly merged together dict values.
     """
     new_dict: Dict[str, Any] = {}
-    for i in to_be_merged_dict.keys():
-        if (i == "endpoint_patterns") or (i == "listener_resources"):
-            # merge the two lists, remove duplicates
-            new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
-        elif i == "shared_extra_conf":
-            # merge dictionary's, the worker name will be replaced below after counting
-            new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
-        elif i == "worker_extra_conf":
-            # There is only one worker type that has a 'worker_extra_conf' and it is
-            # the media_repo. Since duplicate worker types on the same worker don't
-            # work, this is fine.
-            new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
-        else:
-            # Everything else should be identical, like "app", which only works because
-            # all apps are now generic_workers.
-            new_dict[i] = to_be_merged_dict[i]
+    if not existing_dict:
+        # It doesn't exist yet, just use the new dict(but take a copy not a reference)
+        new_dict = to_be_merged_dict.copy()
+    else:
+        for i in to_be_merged_dict.keys():
+            if (i == "endpoint_patterns") or (i == "listener_resources"):
+                # merge the two lists, remove duplicates
+                new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
+            elif i == "shared_extra_conf":
+                # merge dictionary's, the worker name will be replaced later
+                new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
+            elif i == "worker_extra_conf":
+                # There is only one worker type that has a 'worker_extra_conf' and it is
+                # the media_repo. Since duplicate worker types on the same worker don't
+                # work, this is fine.
+                new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
+            else:
+                # Everything else should be identical, like "app", which only works
+                # because all apps are now generic_workers.
+                new_dict[i] = to_be_merged_dict[i]
     return new_dict
 
 
@@ -467,6 +473,48 @@ def insert_worker_name_for_worker_config(
     return dict_to_edit
 
 
+def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
+    """
+    Apply multiplier(if found) by returning a new expanded list with some basic error
+    checking.
+
+    Args:
+        worker_types: The unprocessed List of requested workers
+    Returns:
+        A new list with all requested workers expanded.
+    """
+    # Checking performed:
+    # 1. if worker:2 or more is declared, it will create additional workers up to number
+    # 2. if worker:1, it will create a single copy of this worker as if no number was
+    #   given
+    # 3. if worker:0 is declared, this worker will be ignored. This is to allow for
+    #   scripting and automated expansion and is intended behaviour.
+    # 4. if worker:NaN or is a negative number, it will error and log it.
+    new_worker_types = []
+    for worker_type in worker_types:
+        if ":" in worker_type:
+            worker_type_components = split_and_strip_string(worker_type, ":", 1)
+            worker_count = 0
+            # Should only be 2 components, a type of worker(s) and an integer as a
+            # string. Cast the number as an int then it can be used as a counter.
+            try:
+                worker_count = int(worker_type_components[1])
+            except ValueError:
+                error(
+                    f"Bad number in worker count for '{worker_type}': "
+                    f"'{worker_type_components[1]}' is not an integer"
+                )
+
+            # As long as there are more than 0, we add one to the list to make below.
+            for _ in range(worker_count):
+                new_worker_types.append(worker_type_components[0])
+
+        else:
+            # If it's not a real worker_type, it will error out later.
+            new_worker_types.append(worker_type)
+    return new_worker_types
+
+
 def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
     """Helper to check to make sure worker types that cannot have multiples do not.
 
@@ -504,29 +552,220 @@ def generate_base_homeserver_config() -> None:
     subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
 
 
+def process_worker_types(
+    requested_worker_types: List[str],
+) -> Dict[str, Dict[str, Any]]:
+    """Read the desired list of worker from environment variables and prepare the data
+    for use in generating worker config files while also checking for potential gotchas.
+
+    Args:
+        requested_worker_types: The string pulled from the environment containing
+            the worker requested data.
+
+    Returns: A tuple of a complex dict of all information needed to generate worker
+        files and the total count of a given set of worker types. Format:
+
+        {'worker_name':
+            {'worker_base_name': 'base_name'},
+            {'worker_roles_set':
+                Set{'worker_type', 'other_worker_type'}
+            }
+        }
+    """
+    # Checking performed:
+    #   1. If a requested name contains a space
+    #   2. If a requested name contains either kind of quote mark
+    #   3. If a requested name ends with a digit
+
+    # A counter of worker_type -> int. Used for determining the name for a given
+    # worker type(s) when generating its config file, as each worker's name is just
+    # worker_(name|type(s)) + instance #
+    worker_type_counter: Dict[str, int] = defaultdict(int)
+
+    # Similar to above, but more finely grained. This is used to determine we don't have
+    # more than a single worker for cases where multiples would be bad(e.g. presence).
+    worker_type_shard_counter: Dict[str, int] = defaultdict(int)
+
+    # Dict of worker name's. This is used to check that a name requested doesn't clash
+    # with an existing name in the context of a differing worker_type, as it will error
+    # with 'Address in use'(e.g. "to_device, to_device=typing" would not work).
+    # Follows the pattern:
+    # ["worker_name": "worker_type(s)"]
+    worker_name_checklist: Dict[str, str] = {}
+
+    # The final result of all this processing
+    dict_to_return: Dict[str, Any] = {}
+
+    # Handle any multipliers requested for a given worker.
+    multiple_processed_worker_types = apply_requested_multiplier_for_worker(
+        requested_worker_types
+    )
+
+    # Check each requested worker for a requested name
+    for worker_type_string in multiple_processed_worker_types:
+        # Shortcut these to avoid processing and skip an 'else' block when no worker
+        # name is actually requested.
+        requested_worker_name = ""
+        new_worker_type_string = worker_type_string
+        # First, check if a name is requested
+        if "=" in worker_type_string:
+            # Split on "=", remove extra whitespace from ends then make list
+            worker_type_split = split_and_strip_string(worker_type_string, "=")
+            if len(worker_type_split) > 2:
+                error(
+                    "To many worker names requested for a single worker, or to many "
+                    f"'='. Please fix: {worker_type_string}"
+                )
+            # if there was no name given, this will still be an empty string
+            requested_worker_name = worker_type_split[0]
+
+            # Check the last character of a requested name is not a number. This can
+            # cause an error that comes across during startup as an exception in
+            # 'startListening' and ends with 'Address already in use' for the port.
+            # This only seems to occur when requesting more than 10 of a given
+            # worker_type, otherwise it would be ok.
+            if requested_worker_name[-1].isdigit():
+                error(
+                    "Found a number at the end of the requested worker name: "
+                    f"{requested_worker_name}. This is not allowed as it will cause "
+                    "exceptions with 'Address already in use'. Recommend appending an "
+                    "underscore after the number if this what you really want to do."
+                )
+
+            # Reassign the worker_type string with no name on it.
+            new_worker_type_string = worker_type_split[1]
+
+        # At this point, we have:
+        #   requested_worker_name which might be an empty string
+        #   new_worker_type_string which might still be what it was when it came in
+
+        # Split the worker_type_string on "+", remove whitespace from ends then make
+        # the list a set so it's deduplicated. Hopefully no one tries to put 2
+        # pushers on the same worker(as it would consolidate into one).
+        workers_roles_list = split_and_strip_string(new_worker_type_string, "+")
+        workers_roles_set: Set[str] = set(workers_roles_list)
+
+        # Shortcut this here, then it only has to survive intact(most of the time it
+        # will just pass right through)
+        worker_base_name = new_worker_type_string
+        if requested_worker_name:
+            worker_base_name = requested_worker_name
+            # It'll be useful to have this in the log in case it's a complex of many
+            # workers merged together. Note for Complement: it would only be seen in the
+            # logs for blueprint construction(which are not collected).
+            log(
+                f"Worker name request found: '{requested_worker_name}'"
+                f", for: {workers_roles_set}"
+            )
+
+        else:
+            # The worker name will be the worker_type, however if spaces exist
+            # between concatenated worker_types and the "+" because of readability,
+            # it will error on startup. Recombine worker_types without spaces and log.
+            # Allows for human readability while declaring a complex worker type, e.g.
+            # 'event_persister + federation_reader + federation_sender + pusher'
+            if (len(workers_roles_set) > 1) and (" " in worker_base_name):
+                worker_base_name = "+".join(sorted(workers_roles_set))
+                log(
+                    "Default worker name would have contained spaces, which is not "
+                    f"allowed: '{worker_type_string}'. Reformed name to not contain "
+                    f"spaces: '{worker_base_name}'"
+                )
+
+        # At this point, we have:
+        #   worker_base_name which might be identical to
+        #   new_worker_type_string which might still be what it was when it came in
+        #   worker_roles_set which is a Set of what worker_types are requested
+
+        # Uncommon mistake that will cause problems. Name string containing quotes
+        # or spaces will do Bad Things to filenames and probably nginx too.
+        if (
+            (" " in worker_base_name)
+            or ('"' in worker_base_name)
+            or ("'" in worker_base_name)
+        ):
+            error(
+                "Requesting a worker name containing a quote mark or a space is "
+                "not allowed, as it would raise a FileNotFoundError. Please fix: "
+                f"{worker_base_name}"
+            )
+
+        # This counter is used for naming workers with an incrementing number. Use the
+        # worker_base_name for the index
+        worker_type_counter[worker_base_name] += 1
+
+        # Name workers by their type or requested name concatenated with an
+        # incrementing number. e.g. federation_reader1 or event_creator+event_persister1
+        worker_name = worker_base_name + str(worker_type_counter[worker_base_name])
+
+        # Now that the worker name is settled, check this name isn't used for a
+        # different worker_type. If it's not allowed, will error and stop. If no
+        # issues, it will be added to the counter. This will prevent accidentally
+        # naming a worker by a worker_type. e.g. 'pusher, pusher=user_dir'
+        # Make sure the worker types being checked are deterministic.
+        deterministic_worker_role_string = "+".join(sorted(workers_roles_set))
+        check_worker_type = worker_name_checklist.get(worker_base_name)
+        # Either this doesn't exist yet, or it matches with a twin
+        if (check_worker_type is None) or (
+            check_worker_type == deterministic_worker_role_string
+        ):
+            # This is a no-op if it exists, which is expected to avoid the else block
+            worker_name_checklist.setdefault(
+                worker_base_name, deterministic_worker_role_string
+            )
+
+        else:
+            error(
+                f"Can not use {worker_name} for {deterministic_worker_role_string}. It "
+                f"is already in use by {check_worker_type}"
+            )
+
+        # Make sure we don't allow sharding for a worker type that doesn't support it.
+        # Will error and stop if it is a problem, e.g. 'background_worker'.
+        for worker_role in workers_roles_set:
+            if worker_role in worker_type_shard_counter:
+                if not is_sharding_allowed_for_worker_type(worker_role):
+                    error(
+                        f"There can be only a single worker with {worker_role} "
+                        "type. Please recount and remove."
+                    )
+            # Not in shard counter, must not have seen it yet, add it.
+            worker_type_shard_counter[worker_role] += 1
+
+        # The worker has survived the gauntlet of why it can't exist. Add it to the pile
+        dict_to_return.setdefault(worker_name, {}).setdefault(
+            "worker_base_name", worker_base_name
+        )
+        dict_to_return.setdefault(worker_name, {}).setdefault(
+            "worker_roles_set", set()
+        ).update(workers_roles_set)
+
+    return dict_to_return
+
+
 def generate_worker_files(
-    environ: Mapping[str, str], config_path: str, data_dir: str
+    environ: Mapping[str, str],
+    config_path: str,
+    data_dir: str,
+    requested_worker_types: Dict[str, Any],
 ) -> None:
-    """Read the desired list of workers from environment variables and generate
-    shared homeserver, nginx and supervisord configs.
+    """Read the desired workers(if any) that is passed in and generate shared
+        homeserver, nginx and supervisord configs.
 
     Args:
         environ: os.environ instance.
         config_path: The location of the generated Synapse main worker config file.
         data_dir: The location of the synapse data directory. Where log and
             user-facing config files live.
+        requested_worker_types: A Dict containing requested workers in the format of
+            {'worker_name1': {'worker_type', ...}}
     """
     # Note that yaml cares about indentation, so care should be taken to insert lines
     # into files at the correct indentation below.
 
-    # shared_config is the contents of a Synapse config file that will be shared amongst
-    # the main Synapse process as well as all workers.
-    # It is intended mainly for disabling functionality when certain workers are spun up,
-    # and adding a replication listener.
-
-    # First read the original config file and extract the listeners block. Then we'll add
-    # another listener for replication. Later we'll write out the result to the shared
-    # config file.
+    # First read the original config file and extract the listeners block. Then we'll
+    # add another listener for replication. Later we'll write out the result to the
+    # shared config file.
     listeners = [
         {
             "port": 9093,
@@ -542,9 +781,9 @@ def generate_worker_files(
             listeners += original_listeners
 
     # The shared homeserver config. The contents of which will be inserted into the
-    # base shared worker jinja2 template.
-    #
-    # This config file will be passed to all workers, included Synapse's main process.
+    # base shared worker jinja2 template. This config file will be passed to all
+    # workers, included Synapse's main process. It is intended mainly for disabling
+    # functionality when certain workers are spun up, and adding a replication listener.
     shared_config: Dict[str, Any] = {"listeners": listeners}
 
     # List of dicts that describe workers.
@@ -552,31 +791,23 @@ def generate_worker_files(
     # program blocks.
     worker_descriptors: List[Dict[str, Any]] = []
 
-    # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
-    # ports of each worker. For example:
+    # Upstreams for load-balancing purposes. This dict takes the form of the base worker
+    # name to the ports of each worker. For example:
     # {
-    #   worker_type: {1234, 1235, ...}}
+    #   worker_base_name: {1234, 1235, ...}}
     # }
     # and will be used to construct 'upstream' nginx directives.
     nginx_upstreams: Dict[str, Set[int]] = {}
 
-    # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
-    # placed after the proxy_pass directive. The main benefit to representing this data as a
-    # dict over a str is that we can easily deduplicate endpoints across multiple instances
-    # of the same worker.
-    #
-    # An nginx site config that will be amended to depending on the workers that are
-    # spun up. To be placed in /etc/nginx/conf.d.
-    nginx_locations = {}
-
-    # Read the desired worker configuration from the environment
-    worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
-    if not worker_types_env:
-        # No workers, just the main process
-        worker_types = []
-    else:
-        # Split type names by comma, ignoring whitespace.
-        worker_types = [x.strip() for x in worker_types_env.split(",")]
+    # A temporary location dict that will help assemble port data for load-balancing
+    nginx_preprocessed_locations: Dict[str, Set[int]] = {}
+
+    # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what
+    # will be placed after the proxy_pass directive. The main benefit to representing
+    # this data as a dict over a str is that we can easily deduplicate endpoints
+    # across multiple instances of the same worker. The final rendering will be combined
+    # with nginx_upstreams and placed in /etc/nginx/conf.d.
+    nginx_locations: Dict[str, str] = {}
 
     # Create the worker configuration directory if it doesn't already exist
     os.makedirs("/conf/workers", exist_ok=True)
@@ -584,210 +815,38 @@ def generate_worker_files(
     # Start worker ports from this arbitrary port
     worker_port = 18009
 
-    # A counter of worker_type -> int. Used for determining the name for a given
-    # worker type when generating its config file, as each worker's name is just
-    # worker_type(s) + instance #
-    worker_type_counter: Dict[str, int] = defaultdict(int)
-
-    # Similar to above, but more finely grained. This is used to determine we don't have
-    # more than a single worker for cases where multiples would be bad(e.g. presence).
-    worker_type_shard_counter: Dict[str, int] = defaultdict(int)
-
-    # Similar to above, but for worker name's. This has two uses:
-    # 1. Can be used to check that worker names for different worker types or
-    #       combinations of types is not used, as it will error with 'Address in
-    #       use'(e.g. "to_device, to_device=typing" would not work).
-    # 2. Convenient way to get the combination of worker types from worker_name after
-    #       processing and merging.
-    # Follows the pattern:
-    # ["worker_name": "worker_type(s)"]
-    worker_name_type_list: Dict[str, str] = {}
-
     # A list of internal endpoints to healthcheck, starting with the main process
     # which exists even if no workers do.
     healthcheck_urls = ["http://localhost:8080/health"]
 
-    # Expand worker_type multiples if requested in shorthand(e.g. worker:2). Checking
-    # for not an actual defined type of worker is done later.
-    # Checking performed:
-    # 1. if worker:2 or more is declared, it will create additional workers up to number
-    # 2. if worker:1, it will create a single copy of this worker as if no number was
-    #   given
-    # 3. if worker:0 is declared, this worker will be ignored. This is to allow for
-    #   scripting and automated expansion and is intended behaviour.
-    # 4. if worker:NaN or is a negative number, it will error and log it.
-    new_worker_types = []
-    for worker_type in worker_types:
-        if ":" in worker_type:
-            worker_type_components = split_and_strip_string(worker_type, ":", 1)
-            worker_count = 0
-            # Should only be 2 components, a type of worker(s) and an integer as a
-            # string. Cast the number as an int then it can be used as a counter.
-            try:
-                worker_count = int(worker_type_components[1])
-            except ValueError:
-                error(
-                    f"Bad number in worker count for '{worker_type}': "
-                    "'{worker_type_components[1]}' is not an integer"
-                )
-
-            # As long as there are more than 0, we add one to the list to make below.
-            for _ in range(worker_count):
-                new_worker_types.append(worker_type_components[0])
+    # For each worker type specified by the user, create config values and write it's
+    # yaml config file
+    for worker_name, worker_type_data in requested_worker_types.items():
+        worker_type_set = worker_type_data.get("worker_roles_set")
 
-        else:
-            # If it's not a real worker, it will error out below
-            new_worker_types.append(worker_type)
-
-    # worker_types is now an expanded list of worker types.
-    worker_types = new_worker_types
-
-    # For each worker type specified by the user, create config values
-    for worker_type in worker_types:
-        # pre-template the worker_config so when updating we don't get a KeyError
-        worker_config: Dict[str, Any] = {
-            "app": "",
-            "listener_resources": [],
-            "endpoint_patterns": [],
-            "shared_extra_conf": {},
-            "worker_extra_conf": "",
-        }
-
-        # Peel off any name designated before a '=' to use later.
-        requested_worker_name = ""
-        if "=" in worker_type:
-            # Split on "=", remove extra whitespace from ends then make list
-            worker_type_split = split_and_strip_string(worker_type, "=")
-            if len(worker_type_split) > 2:
-                error(
-                    "To many worker names requested for a single worker, or to many "
-                    "'='. Please fix: " + worker_type
-                )
-            # if there was no name given, this will still be an empty string
-            requested_worker_name = worker_type_split[0]
-            # Uncommon mistake that will cause problems. Name string containing quotes
-            # or spaces.
-            if (
-                (" " in requested_worker_name)
-                or ('"' in requested_worker_name)
-                or ("'" in requested_worker_name)
-            ):
-                error(
-                    "Requesting a worker name containing a quote mark or a space is "
-                    "not allowed, as it would raise a FileNotFoundError. Please fix: "
-                    f"{requested_worker_name }"
-                )
-
-            # Check the last character of a requested name is not a number. This only
-            # seems to occur when requesting more than 10 of a given worker_type. The
-            # error comes across as an exception in 'startListening' and ends with
-            # 'Address already in use' for the port
-            if requested_worker_name[-1].isdigit():
-                error(
-                    "Found a number at the end of the requested worker name: "
-                    + requested_worker_name
-                    + ". This is not allowed as it will cause exceptions with "
-                    "'Address already in use'. Recommend appending an underscore "
-                    "after the number if this what you really want to do."
-                )
-
-            # Reassign the worker_type string with no name on it.
-            worker_type = worker_type_split[1]
-
-        # Split on "+", remove whitespace from ends then make a list.
-        workers_to_combo_list = split_and_strip_string(worker_type, "+")
-
-        # Check for duplicates in the split worker type list. No advantage in having
-        # duplicated worker types on the same worker. Two would consolidate into one.
-        # (e.g. "pusher + pusher" would resolve to a single "pusher" which may not be
-        # what was intended.)
-        if len(workers_to_combo_list) != len(set(workers_to_combo_list)):
-            error("Duplicate worker type found in " + worker_type + "! Please fix.")
+        # The collected and processed data will live here.
+        worker_config: Dict[str, Any] = {}
 
         # Merge all worker config templates for this worker into a single config
-        for worker in workers_to_combo_list:
+        for worker_type in worker_type_set:
             # Verify this is a real defined worker type. If it's not, stop everything so
             # it can be fixed.
-            copy_of_template_config = WORKERS_CONFIG.get(worker)
+            copy_of_template_config = WORKERS_CONFIG.get(worker_type)
             if copy_of_template_config:
                 # So it's not a reference pointer
                 copy_of_template_config = copy_of_template_config.copy()
             else:
                 error(
-                    worker
-                    + " is an unknown worker type! Was found in "
-                    + worker_type
-                    + ". Please fix!"
+                    f"{worker_type} is an unknown worker type! Was found in "
+                    f"{worker_type_set}. Please fix!"
                 )
 
-            # Merge worker type template configuration data.
+            # Merge worker type template configuration data. It's a combination of lists
+            # and dicts, so use this helper.
             worker_config = merge_worker_template_configs(
                 worker_config, copy_of_template_config
             )
 
-            # Check if there is to many of a worker there can only be one of.
-            # Will error and stop if it is a problem, e.g. 'background_worker'.
-            if worker in worker_type_shard_counter:
-                if not is_sharding_allowed_for_worker_type(worker):
-                    error(
-                        "There can be only a single worker with "
-                        + worker
-                        + " type. Please recount and remove."
-                    )
-            # Not in shard counter, add it.
-            worker_type_shard_counter[worker] += 1
-
-        # Name workers by their type or requested name concatenated with an
-        # incrementing number. e.g. federation_reader1 or event_creator+event_persister1
-        worker_base_name: str
-        if requested_worker_name:
-            worker_base_name = requested_worker_name
-            # It'll be useful to have this in the log in case it's a complex of many
-            # workers merged together. Note for Complement: it would only be seen in the
-            # logs for blueprint construction(which are not collected).
-            log(
-                "Worker name request found: "
-                + requested_worker_name
-                + ", for: "
-                + str(workers_to_combo_list)
-            )
-
-        else:
-            # The worker name will be the worker_type, however if spaces exist
-            # between concatenated worker_types and the "+" because of readability,
-            # it will error on startup. Recombine worker_types without spaces and log.
-            if len(workers_to_combo_list) > 1:
-                worker_base_name = "+".join(workers_to_combo_list)
-                if worker_base_name != worker_type:
-                    log(
-                        "Default worker name would have contained spaces, which is not "
-                        "allowed(" + worker_type + "). Reformed name to not contain "
-                        "spaces: " + worker_base_name
-                    )
-            else:
-                worker_base_name = worker_type
-        # This counter is used for names with broader worker type definitions.
-        worker_type_counter[worker_type] += 1
-        worker_name = worker_base_name + str(worker_type_counter[worker_type])
-
-        # Now that the worker name is settled, check this name isn't used for a
-        # different worker_type. If it's not allowed, will error and stop. If no issues,
-        # it will be added to the counter.
-        check_worker_type = worker_name_type_list.get(worker_base_name)
-        if (check_worker_type is None) or (check_worker_type == worker_type):
-            worker_name_type_list.setdefault(worker_base_name, worker_type)
-
-        else:
-            error(
-                "Can not use "
-                + worker_name
-                + " for "
-                + worker_type
-                + ". It is already in use by "
-                # This is cast as a str because mypy thinks it could be None
-                + str(check_worker_type)
-            )
-
         # Replace placeholder names in the config template with the actual worker name.
         worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
 
@@ -802,12 +861,9 @@ def generate_worker_files(
 
         healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
 
-        # Check if more than one instance of this worker type has been specified
-        worker_type_total_count = worker_types.count(worker_type)
-
         # Update the shared config with sharding-related options if necessary
         add_worker_roles_to_shared_config(
-            shared_config, workers_to_combo_list, worker_name, worker_port
+            shared_config, worker_type_set, worker_name, worker_port
         )
 
         # Enable the worker in supervisord
@@ -815,18 +871,9 @@ def generate_worker_files(
 
         # Add nginx location blocks for this worker's endpoints (if any are defined)
         for pattern in worker_config["endpoint_patterns"]:
-            # Determine whether we need to load-balance this worker
-            if worker_type_total_count > 1:
-                # Create or add to a load-balanced upstream for this worker
-                nginx_upstreams.setdefault(worker_base_name, set()).add(worker_port)
-
-                # Upstreams are named after the worker_base_name
-                upstream = "http://" + worker_base_name
-            else:
-                upstream = "http://localhost:%d" % (worker_port,)
-
-            # Note that this endpoint should proxy to this upstream
-            nginx_locations[pattern] = upstream
+            # Need more data to determine whether we need to load-balance this worker.
+            # Collect all the port numbers for a given endpoint
+            nginx_preprocessed_locations.setdefault(pattern, set()).add(worker_port)
 
         # Write out the worker's logging config file
         log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
@@ -841,6 +888,34 @@ def generate_worker_files(
 
         worker_port += 1
 
+    # Re process all nginx upstream data. Worker_descriptors contains all the port data,
+    # cross-reference that with the worker_base_name in requested_worker_types.
+    for pattern, port_set in nginx_preprocessed_locations.items():
+        if len(port_set) > 1:
+            # Only process upstreams for multiple port arrangements
+            upstream_name: Set[str] = set()
+            for worker in worker_descriptors:
+                # Find the port we want
+                if int(worker["port"]) in port_set:
+                    # Capture the name. We want the base name as they will be grouped
+                    # together.
+                    upstream_name.add(
+                        requested_worker_types[worker["name"]].get("worker_base_name")
+                    )
+
+            # Join it all up nice and pretty with a double underscore
+            upstream = "__".join(sorted(upstream_name))
+            upstream_location = "http://" + upstream
+            # And save the port numbers for writing out below
+            nginx_upstreams[upstream] = port_set
+
+        else:
+            # Only a single port, just use that
+            (unpacked_port,) = port_set
+            upstream_location = "http://localhost:%d" % unpacked_port
+
+        nginx_locations[pattern] = upstream_location
+
     # Build the nginx location config blocks
     nginx_location_config = ""
     for endpoint, upstream in nginx_locations.items():
@@ -880,7 +955,7 @@ def generate_worker_files(
             if reg_path.suffix.lower() in (".yaml", ".yml")
         ]
 
-    workers_in_use = len(worker_types) > 0
+    workers_in_use = len(requested_worker_types) > 0
 
     # Shared homeserver config
     convert(
@@ -977,12 +1052,25 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
         log("Generating base homeserver config")
         generate_base_homeserver_config()
 
-    # This script may be run multiple times (mostly by Complement, see note at top of file).
-    # Don't re-configure workers in this instance.
+    # This script may be run multiple times (mostly by Complement, see note at top of
+    # file). Don't re-configure workers in this instance.
     mark_filepath = "/conf/workers_have_been_configured"
     if not os.path.exists(mark_filepath):
+        # Collect and validate worker_type requests
+        # Read the desired worker configuration from the environment
+        worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
+        # Only process worker_types if they exist
+        if not worker_types_env:
+            # No workers, just the main process
+            worker_types = []
+            requested_worker_types: Dict[str, Any] = {}
+        else:
+            # Split type names by comma, ignoring whitespace.
+            worker_types = split_and_strip_string(worker_types_env, ",")
+            requested_worker_types = process_worker_types(worker_types)
+
         # Always regenerate all other config files
-        generate_worker_files(environ, config_path, data_dir)
+        generate_worker_files(environ, config_path, data_dir, requested_worker_types)
 
         # Mark workers as being configured
         with open(mark_filepath, "w") as f: