diff --git a/changelog.d/14921.misc b/changelog.d/14921.misc
new file mode 100644
index 0000000000..599e23eb0c
--- /dev/null
+++ b/changelog.d/14921.misc
@@ -0,0 +1 @@
+Add additional functionality for declaring worker types when starting Complement in worker mode.
diff --git a/docker/complement/conf/start_for_complement.sh b/docker/complement/conf/start_for_complement.sh
index af13209c54..5560ab8b95 100755
--- a/docker/complement/conf/start_for_complement.sh
+++ b/docker/complement/conf/start_for_complement.sh
@@ -51,8 +51,7 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
# -z True if the length of string is zero.
if [[ -z "$SYNAPSE_WORKER_TYPES" ]]; then
export SYNAPSE_WORKER_TYPES="\
- event_persister, \
- event_persister, \
+ event_persister:2, \
background_worker, \
frontend_proxy, \
event_creator, \
@@ -64,7 +63,8 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
synchrotron, \
client_reader, \
appservice, \
- pusher"
+ pusher, \
+ stream_writers=account_data+presence+receipts+to_device+typing"
fi
log "Workers requested: $SYNAPSE_WORKER_TYPES"
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index add8bb1ff6..f2312ea04b 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -19,8 +19,15 @@
# The environment variables it reads are:
# * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
# * SYNAPSE_REPORT_STATS: Whether to report stats.
-# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
-# below. Leave empty for no workers.
+# * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKERS_CONFIG
+# below. Leave empty for no workers. Add a ':' and a number at the end to
+# multiply that worker. Append multiple worker types with '+' to merge the
+# worker types into a single worker. Add a name and a '=' to the front of a
+# worker type to give this instance a name in logs and nginx.
+# Examples:
+# SYNAPSE_WORKER_TYPES='event_persister, federation_sender, client_reader'
+# SYNAPSE_WORKER_TYPES='event_persister:2, federation_sender:2, client_reader'
+# SYNAPSE_WORKER_TYPES='stream_writers=account_data+presence+typing'
# * SYNAPSE_AS_REGISTRATION_DIR: If specified, a directory in which .yaml and .yml files
# will be treated as Application Service registration files.
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
@@ -42,14 +49,29 @@ import os
import platform
import subprocess
import sys
+from collections import defaultdict
from pathlib import Path
-from typing import Any, Dict, List, Mapping, MutableMapping, NoReturn, Optional, Set
+from typing import (
+ Any,
+ Dict,
+ List,
+ Mapping,
+ MutableMapping,
+ NoReturn,
+ Optional,
+ Set,
+ SupportsIndex,
+)
import yaml
from jinja2 import Environment, FileSystemLoader
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
+# A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced
+# during processing with the name of the worker.
+WORKER_PLACEHOLDER_NAME = "placeholder_name"
+
# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
# Watching /_matrix/client needs a "client" listener
# Watching /_matrix/federation needs a "federation" listener
@@ -70,11 +92,13 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"endpoint_patterns": [
"^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
],
- "shared_extra_conf": {"update_user_directory_from_worker": "user_dir1"},
+ "shared_extra_conf": {
+ "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
+ },
"worker_extra_conf": "",
},
"media_repository": {
- "app": "synapse.app.media_repository",
+ "app": "synapse.app.generic_worker",
"listener_resources": ["media"],
"endpoint_patterns": [
"^/_matrix/media/",
@@ -87,7 +111,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
# The first configured media worker will run the media background jobs
"shared_extra_conf": {
"enable_media_repo": False,
- "media_instance_running_background_jobs": "media_repository1",
+ "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
},
"worker_extra_conf": "enable_media_repo: true",
},
@@ -95,7 +119,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
- "shared_extra_conf": {"notify_appservices_from_worker": "appservice1"},
+ "shared_extra_conf": {
+ "notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
+ },
"worker_extra_conf": "",
},
"federation_sender": {
@@ -192,9 +218,9 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"app": "synapse.app.generic_worker",
"listener_resources": [],
"endpoint_patterns": [],
- # This worker cannot be sharded. Therefore there should only ever be one background
- # worker, and it should be named background_worker1
- "shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
+ # This worker cannot be sharded. Therefore, there should only ever be one
+ # background worker. This is enforced for the safety of your database.
+ "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
"worker_extra_conf": "",
},
"event_creator": {
@@ -275,7 +301,7 @@ NGINX_LOCATION_CONFIG_BLOCK = """
"""
NGINX_UPSTREAM_CONFIG_BLOCK = """
-upstream {upstream_worker_type} {{
+upstream {upstream_worker_base_name} {{
{body}
}}
"""
@@ -326,7 +352,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
def add_worker_roles_to_shared_config(
shared_config: dict,
- worker_type: str,
+ worker_types_set: Set[str],
worker_name: str,
worker_port: int,
) -> None:
@@ -334,22 +360,36 @@ def add_worker_roles_to_shared_config(
append appropriate worker information to it for the current worker_type instance.
Args:
- shared_config: The config dict that all worker instances share (after being converted to YAML)
- worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
+ shared_config: The config dict that all worker instances share (after being
+ converted to YAML)
+ worker_types_set: The type of worker (one of those defined in WORKERS_CONFIG).
+ This list can be a single worker type or multiple.
worker_name: The name of the worker instance.
worker_port: The HTTP replication port that the worker instance is listening on.
"""
- # The instance_map config field marks the workers that write to various replication streams
+ # The instance_map config field marks the workers that write to various replication
+ # streams
instance_map = shared_config.setdefault("instance_map", {})
- # Worker-type specific sharding config
- if worker_type == "pusher":
+ # This is a list of the stream_writers that there can be only one of. Events can be
+    # sharded, and therefore don't belong here.
+ singular_stream_writers = [
+ "account_data",
+ "presence",
+ "receipts",
+ "to_device",
+ "typing",
+ ]
+
+ # Worker-type specific sharding config. Now a single worker can fulfill multiple
+ # roles, check each.
+ if "pusher" in worker_types_set:
shared_config.setdefault("pusher_instances", []).append(worker_name)
- elif worker_type == "federation_sender":
+ if "federation_sender" in worker_types_set:
shared_config.setdefault("federation_sender_instances", []).append(worker_name)
- elif worker_type == "event_persister":
+ if "event_persister" in worker_types_set:
# Event persisters write to the events stream, so we need to update
# the list of event stream writers
shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
@@ -362,19 +402,154 @@ def add_worker_roles_to_shared_config(
"port": worker_port,
}
- elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
- # Update the list of stream writers
- # It's convenient that the name of the worker type is the same as the stream to write
- shared_config.setdefault("stream_writers", {}).setdefault(
- worker_type, []
- ).append(worker_name)
+ # Update the list of stream writers. It's convenient that the name of the worker
+ # type is the same as the stream to write. Iterate over the whole list in case there
+ # is more than one.
+ for worker in worker_types_set:
+ if worker in singular_stream_writers:
+ shared_config.setdefault("stream_writers", {}).setdefault(
+ worker, []
+ ).append(worker_name)
+
+ # Map of stream writer instance names to host/ports combos
+ # For now, all stream writers need http replication ports
+ instance_map[worker_name] = {
+ "host": "localhost",
+ "port": worker_port,
+ }
+
+
+def merge_worker_template_configs(
+ existing_dict: Dict[str, Any] | None,
+ to_be_merged_dict: Dict[str, Any],
+) -> Dict[str, Any]:
+ """When given an existing dict of worker template configuration consisting with both
+ dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
+ return new dict.
- # Map of stream writer instance names to host/ports combos
- # For now, all stream writers need http replication ports
- instance_map[worker_name] = {
- "host": "localhost",
- "port": worker_port,
- }
+ Args:
+ existing_dict: Either an existing worker template or a fresh blank one.
+        to_be_merged_dict: The template from WORKERS_CONFIG to be merged into
+ existing_dict.
+ Returns: The newly merged together dict values.
+ """
+ new_dict: Dict[str, Any] = {}
+ if not existing_dict:
+        # It doesn't exist yet, just use the new dict (but take a copy, not a reference)
+ new_dict = to_be_merged_dict.copy()
+ else:
+ for i in to_be_merged_dict.keys():
+ if (i == "endpoint_patterns") or (i == "listener_resources"):
+ # merge the two lists, remove duplicates
+ new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
+ elif i == "shared_extra_conf":
+                # merge dictionaries; the worker name will be replaced later
+ new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
+ elif i == "worker_extra_conf":
+ # There is only one worker type that has a 'worker_extra_conf' and it is
+ # the media_repo. Since duplicate worker types on the same worker don't
+ # work, this is fine.
+ new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
+ else:
+ # Everything else should be identical, like "app", which only works
+ # because all apps are now generic_workers.
+ new_dict[i] = to_be_merged_dict[i]
+ return new_dict
+
+
+def insert_worker_name_for_worker_config(
+ existing_dict: Dict[str, Any], worker_name: str
+) -> Dict[str, Any]:
+ """Insert a given worker name into the worker's configuration dict.
+
+ Args:
+ existing_dict: The worker_config dict that is imported into shared_config.
+ worker_name: The name of the worker to insert.
+ Returns: Copy of the dict with newly inserted worker name
+ """
+ dict_to_edit = existing_dict.copy()
+ for k, v in dict_to_edit["shared_extra_conf"].items():
+ # Only proceed if it's the placeholder name string
+ if v == WORKER_PLACEHOLDER_NAME:
+ dict_to_edit["shared_extra_conf"][k] = worker_name
+ return dict_to_edit
+
+
+def apply_requested_multiplier_for_worker(worker_types: List[str]) -> List[str]:
+ """
+ Apply multiplier(if found) by returning a new expanded list with some basic error
+ checking.
+
+ Args:
+ worker_types: The unprocessed List of requested workers
+ Returns:
+ A new list with all requested workers expanded.
+ """
+ # Checking performed:
+ # 1. if worker:2 or more is declared, it will create additional workers up to number
+ # 2. if worker:1, it will create a single copy of this worker as if no number was
+ # given
+ # 3. if worker:0 is declared, this worker will be ignored. This is to allow for
+ # scripting and automated expansion and is intended behaviour.
+ # 4. if worker:NaN or is a negative number, it will error and log it.
+ new_worker_types = []
+ for worker_type in worker_types:
+ if ":" in worker_type:
+ worker_type_components = split_and_strip_string(worker_type, ":", 1)
+ worker_count = 0
+ # Should only be 2 components, a type of worker(s) and an integer as a
+ # string. Cast the number as an int then it can be used as a counter.
+ try:
+ worker_count = int(worker_type_components[1])
+ except ValueError:
+ error(
+ f"Bad number in worker count for '{worker_type}': "
+ f"'{worker_type_components[1]}' is not an integer"
+ )
+
+ # As long as there are more than 0, we add one to the list to make below.
+ for _ in range(worker_count):
+ new_worker_types.append(worker_type_components[0])
+
+ else:
+ # If it's not a real worker_type, it will error out later.
+ new_worker_types.append(worker_type)
+ return new_worker_types
+
+
+def is_sharding_allowed_for_worker_type(worker_type: str) -> bool:
+ """Helper to check to make sure worker types that cannot have multiples do not.
+
+ Args:
+ worker_type: The type of worker to check against.
+ Returns: True if allowed, False if not
+ """
+ return worker_type not in [
+ "background_worker",
+ "account_data",
+ "presence",
+ "receipts",
+ "typing",
+ "to_device",
+ ]
+
+
+def split_and_strip_string(
+ given_string: str, split_char: str, max_split: SupportsIndex = -1
+) -> List[str]:
+ """
+ Helper to split a string on split_char and strip whitespace from each end of each
+ element.
+ Args:
+ given_string: The string to split
+ split_char: The character to split the string on
+ max_split: kwarg for split() to limit how many times the split() happens
+ Returns:
+ A List of strings
+ """
+ # Removes whitespace from ends of result strings before adding to list. Allow for
+ # overriding 'maxsplit' kwarg, default being -1 to signify no maximum.
+ return [x.strip() for x in given_string.split(split_char, maxsplit=max_split)]
def generate_base_homeserver_config() -> None:
@@ -389,29 +564,218 @@ def generate_base_homeserver_config() -> None:
subprocess.run(["/usr/local/bin/python", "/start.py", "migrate_config"], check=True)
+def parse_worker_types(
+ requested_worker_types: List[str],
+) -> Dict[str, Dict[str, Any]]:
+ """Read the desired list of requested workers and prepare the data for use in
+ generating worker config files while also checking for potential gotchas.
+
+ Args:
+ requested_worker_types: The list formed from the split environment variable
+ containing the unprocessed requests for workers.
+
+ Returns: A dict of all information needed to generate worker files. Format:
+ {'worker_name':
+ {'worker_base_name': 'base_name'},
+
+ {'worker_types_set':
+ Set{'worker_type', 'other_worker_type'}
+ }
+ }
+ """
+ # Checking performed:
+ # 1. If a requested name contains a space
+ # 2. If a requested name contains either kind of quote mark
+ # 3. If a requested name ends with a digit
+
+ # A counter of worker_type -> int. Used for determining the name for a given
+ # worker type(s) when generating its config file, as each worker's name is just
+ # worker_(name|type(s)) + instance #
+ worker_type_counter: Dict[str, int] = defaultdict(int)
+
+ # Similar to above, but more finely grained. This is used to determine we don't have
+ # more than a single worker for cases where multiples would be bad(e.g. presence).
+ worker_type_shard_counter: Dict[str, int] = defaultdict(int)
+
+ # Dict of worker_base_name's -> worker_type. The worker_type(s) in this case is a
+ # sorted set turned into a string. Use this as a reference that a given
+ # worker_base_name is used only for this worker_type(s). Preventing silly typos.
+ # e.g.
+ # 'bob=user_dir+event_creator, bob=event_creator+user_dir' would be fine and resolve
+ # correctly
+ # 'bob=user_dir+event_creator, bob=user_dir+frontend_proxy' would not be fine.
+ # Follows the pattern:
+ # ["worker_base_name": "worker_type(s)"]
+ worker_name_checklist: Dict[str, str] = {}
+
+ # The final result of all this processing
+ dict_to_return: Dict[str, Any] = {}
+
+ # Handle any multipliers requested for a given worker.
+ multiple_processed_worker_types = apply_requested_multiplier_for_worker(
+ requested_worker_types
+ )
+
+ # Check each requested worker_type_string for a user-requested name and parse it
+ # out if present.
+ for worker_type_string in multiple_processed_worker_types:
+ requested_worker_name = ""
+ new_worker_type_string = worker_type_string
+ # First, check if a name is requested
+ if "=" in worker_type_string:
+ # Split on "=", remove extra whitespace from ends then make list
+ worker_type_split = split_and_strip_string(worker_type_string, "=")
+ if len(worker_type_split) > 2:
+ error(
+ "To many worker names requested for a single worker, or to many "
+ f"'='. Please fix: {worker_type_string}"
+ )
+ # if there was no name given, this will still be an empty string
+ requested_worker_name = worker_type_split[0]
+
+ # Check the last character of a requested name is not a number. This can
+ # cause an error that comes across during startup as an exception in
+ # 'startListening' and ends with 'Address already in use' for the port.
+ # This only seems to occur when requesting more than 10 of a given
+ # worker_type, otherwise it would be ok.
+ if requested_worker_name[-1].isdigit():
+ error(
+ "Found a number at the end of the requested worker name: "
+ f"{requested_worker_name}. This is not allowed as it will cause "
+ "exceptions with 'Address already in use'. Recommend appending an "
+ "underscore after the number if this what you really want to do."
+ )
+
+ # Reassign the worker_type string with no name on it.
+ new_worker_type_string = worker_type_split[1]
+
+ # At this point, we have:
+ # requested_worker_name which might be an empty string
+ # new_worker_type_string which might still be what it was when it came in
+
+ # Split the worker_type_string on "+", remove whitespace from ends then make
+ # the list a set so it's deduplicated.
+ worker_types_list = split_and_strip_string(new_worker_type_string, "+")
+ worker_types_set: Set[str] = set(worker_types_list)
+
+ worker_base_name = new_worker_type_string
+ if requested_worker_name:
+ worker_base_name = requested_worker_name
+ # It'll be useful to have this in the log in case it's a complex of many
+ # workers merged together. Note for Complement: it would only be seen in the
+ # logs for blueprint construction(which are not collected).
+ log(
+ f"Worker name request found: '{requested_worker_name}'"
+ f", for: {worker_types_set}"
+ )
+
+ else:
+ # The worker name will be the worker_type, however if spaces exist
+ # between concatenated worker_types and the "+" because of readability,
+ # it will error on startup. Recombine worker_types without spaces and log.
+ # Allows for human readability while declaring a complex worker type, e.g.
+ # 'event_persister + federation_reader + federation_sender + pusher'
+ if (len(worker_types_set) > 1) and (" " in worker_base_name):
+ worker_base_name = "+".join(sorted(worker_types_set))
+ log(
+ "Default worker name would have contained spaces, which is not "
+ f"allowed: '{worker_type_string}'. Reformed name to not contain "
+ f"spaces: '{worker_base_name}'"
+ )
+
+ # At this point, we have:
+ # worker_base_name which might be identical to
+ # new_worker_type_string which might still be what it was when it came in
+ # worker_types_set which is a Set of what worker_types are requested
+
+ # Uncommon mistake that will cause problems. Name string containing quotes
+ # or spaces will do Bad Things to filenames and probably nginx too.
+ if (
+ (" " in worker_base_name)
+ or ('"' in worker_base_name)
+ or ("'" in worker_base_name)
+ ):
+ error(
+ "Requesting a worker name containing a quote mark or a space is "
+ "not allowed, as it would raise a FileNotFoundError. Please fix: "
+ f"{worker_base_name}"
+ )
+
+ # This counter is used for naming workers with an incrementing number. Use the
+ # worker_base_name for the index
+ worker_type_counter[worker_base_name] += 1
+
+ # Name workers by their type or requested name concatenated with an
+ # incrementing number. e.g. federation_reader1 or event_creator+event_persister1
+ worker_name = worker_base_name + str(worker_type_counter[worker_base_name])
+
+ # Now that the worker name is settled, check this worker_base_name isn't used
+ # for a different worker_type. Make sure the worker types being checked are
+ # deterministic.
+ deterministic_worker_type_string = "+".join(sorted(worker_types_set))
+ check_worker_type = worker_name_checklist.get(worker_base_name)
+ # Either this doesn't exist yet, or it matches with a twin
+ if (check_worker_type is None) or (
+ check_worker_type == deterministic_worker_type_string
+ ):
+ # This is a no-op if it exists, which is expected to avoid the else block
+ worker_name_checklist.setdefault(
+ worker_base_name, deterministic_worker_type_string
+ )
+
+ else:
+ error(
+ f"Can not use worker_name: '{worker_name}' for worker_type(s): "
+ f"'{deterministic_worker_type_string}'. It is already in use by "
+ f"worker_type(s): '{check_worker_type}'"
+ )
+
+ # Make sure we don't allow sharding for a worker type that doesn't support it.
+ # Will error and stop if it is a problem, e.g. 'background_worker'.
+ for worker_type in worker_types_set:
+ if worker_type in worker_type_shard_counter:
+ if not is_sharding_allowed_for_worker_type(worker_type):
+ error(
+ f"There can be only a single worker with {worker_type} "
+ "type. Please recount and remove."
+ )
+ # Not in shard counter, must not have seen it yet, add it.
+ worker_type_shard_counter[worker_type] += 1
+
+ # The worker has survived the gauntlet of why it can't exist. Add it to the pile
+ dict_to_return.setdefault(worker_name, {}).setdefault(
+ "worker_base_name", worker_base_name
+ )
+ dict_to_return.setdefault(worker_name, {}).setdefault(
+ "worker_types_set", set()
+ ).update(worker_types_set)
+
+ return dict_to_return
+
+
def generate_worker_files(
- environ: Mapping[str, str], config_path: str, data_dir: str
+ environ: Mapping[str, str],
+ config_path: str,
+ data_dir: str,
+ requested_worker_types: Dict[str, Any],
) -> None:
- """Read the desired list of workers from environment variables and generate
- shared homeserver, nginx and supervisord configs.
+ """Read the desired workers(if any) that is passed in and generate shared
+ homeserver, nginx and supervisord configs.
Args:
environ: os.environ instance.
config_path: The location of the generated Synapse main worker config file.
data_dir: The location of the synapse data directory. Where log and
user-facing config files live.
+ requested_worker_types: A Dict containing requested workers in the format of
+ {'worker_name1': {'worker_type', ...}}
"""
# Note that yaml cares about indentation, so care should be taken to insert lines
# into files at the correct indentation below.
- # shared_config is the contents of a Synapse config file that will be shared amongst
- # the main Synapse process as well as all workers.
- # It is intended mainly for disabling functionality when certain workers are spun up,
- # and adding a replication listener.
-
- # First read the original config file and extract the listeners block. Then we'll add
- # another listener for replication. Later we'll write out the result to the shared
- # config file.
+ # First read the original config file and extract the listeners block. Then we'll
+ # add another listener for replication. Later we'll write out the result to the
+ # shared config file.
listeners = [
{
"port": 9093,
@@ -427,9 +791,9 @@ def generate_worker_files(
listeners += original_listeners
# The shared homeserver config. The contents of which will be inserted into the
- # base shared worker jinja2 template.
- #
- # This config file will be passed to all workers, included Synapse's main process.
+ # base shared worker jinja2 template. This config file will be passed to all
+ # workers, included Synapse's main process. It is intended mainly for disabling
+ # functionality when certain workers are spun up, and adding a replication listener.
shared_config: Dict[str, Any] = {"listeners": listeners}
# List of dicts that describe workers.
@@ -437,31 +801,26 @@ def generate_worker_files(
# program blocks.
worker_descriptors: List[Dict[str, Any]] = []
- # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
- # ports of each worker. For example:
+ # Upstreams for load-balancing purposes. This dict takes the form of the base worker
+ # name to the ports of each worker. For example:
# {
- # worker_type: {1234, 1235, ...}}
+ # worker_base_name: {1234, 1235, ...}}
# }
# and will be used to construct 'upstream' nginx directives.
nginx_upstreams: Dict[str, Set[int]] = {}
- # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
- # placed after the proxy_pass directive. The main benefit to representing this data as a
- # dict over a str is that we can easily deduplicate endpoints across multiple instances
- # of the same worker.
- #
- # An nginx site config that will be amended to depending on the workers that are
- # spun up. To be placed in /etc/nginx/conf.d.
- nginx_locations = {}
-
- # Read the desired worker configuration from the environment
- worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
- if not worker_types_env:
- # No workers, just the main process
- worker_types = []
- else:
- # Split type names by comma, ignoring whitespace.
- worker_types = [x.strip() for x in worker_types_env.split(",")]
+ # A map that will collect port data for load-balancing upstreams before being
+ # reprocessed into nginx_locations. Unfortunately, we cannot just use
+ # nginx_locations as there is a typing clash.
+ # Format: {"endpoint": {1234, 1235, ...}}
+ nginx_preprocessed_locations: Dict[str, Set[int]] = {}
+
+ # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what
+ # will be placed after the proxy_pass directive. The main benefit to representing
+ # this data as a dict over a str is that we can easily deduplicate endpoints
+ # across multiple instances of the same worker. The final rendering will be combined
+ # with nginx_upstreams and placed in /etc/nginx/conf.d.
+ nginx_locations: Dict[str, str] = {}
# Create the worker configuration directory if it doesn't already exist
os.makedirs("/conf/workers", exist_ok=True)
@@ -469,44 +828,55 @@ def generate_worker_files(
# Start worker ports from this arbitrary port
worker_port = 18009
- # A counter of worker_type -> int. Used for determining the name for a given
- # worker type when generating its config file, as each worker's name is just
- # worker_type + instance #
- worker_type_counter: Dict[str, int] = {}
-
# A list of internal endpoints to healthcheck, starting with the main process
# which exists even if no workers do.
healthcheck_urls = ["http://localhost:8080/health"]
- # For each worker type specified by the user, create config values
- for worker_type in worker_types:
- worker_config = WORKERS_CONFIG.get(worker_type)
- if worker_config:
- worker_config = worker_config.copy()
- else:
- error(worker_type + " is an unknown worker type! Please fix!")
+    # For each worker type specified by the user, create config values and write its
+ # yaml config file
+ for worker_name, worker_type_data in requested_worker_types.items():
+ worker_types_set = worker_type_data.get("worker_types_set")
+
+ # The collected and processed data will live here.
+ worker_config: Dict[str, Any] = {}
+
+ # Merge all worker config templates for this worker into a single config
+ for worker_type in worker_types_set:
+ # Verify this is a real defined worker type. If it's not, stop everything so
+ # it can be fixed.
+ copy_of_template_config = WORKERS_CONFIG.get(worker_type)
+ if copy_of_template_config:
+ # So it's not a reference pointer
+ copy_of_template_config = copy_of_template_config.copy()
+ else:
+ error(
+ f"{worker_type} is an unknown worker type! Was found in "
+ f"{worker_types_set}. Please fix!"
+ )
- new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
- worker_type_counter[worker_type] = new_worker_count
+ # Merge worker type template configuration data. It's a combination of lists
+ # and dicts, so use this helper.
+ worker_config = merge_worker_template_configs(
+ worker_config, copy_of_template_config
+ )
+
+ # Replace placeholder names in the config template with the actual worker name.
+ worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
- # Name workers by their type concatenated with an incrementing number
- # e.g. federation_reader1
- worker_name = worker_type + str(new_worker_count)
worker_config.update(
{"name": worker_name, "port": str(worker_port), "config_path": config_path}
)
- # Update the shared config with any worker-type specific options
- shared_config.update(worker_config["shared_extra_conf"])
+ # Update the shared config with any worker_type specific options. The first of a
+ # given worker_type needs to stay assigned and not be replaced.
+ worker_config["shared_extra_conf"].update(shared_config)
+ shared_config = worker_config["shared_extra_conf"]
healthcheck_urls.append("http://localhost:%d/health" % (worker_port,))
- # Check if more than one instance of this worker type has been specified
- worker_type_total_count = worker_types.count(worker_type)
-
# Update the shared config with sharding-related options if necessary
add_worker_roles_to_shared_config(
- shared_config, worker_type, worker_name, worker_port
+ shared_config, worker_types_set, worker_name, worker_port
)
# Enable the worker in supervisord
@@ -514,21 +884,11 @@ def generate_worker_files(
# Add nginx location blocks for this worker's endpoints (if any are defined)
for pattern in worker_config["endpoint_patterns"]:
- # Determine whether we need to load-balance this worker
- if worker_type_total_count > 1:
- # Create or add to a load-balanced upstream for this worker
- nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
-
- # Upstreams are named after the worker_type
- upstream = "http://" + worker_type
- else:
- upstream = "http://localhost:%d" % (worker_port,)
-
- # Note that this endpoint should proxy to this upstream
- nginx_locations[pattern] = upstream
+ # Need more data to determine whether we need to load-balance this worker.
+ # Collect all the port numbers for a given endpoint
+ nginx_preprocessed_locations.setdefault(pattern, set()).add(worker_port)
# Write out the worker's logging config file
-
log_config_filepath = generate_worker_log_config(environ, worker_name, data_dir)
# Then a worker config file
@@ -541,6 +901,29 @@ def generate_worker_files(
worker_port += 1
+    # Reprocess all nginx upstream data. worker_descriptors contains all the port data,
+ # cross-reference that with the worker_base_name in requested_worker_types.
+ for pattern, port_set in nginx_preprocessed_locations.items():
+ upstream_name: Set[str] = set()
+ for worker in worker_descriptors:
+ # Find the port we want
+ if int(worker["port"]) in port_set:
+ # Capture the name. We want the base name as they will be grouped
+ # together.
+ upstream_name.add(
+ requested_worker_types[worker["name"]].get("worker_base_name")
+ )
+
+ # Join it all up nice and pretty with a double underscore
+ upstream = "__".join(sorted(upstream_name))
+ upstream_location = "http://" + upstream
+
+    # Save the upstream location to its associated pattern
+ nginx_locations[pattern] = upstream_location
+
+ # And save the port numbers for writing out below
+ nginx_upstreams[upstream] = port_set
+
# Build the nginx location config blocks
nginx_location_config = ""
for endpoint, upstream in nginx_locations.items():
@@ -552,14 +935,14 @@ def generate_worker_files(
# Determine the load-balancing upstreams to configure
nginx_upstream_config = ""
- for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
+ for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
body = ""
for port in upstream_worker_ports:
body += " server localhost:%d;\n" % (port,)
# Add to the list of configured upstreams
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
- upstream_worker_type=upstream_worker_type,
+ upstream_worker_base_name=upstream_worker_base_name,
body=body,
)
@@ -580,7 +963,7 @@ def generate_worker_files(
if reg_path.suffix.lower() in (".yaml", ".yml")
]
- workers_in_use = len(worker_types) > 0
+ workers_in_use = len(requested_worker_types) > 0
# Shared homeserver config
convert(
@@ -678,13 +1061,26 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None:
generate_base_homeserver_config()
else:
log("Base homeserver config exists—not regenerating")
- # This script may be run multiple times (mostly by Complement, see note at top of file).
- # Don't re-configure workers in this instance.
+ # This script may be run multiple times (mostly by Complement, see note at top of
+ # file). Don't re-configure workers in this instance.
mark_filepath = "/conf/workers_have_been_configured"
if not os.path.exists(mark_filepath):
+ # Collect and validate worker_type requests
+ # Read the desired worker configuration from the environment
+ worker_types_env = environ.get("SYNAPSE_WORKER_TYPES", "").strip()
+ # Only process worker_types if they exist
+ if not worker_types_env:
+ # No workers, just the main process
+ worker_types = []
+ requested_worker_types: Dict[str, Any] = {}
+ else:
+ # Split type names by comma, ignoring whitespace.
+ worker_types = split_and_strip_string(worker_types_env, ",")
+ requested_worker_types = parse_worker_types(worker_types)
+
# Always regenerate all other config files
log("Generating worker config files")
- generate_worker_files(environ, config_path, data_dir)
+ generate_worker_files(environ, config_path, data_dir, requested_worker_types)
# Mark workers as being configured
with open(mark_filepath, "w") as f:
|