summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/14921.misc1
-rwxr-xr-xdocker/complement/conf/start_for_complement.sh6
-rwxr-xr-xdocker/configure_workers_and_start.py521
3 files changed, 413 insertions, 115 deletions
diff --git a/changelog.d/14921.misc b/changelog.d/14921.misc
new file mode 100644
index 0000000000..599e23eb0c
--- /dev/null
+++ b/changelog.d/14921.misc
@@ -0,0 +1 @@
+Add additional functionality to declaring worker types when starting Complement in worker mode.
diff --git a/docker/complement/conf/start_for_complement.sh b/docker/complement/conf/start_for_complement.sh
index af13209c54..5560ab8b95 100755
--- a/docker/complement/conf/start_for_complement.sh
+++ b/docker/complement/conf/start_for_complement.sh
@@ -51,8 +51,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
   # -z True if the length of string is zero.
   if [[ -z "$SYNAPSE_WORKER_TYPES" ]]; then
     export SYNAPSE_WORKER_TYPES="\
-      event_persister, \
-      event_persister, \
+      event_persister:2, \
       background_worker, \
       frontend_proxy, \
       event_creator, \
@@ -64,7 +63,8 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
       synchrotron, \
       client_reader, \
       appservice, \
-      pusher"
+      pusher, \
+      stream_writers=account_data+presence+receipts+to_device+typing"
 
   fi
   log "Workers requested: $SYNAPSE_WORKER_TYPES"
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index add8bb1ff6..cfb16c2e22 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -19,8 +19,15 @@
 # The environment variables it reads are:
 #   * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
 #   * SYNAPSE_REPORT_STATS: Whether to report stats.
-#   * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
-#         below. Leave empty for no workers.
+#   * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
+#         below. Leave empty for no workers. Add a ':' and a number at the end to
+#         multiply that worker. Append multiple worker types with '+' to merge the
+#         worker types into a single worker. Add a name and a '=' to the front of a
+#         worker type to give this instance a name in logs and nginx.
+#         Examples:
+#         SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
+#         SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
+#         SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
 #   * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
 #         will be treated as Application Service registration files.
 #   * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
@@ -40,16 +47,33 @@
 
 import os
 import platform
+import re
 import subprocess
 import sys
+from collections import defaultdict
+from itertools import chain
 from pathlib import Path
-from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set
+from typing import (
+    Any,
+    Dict,
+    List,
+    Mapping,
+    MutableMapping,
+    NoReturn,
+    Optional,
+    Set,
+    SupportsIndex,
+)
 
 import yaml
 from jinja2 import Environment, FileSystemLoader
 
 MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
 
+# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
+# during processing with the name of the worker.
+WORKER_PLACEHOLDER_NAME = "placeholder_name"
+
 # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
 # Watching /_matrix/client needs a "client" listener
 # Watching /_matrix/federation needs a "federation" listener
@@ -70,11 +94,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
         "endpoint_patterns": [
             "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
         ],
-        "shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"},
+        "shared_extra_conf": {
+            "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
+        },
         "worker_extra_conf": "",
     },
     "media_repository": {
-        "app": "synapse.app.media_repository",
+        "app": "synapse.app.generic_worker",
         "listener_resources": ["media"],
         "endpoint_patterns": [
             "^/_matrix/media/",
@@ -87,7 +113,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
         # The first configured media worker will run the media background jobs
         "shared_extra_conf": {
             "enable_media_repo": False,
-            "media_instance_running_background_jobs": "media_repository1",
+            "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
         },
         "worker_extra_conf": "enable_media_repo: true",
     },
@@ -95,7 +121,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
         "app": "synapse.app.generic_worker",
         "listener_resources": [],
         "endpoint_patterns": [],
-        "shared_extra_conf": {"notify_appservices_from_worker": "appservice1"},
+        "shared_extra_conf": {
+            "notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
+        },
         "worker_extra_conf": "",
     },
     "federation_sender": {
@@ -192,9 +220,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
         "app": "synapse.app.generic_worker",
         "listener_resources": [],
         "endpoint_patterns": [],
-        # This worker cannot be sharded. Therefore there should only ever be one background
-        # worker, and it should be named background_worker1
-        "shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
+        # This worker cannot be sharded. Therefore, there should only ever be one
+        # background worker. This is enforced for the safety of your database.
+        "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
         "worker_extra_conf": "",
     },
     "event_creator": {
@@ -275,7 +303,7 @@ NGINX_LOCATION_CONFIG_BLOCK = """
 """
 
 NGINX_UPSTREAM_CONFIG_BLOCK = """
-upstream {upstream_worker_type} {{
+upstream {upstream_worker_base_name} {{
 {body}
 }}
 """
@@ -326,7 +354,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
 
 def add_worker_roles_to_shared_config(
     shared_config: dict,
-    worker_type: str,
+    worker_types_set: Set[str],
     worker_name: str,
     worker_port: int,
 ) -> None:
@@ -334,22 +362,36 @@ def add_worker_roles_to_shared_config(
     append appropriate worker information to it for the current worker_type instance.
 
     Args:
-        shared_config: The config dict that all worker instances share (after being converted to YAML)
-        worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
+        shared_config: The config dict that all worker instances share (after being
+            converted to YAML)
+        worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
+            This list can be a single worker type or multiple.
         worker_name: The name of the worker instance.
         worker_port: The HTTP replication port that the worker instance is listening on.
     """
-    # The instance_map config field marks the workers that write to various replication streams
+    # The instance_map config field marks the workers that write to various replication
+    # streams
     instance_map = shared_config.setdefault("instance_map", {})
 
-    # Worker-type specific sharding config
-    if worker_type == "pusher":
+    # This is a list of the stream_writers that there can be only one of. Events can be
+    # sharded, and therefore doesn't belong here.
+    singular_stream_writers = [
+        "account_data",
+        "presence",
+        "receipts",
+        "to_device",
+        "typing",
+    ]
+
+    # Worker-type specific sharding config. Now a single worker can fulfill multiple
+    # roles, check each.
+    if "pusher" in worker_types_set:
         shared_config.setdefault("pusher_instances", []).append(worker_name)
 
-    elif worker_type == "federation_sender":
+    if "federation_sender" in worker_types_set:
         shared_config.setdefault("federation_sender_instances", []).append(worker_name)
 
-    elif worker_type == "event_persister":
+    if "event_persister" in worker_types_set:
         # Event persisters write to the events stream, so we need to update
         # the list of event stream writers
         shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
@@ -362,19 +404,154 @@ def add_worker_roles_to_shared_config(
             "port": worker_port,
         }
 
-    elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
-        # Update the list of stream writers
-        # It's convenient that the name of the worker type is the same as the stream to write
-        shared_config.setdefault("stream_writers", {}).setdefault(
-            worker_type, []
-        ).append(worker_name)
+    # Update the list of stream writers. It's convenient that the name of the worker
+    # type is the same as the stream to write. Iterate over the whole list in case there
+    # is more than one.
+    for worker in worker_types_set:
+        if worker in singular_stream_writers:
+            shared_config.setdefault("stream_writers", {}).setdefault(
+                worker, []
+            ).append(worker_name)
+
+            # Map of stream writer instance names to host/ports combos
+            # For now, all stream writers need http replication ports
+            instance_map[worker_name] = {
+                "host": "localhost",
+                "port": worker_port,
+            }
+
+
+def merge_worker_template_configs(
+    existing_dict: Dict[str, Any] | None,
+    to_be_merged_dict: Dict[str, Any],
+) -> Dict[str, Any]:
+    """When given an existing dict of worker template configuration consisting with both
+        dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
+        return new dict.
 
-        # Map of stream writer instance names to host/ports combos
-        # For now, all stream writers need http replication ports
-        instance_map[worker_name] = {
-            "host": "localhost",
-            "port": worker_port,
-        }
+    Args:
+        existing_dict: Either an existing worker template or a fresh blank one.
+        to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
+            existing_dict.
+    Returns: The newly merged together dict values.
+    """
+    new_dict: Dict[str, Any] = {}
+    if not existing_dict:
+        # It doesn't exist yet, just use the new dict(but take a copy not a reference)
+        new_dict = to_be_merged_dict.copy()
+    else:
+        for i in to_be_merged_dict.keys():
+            if (i == "endpoint_patterns") or (i == "listener_resources"):
+                # merge the two lists, remove duplicates
+                new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
+            elif i == "shared_extra_conf":
+                # merge dictionary's, the worker name will be replaced later
+                new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
+            elif i == "worker_extra_conf":
+                # There is only one worker type that has a 'worker_extra_conf' and it is
+                # the media_repo. Since duplicate worker types on the same worker don't
+                # work, this is fine.
+                new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
+            else:
+                # Everything else should be identical, like "app", which only works
+                # because all apps are now generic_workers.
+                new_dict[i] = to_be_merged_dict[i]
+    return new_dict
+
+
+def insert_worker_name_for_worker_config(
+    existing_dict: Dict[str, Any], worker_name: str
+) -> Dict[str, Any]:
+    """Insert a given worker name into the worker's configuration dict.
+
+    Args:
+        existing_dict: The worker_config dict that is imported into shared_config.
+        worker_name: The name of the worker to insert.
+    Returns: Copy of the dict with newly inserted worker name
+    """
+    dict_to_edit = existing_dict.copy()
+    for k, v in dict_to_edit["shared_extra_conf"].items():
+        # Only proceed if it's the placeholder name string
+        if v == WORKER_PLACEHOLDER_NAME:
+            dict_to_edit["shared_extra_conf"][k] = worker_name
+    return dict_to_edit
+
+
+def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
+    """
+    Apply multiplier(if found) by returning a new expanded list with some basic error
+    checking.
+
+    Args:
+        worker_types: The unprocessed List of requested workers
+    Returns:
+        A new list with all requested workers expanded.
+    """
+    # Checking performed:
+    # 1. if worker:2 or more is declared, it will create additional workers up to number
+    # 2. if worker:1, it will create a single copy of this worker as if no number was
+    #   given
+    # 3. if worker:0 is declared, this worker will be ignored. This is to allow for
+    #   scripting and automated expansion and is intended behaviour.
+    # 4. if worker:NaN or is a negative number, it will error and log it.
+    new_worker_types = []
+    for worker_type in worker_types:
+        if ":" in worker_type:
+            worker_type_components = split_and_strip_string(worker_type, ":", 1)
+            worker_count = 0
+            # Should only be 2 components, a type of worker(s) and an integer as a
+            # string. Cast the number as an int then it can be used as a counter.
+            try:
+                worker_count = int(worker_type_components[1])
+            except ValueError:
+                error(
+                    f"Bad number in worker count for '{worker_type}': "
+                    f"'{worker_type_components[1]}' is not an integer"
+                )
+
+            # As long as there are more than 0, we add one to the list to make below.
+            for _ in range(worker_count):
+                new_worker_types.append(worker_type_components[0])
+
+        else:
+            # If it's not a real worker_type, it will error out later.
+            new_worker_types.append(worker_type)
+    return new_worker_types
+
+
+def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
+    """Helper to check to make sure worker types that cannot have multiples do not.
+
+    Args:
+        worker_type: The type of worker to check against.
+    Returns: True if allowed, False if not
+    """
+    return worker_type not in [
+        "background_worker",
+        "account_data",
+        "presence",
+        "receipts",
+        "typing",
+        "to_device",
+    ]
+
+
+def split_and_strip_string(
+    given_string: str, split_char: str, max_split: SupportsIndex = -1
+) -> List[str]:
+    """
+    Helper to split a string on split_char and strip whitespace from each end of each
+        element.
+    Args:
+        given_string: The string to split
+        split_char: The character to split the string on
+        max_split: kwarg for split() to limit how many times the split() happens
+    Returns:
+        A List of strings
+    """
+    # Removes whitespace from ends of result strings before adding to list. Allow for
+    # overriding 'maxsplit' kwarg, default being -1 to signify no maximum.
+    return [x.strip() for x in given_string.split(split_char, maxsplit=max_split)]
 
 
 def generate_base_homeserver_config() -> None:
@@ -389,29 +566,153 @@ def generate_base_homeserver_config() -> None:
     subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
 
 
+def parse_worker_types(
+    requested_worker_types: List[str],
+) -> Dict[str, Set[str]]:
+    """Read the desired list of requested workers and prepare the data for use in
+        generating worker config files while also checking for potential gotchas.
+
+    Args:
+        requested_worker_types: The list formed from the split environment variable
+            containing the unprocessed requests for workers.
+
+    Returns: A dict of worker names to set of worker types. Format:
+        {'worker_name':
+            {'worker_type', 'worker_type2'}
+        }
+    """
+    # A counter of worker_base_name -> int. Used for determining the name for a given
+    # worker when generating its config file, as each worker's name is just
+    # worker_base_name followed by instance number
+    worker_base_name_counter: Dict[str, int] = defaultdict(int)
+
+    # Similar to above, but more finely grained. This is used to determine we don't have
+    # more than a single worker for cases where multiples would be bad(e.g. presence).
+    worker_type_shard_counter: Dict[str, int] = defaultdict(int)
+
+    # The final result of all this processing
+    dict_to_return: Dict[str, Set[str]] = {}
+
+    # Handle any multipliers requested for given workers.
+    multiple_processed_worker_types = apply_requested_multiplier_for_worker(
+        requested_worker_types
+    )
+
+    # Process each worker_type_string
+    # Examples of expected formats:
+    #  - requested_name=type1+type2+type3
+    #  - synchrotron
+    #  - event_creator+event_persister
+    for worker_type_string in multiple_processed_worker_types:
+        # First, if a name is requested, use that — otherwise generate one.
+        worker_base_name: str = ""
+        if "=" in worker_type_string:
+            # Split on "=", remove extra whitespace from ends then make list
+            worker_type_split = split_and_strip_string(worker_type_string, "=")
+            if len(worker_type_split) > 2:
+                error(
+                    "There should only be one '=' in the worker type string. "
+                    f"Please fix: {worker_type_string}"
+                )
+
+            # Assign the name
+            worker_base_name = worker_type_split[0]
+
+            if not re.match(r"^[a-zA-Z0-9_+-]*[a-zA-Z_+-]$", worker_base_name):
+                # Apply a fairly narrow regex to the worker names. Some characters
+                # aren't safe for use in file paths or nginx configurations.
+                # Don't allow to end with a number because we'll add a number
+                # ourselves in a moment.
+                error(
+                    "Invalid worker name; please choose a name consisting of "
+                    "alphanumeric letters, _ + -, but not ending with a digit: "
+                    f"{worker_base_name!r}"
+                )
+
+            # Continue processing the remainder of the worker_type string
+            # with the name override removed.
+            worker_type_string = worker_type_split[1]
+
+        # Split the worker_type_string on "+", remove whitespace from ends then make
+        # the list a set so it's deduplicated.
+        worker_types_set: Set[str] = set(
+            split_and_strip_string(worker_type_string, "+")
+        )
+
+        if not worker_base_name:
+            # No base name specified: generate one deterministically from set of
+            # types
+            worker_base_name = "+".join(sorted(worker_types_set))
+
+        # At this point, we have:
+        #   worker_base_name which is the name for the worker, without counter.
+        #   worker_types_set which is the set of worker types for this worker.
+
+        # Validate worker_type and make sure we don't allow sharding for a worker type
+        # that doesn't support it. Will error and stop if it is a problem,
+        # e.g. 'background_worker'.
+        for worker_type in worker_types_set:
+            # Verify this is a real defined worker type. If it's not, stop everything so
+            # it can be fixed.
+            if worker_type not in WORKERS_CONFIG:
+                error(
+                    f"{worker_type} is an unknown worker type! Was found in "
+                    f"'{worker_type_string}'. Please fix!"
+                )
+
+            if worker_type in worker_type_shard_counter:
+                if not is_sharding_allowed_for_worker_type(worker_type):
+                    error(
+                        f"There can be only a single worker with {worker_type} "
+                        "type. Please recount and remove."
+                    )
+            # Not in shard counter, must not have seen it yet, add it.
+            worker_type_shard_counter[worker_type] += 1
+
+        # Generate the number for the worker using incrementing counter
+        worker_base_name_counter[worker_base_name] += 1
+        worker_number = worker_base_name_counter[worker_base_name]
+        worker_name = f"{worker_base_name}{worker_number}"
+
+        if worker_number > 1:
+            # If this isn't the first worker, check that we don't have a confusing
+            # mixture of worker types with the same base name.
+            first_worker_with_base_name = dict_to_return[f"{worker_base_name}1"]
+            if first_worker_with_base_name != worker_types_set:
+                error(
+                    f"Can not use worker_name: '{worker_name}' for worker_type(s): "
+                    f"{worker_types_set!r}. It is already in use by "
+                    f"worker_type(s): {first_worker_with_base_name!r}"
+                )
+
+        dict_to_return[worker_name] = worker_types_set
+
+    return dict_to_return
+
+
 def generate_worker_files(
-    environ: Mapping[str, str], config_path: str, data_dir: str
+    environ: Mapping[str, str],
+    config_path: str,
+    data_dir: str,
+    requested_worker_types: Dict[str, Set[str]],
 ) -> None:
-    """Read the desired list of workers from environment variables and generate
-    shared homeserver, nginx and supervisord configs.
+    """Read the desired workers(if any) that is passed in and generate shared
+        homeserver, nginx and supervisord configs.
 
     Args:
         environ: os.environ instance.
         config_path: The location of the generated Synapse main worker config file.
         data_dir: The location of the synapse data directory. Where log and
             user-facing config files live.
+        requested_worker_types: A Dict containing requested workers in the format of
+            {'worker_name1': {'worker_type', ...}}
     """
     # Note that yaml cares about indentation, so care should be taken to insert lines
     # into files at the correct indentation below.
 
-    # shared_config is the contents of a Synapse config file that will be shared amongst
-    # the main Synapse process as well as all workers.
-    # It is intended mainly for disabling functionality when certain workers are spun up,
-    # and adding a replication listener.
-
-    # First read the original config file and extract the listeners block. Then we'll add
-    # another listener for replication. Later we'll write out the result to the shared
-    # config file.
+    # First read the original config file and extract the listeners block. Then we'll
+    # add another listener for replication. Later we'll write out the result to the
+    # shared config file.
     listeners = [
         {
             "port": 9093,
@@ -427,9 +728,9 @@ def generate_worker_files(
             listeners += original_listeners
 
     # The shared homeserver config. The contents of which will be inserted into the
-    # base shared worker jinja2 template.
-    #
-    # This config file will be passed to all workers, included Synapse's main process.
+    # base shared worker jinja2 template. This config file will be passed to all
+    # workers, included Synapse's main process. It is intended mainly for disabling
+    # functionality when certain workers are spun up, and adding a replication listener.
     shared_config: Dict[str, Any] = {"listeners": listeners}
 
     # List of dicts that describe workers.
@@ -437,31 +738,20 @@ def generate_worker_files(
     # program blocks.
     worker_descriptors: List[Dict[str, Any]] = []
 
-    # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
-    # ports of each worker. For example:
+    # Upstreams for load-balancing purposes. This dict takes the form of the worker
+    # type to the ports of each worker. For example:
     # {
     #   worker_type: {1234, 1235, ...}}
     # }
     # and will be used to construct 'upstream' nginx directives.
     nginx_upstreams: Dict[str, Set[int]] = {}
 
-    # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
-    # placed after the proxy_pass directive. The main benefit to representing this data as a
-    # dict over a str is that we can easily deduplicate endpoints across multiple instances
-    # of the same worker.
-    #
-    # An nginx site config that will be amended to depending on the workers that are
-    # spun up. To be placed in /etc/nginx/conf.d.
-    nginx_locations = {}
-
-    # Read the desired worker configuration from the environment
-    worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
-    if not worker_types_env:
-        # No workers, just the main process
-        worker_types = []
-    else:
-        # Split type names by comma, ignoring whitespace.
-        worker_types = [x.strip() for x in worker_types_env.split(",")]
+    # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what
+    # will be placed after the proxy_pass directive. The main benefit to representing
+    # this data as a dict over a str is that we can easily deduplicate endpoints
+    # across multiple instances of the same worker. The final rendering will be combined
+    # with nginx_upstreams and placed in /etc/nginx/conf.d.
+    nginx_locations: Dict[str, str] = {}
 
     # Create the worker configuration directory if it doesn't already exist
     os.makedirs("/conf/workers", exist_ok=True)
@@ -469,66 +759,57 @@ def generate_worker_files(
     # Start worker ports from this arbitrary port
     worker_port = 18009
 
-    # A counter of worker_type -> int. Used for determining the name for a given
-    # worker type when generating its config file, as each worker's name is just
-    # worker_type + instance #
-    worker_type_counter: Dict[str, int] = {}
-
     # A list of internal endpoints to healthcheck, starting with the main process
     # which exists even if no workers do.
     healthcheck_urls = ["http://localhost:8080/health"]
 
-    # For each worker type specified by the user, create config values
-    for worker_type in worker_types:
-        worker_config = WORKERS_CONFIG.get(worker_type)
-        if worker_config:
-            worker_config = worker_config.copy()
-        else:
-            error(worker_type + " is an unknown worker type! Please fix!")
+    # Get the set of all worker types that we have configured
+    all_worker_types_in_use = set(chain(*requested_worker_types.values()))
+    # Map locations to upstreams (corresponding to worker types) in Nginx
+    # but only if we use the appropriate worker type
+    for worker_type in all_worker_types_in_use:
+        for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
+            nginx_locations[endpoint_pattern] = f"http://{worker_type}"
+
+    # For each worker type specified by the user, create config values and write it's
+    # yaml config file
+    for worker_name, worker_types_set in requested_worker_types.items():
+        # The collected and processed data will live here.
+        worker_config: Dict[str, Any] = {}
+
+        # Merge all worker config templates for this worker into a single config
+        for worker_type in worker_types_set:
+            copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
+
+            # Merge worker type template configuration data. It's a combination of lists
+            # and dicts, so use this helper.
+            worker_config = merge_worker_template_configs(
+                worker_config, copy_of_template_config
+            )
+
+        # Replace placeholder names in the config template with the actual worker name.
+        worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
 
-        new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
-        worker_type_counter[worker_type] = new_worker_count
-
-        # Name workers by their type concatenated with an incrementing number
-        # e.g. federation_reader1
-        worker_name = worker_type + str(new_worker_count)
         worker_config.update(
             {"name": worker_name, "port": str(worker_port), "config_path": config_path}
         )
 
-        # Update the shared config with any worker-type specific options
-        shared_config.update(worker_config["shared_extra_conf"])
+        # Update the shared config with any worker_type specific options. The first of a
+        # given worker_type needs to stay assigned and not be replaced.
+        worker_config["shared_extra_conf"].update(shared_config)
+        shared_config = worker_config["shared_extra_conf"]
 
         healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
 
-        # Check if more than one instance of this worker type has been specified
-        worker_type_total_count = worker_types.count(worker_type)
-
         # Update the shared config with sharding-related options if necessary
         add_worker_roles_to_shared_config(
-            shared_config, worker_type, worker_name, worker_port
+            shared_config, worker_types_set, worker_name, worker_port
         )
 
         # Enable the worker in supervisord
         worker_descriptors.append(worker_config)
 
-        # Add nginx location blocks for this worker's endpoints (if any are defined)
-        for pattern in worker_config["endpoint_patterns"]:
-            # Determine whether we need to load-balance this worker
-            if worker_type_total_count > 1:
-                # Create or add to a load-balanced upstream for this worker
-                nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
-
-                # Upstreams are named after the worker_type
-                upstream = "http://" + worker_type
-            else:
-                upstream = "http://localhost:%d" % (worker_port,)
-
-            # Note that this endpoint should proxy to this upstream
-            nginx_locations[pattern] = upstream
-
         # Write out the worker's logging config file
-
         log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
 
         # Then a worker config file
@@ -539,6 +820,10 @@ def generate_worker_files(
             worker_log_config_filepath=log_config_filepath,
         )
 
+        # Save this worker's port number to the correct nginx upstreams
+        for worker_type in worker_types_set:
+            nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
+
         worker_port += 1
 
     # Build the nginx location config blocks
@@ -551,15 +836,14 @@ def generate_worker_files(
 
     # Determine the load-balancing upstreams to configure
     nginx_upstream_config = ""
-
-    for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
+    for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
         body = ""
         for port in upstream_worker_ports:
-            body += "    server localhost:%d;\n" % (port,)
+            body += f"    server localhost:{port};\n"
 
         # Add to the list of configured upstreams
         nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
-            upstream_worker_type=upstream_worker_type,
+            upstream_worker_base_name=upstream_worker_base_name,
             body=body,
         )
 
@@ -580,7 +864,7 @@ def generate_worker_files(
             if reg_path.suffix.lower() in (".yaml", ".yml")
         ]
 
-    workers_in_use = len(worker_types) > 0
+    workers_in_use = len(requested_worker_types) > 0
 
     # Shared homeserver config
     convert(
@@ -678,13 +962,26 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
         generate_base_homeserver_config()
     else:
         log("Base homeserver config exists—not regenerating")
-    # This script may be run multiple times (mostly by Complement, see note at top of file).
-    # Don't re-configure workers in this instance.
+    # This script may be run multiple times (mostly by Complement, see note at top of
+    # file). Don't re-configure workers in this instance.
     mark_filepath = "/conf/workers_have_been_configured"
     if not os.path.exists(mark_filepath):
+        # Collect and validate worker_type requests
+        # Read the desired worker configuration from the environment
+        worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
+        # Only process worker_types if they exist
+        if not worker_types_env:
+            # No workers, just the main process
+            worker_types = []
+            requested_worker_types: Dict[str, Any] = {}
+        else:
+            # Split type names by comma, ignoring whitespace.
+            worker_types = split_and_strip_string(worker_types_env, ",")
+            requested_worker_types = parse_worker_types(worker_types)
+
         # Always regenerate all other config files
         log("Generating worker config files")
-        generate_worker_files(environ, config_path, data_dir)
+        generate_worker_files(environ, config_path, data_dir, requested_worker_types)
 
         # Mark workers as being configured
         with open(mark_filepath, "w") as f: