| field | value |
|---|---|
| author | Olivier Wilkinson (reivilibre) <oliverw@matrix.org>, 2023-11-16 15:00:48 +0000 |
| committer | Olivier Wilkinson (reivilibre) <oliverw@matrix.org>, 2023-11-17 12:03:56 +0000 |
| commit | a22eb7dc1563f4dce521dd4b42fb53ba8ede3b28 (patch) |
| tree | 10fa9c0dc95ef6ab4c15a98fcea67ac2c6b93ff6 |
| parent | Remove obsolete `"app"` from worker templates (diff) |
| download | synapse-a22eb7dc1563f4dce521dd4b42fb53ba8ede3b28.tar.xz |
Convert worker templates into dataclass
-rwxr-xr-x | docker/configure_workers_and_start.py | 312
1 file changed, 162 insertions, 150 deletions
```diff
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 48abdae674..8329aab6dc 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -47,12 +47,14 @@
 # in the project's README), this script may be run multiple times, and functionality should
 # continue to work if so.
 
+import dataclasses
 import os
 import platform
 import re
 import subprocess
 import sys
 from collections import defaultdict
+from dataclasses import dataclass, field
 from itertools import chain
 from pathlib import Path
 from typing import (
@@ -82,32 +84,41 @@ MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
 # during processing with the name of the worker.
 WORKER_PLACEHOLDER_NAME = "placeholder_name"
 
+
 # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
 # Watching /_matrix/client needs a "client" listener
 # Watching /_matrix/federation needs a "federation" listener
 # Watching /_matrix/media and related needs a "media" listener
 # Stream Writers require "client" and "replication" listeners because they
 # have to attach by instance_map to the master process and have client endpoints.
-WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
-    "pusher": {
-        "listener_resources": [],
-        "endpoint_patterns": [],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "user_dir": {
-        "listener_resources": ["client"],
-        "endpoint_patterns": [
+@dataclass
+class WorkerTemplate:
+    listener_resources: List[str] = field(default_factory=list)
+    endpoint_patterns: List[str] = field(default_factory=list)
+    shared_extra_conf: Dict[str, Any] = field(default_factory=dict)
+    worker_extra_conf: str = ""
+
+
+WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
+    "pusher": WorkerTemplate(
+        listener_resources=[],
+        endpoint_patterns=[],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "user_dir": WorkerTemplate(
+        listener_resources=["client"],
+        endpoint_patterns=[
             "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
         ],
-        "shared_extra_conf": {
+        shared_extra_conf={
             "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
         },
-        "worker_extra_conf": "",
-    },
-    "media_repository": {
-        "listener_resources": ["media"],
-        "endpoint_patterns": [
+        worker_extra_conf="",
+    ),
+    "media_repository": WorkerTemplate(
+        listener_resources=["media"],
+        endpoint_patterns=[
             "^/_matrix/media/",
             "^/_synapse/admin/v1/purge_media_cache$",
             "^/_synapse/admin/v1/room/.*/media.*$",
```
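The core of the change is in the hunk above: each worker template becomes a typed `WorkerTemplate` instead of a `Dict[str, Any]`. One detail worth calling out is `field(default_factory=list)`: dataclasses reject plain mutable defaults, and a factory also guarantees each instance gets its own fresh list and dict. A minimal, self-contained sketch of the pattern (not part of the patch itself):

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class WorkerTemplate:
    # `default_factory` builds a fresh list/dict per instance; a bare
    # `= []` default would raise ValueError when the class is defined.
    listener_resources: List[str] = field(default_factory=list)
    endpoint_patterns: List[str] = field(default_factory=list)
    shared_extra_conf: Dict[str, Any] = field(default_factory=dict)
    worker_extra_conf: str = ""


a = WorkerTemplate()
b = WorkerTemplate(listener_resources=["client"])
a.listener_resources.append("media")
assert b.listener_resources == ["client"]  # no state shared between instances
```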
"federation_sender": WorkerTemplate( + listener_resources=[], + endpoint_patterns=[], + shared_extra_conf={}, + worker_extra_conf="", + ), + "synchrotron": WorkerTemplate( + listener_resources=["client"], + endpoint_patterns=[ "^/_matrix/client/(v2_alpha|r0|v3)/sync$", "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$", "^/_matrix/client/(api/v1|r0|v3)/initialSync$", "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "client_reader": { - "listener_resources": ["client"], - "endpoint_patterns": [ + shared_extra_conf={}, + worker_extra_conf="", + ), + "client_reader": WorkerTemplate( + listener_resources=["client"], + endpoint_patterns=[ "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$", @@ -178,12 +187,12 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "^/_matrix/client/(r0|v3|unstable)/capabilities$", "^/_matrix/client/(r0|v3|unstable)/notifications$", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "federation_reader": { - "listener_resources": ["federation"], - "endpoint_patterns": [ + shared_extra_conf={}, + worker_extra_conf="", + ), + "federation_reader": WorkerTemplate( + listener_resources=["federation"], + endpoint_patterns=[ "^/_matrix/federation/(v1|v2)/event/", "^/_matrix/federation/(v1|v2)/state/", "^/_matrix/federation/(v1|v2)/state_ids/", @@ -204,32 +213,32 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "^/_matrix/federation/(v1|v2)/get_groups_publicised$", "^/_matrix/key/v2/query", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "federation_inbound": { - "listener_resources": ["federation"], - "endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "event_persister": { - "listener_resources": ["replication"], - "endpoint_patterns": [], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "background_worker": { - "listener_resources": [], - "endpoint_patterns": [], + shared_extra_conf={}, + worker_extra_conf="", + ), + "federation_inbound": WorkerTemplate( + listener_resources=["federation"], + endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"], + shared_extra_conf={}, + worker_extra_conf="", + ), + "event_persister": WorkerTemplate( + listener_resources=["replication"], + endpoint_patterns=[], + shared_extra_conf={}, + worker_extra_conf="", + ), + "background_worker": WorkerTemplate( + listener_resources=[], + endpoint_patterns=[], # This worker cannot be sharded. Therefore, there should only ever be one # background worker. This is enforced for the safety of your database. 
- "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME}, - "worker_extra_conf": "", - }, - "event_creator": { - "listener_resources": ["client"], - "endpoint_patterns": [ + shared_extra_conf={"run_background_tasks_on": WORKER_PLACEHOLDER_NAME}, + worker_extra_conf="", + ), + "event_creator": WorkerTemplate( + listener_resources=["client"], + endpoint_patterns=[ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$", @@ -237,53 +246,51 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/", "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "frontend_proxy": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "account_data": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": [ + shared_extra_conf={}, + worker_extra_conf="", + ), + "frontend_proxy": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], + shared_extra_conf={}, + worker_extra_conf="", + ), + "account_data": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=[ "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "presence": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "receipts": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": [ + shared_extra_conf={}, + worker_extra_conf="", + ), + "presence": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], + shared_extra_conf={}, + worker_extra_conf="", + ), + "receipts": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=[ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "to_device": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, - "typing": { - "listener_resources": ["client", "replication"], - "endpoint_patterns": [ - "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing" - ], - "shared_extra_conf": {}, - "worker_extra_conf": "", - }, + shared_extra_conf={}, + worker_extra_conf="", + ), + "to_device": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], + shared_extra_conf={}, + worker_extra_conf="", + ), + "typing": WorkerTemplate( + listener_resources=["client", "replication"], + endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"], + shared_extra_conf={}, + worker_extra_conf="", + ), } # Templates for sections that may be inserted multiple times in config files @@ -425,54 +432,59 @@ def add_worker_roles_to_shared_config( def 
```diff
@@ -425,54 +432,59 @@ def add_worker_roles_to_shared_config(
 def merge_worker_template_configs(
-    existing_dict: Optional[Dict[str, Any]],
-    to_be_merged_dict: Dict[str, Any],
-) -> Dict[str, Any]:
+    existing_template: WorkerTemplate,
+    to_be_merged_template: WorkerTemplate,
+) -> WorkerTemplate:
     """When given an existing dict of worker template configuration consisting with both
     dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
     return new dict.
 
     Args:
-        existing_dict: Either an existing worker template or a fresh blank one.
-        to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
+        existing_template: Either an existing worker template or a fresh blank one.
+        to_be_merged_template: The template from WORKERS_CONFIGS to be merged into
            existing_dict.
 
     Returns: The newly merged together dict values.
     """
-    new_dict: Dict[str, Any] = {}
-    if not existing_dict:
-        # It doesn't exist yet, just use the new dict(but take a copy not a reference)
-        new_dict = to_be_merged_dict.copy()
-    else:
-        for i in to_be_merged_dict.keys():
-            if (i == "endpoint_patterns") or (i == "listener_resources"):
-                # merge the two lists, remove duplicates
-                new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
-            elif i == "shared_extra_conf":
-                # merge dictionary's, the worker name will be replaced later
-                new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
-            elif i == "worker_extra_conf":
-                # There is only one worker type that has a 'worker_extra_conf' and it is
-                # the media_repo. Since duplicate worker types on the same worker don't
-                # work, this is fine.
-                new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
-            else:
-                # Everything else should be identical, like "app", which only works
-                # because all apps are now generic_workers.
-                new_dict[i] = to_be_merged_dict[i]
-    return new_dict
+    # copy existing_template without any replacements
+    new_template: WorkerTemplate = dataclasses.replace(existing_template)
+
+    # merge the two lists, remove duplicates
+    new_template.listener_resources = list(
+        set(new_template.listener_resources + to_be_merged_template.listener_resources)
+    )
+
+    # merge the two lists, remove duplicates
+    new_template.endpoint_patterns = list(
+        set(new_template.endpoint_patterns + to_be_merged_template.endpoint_patterns)
+    )
+
+    # merge dictionaries; the worker name will be replaced later
+    new_template.shared_extra_conf = {
+        **new_template.shared_extra_conf,
+        **to_be_merged_template.shared_extra_conf,
+    }
+
+    # There is only one worker type that has a 'worker_extra_conf' and it is
+    # the media_repo. Since duplicate worker types on the same worker don't
+    # work, this is fine.
+    new_template.worker_extra_conf = (
+        new_template.worker_extra_conf + to_be_merged_template.worker_extra_conf
+    )
+
+    return new_template
 
 
 def insert_worker_name_for_worker_config(
-    existing_dict: Dict[str, Any], worker_name: str
+    existing_template: WorkerTemplate, worker_name: str
 ) -> Dict[str, Any]:
     """Insert a given worker name into the worker's configuration dict.
 
     Args:
-        existing_dict: The worker_config dict that is imported into shared_config.
+        existing_template: The WorkerTemplate that is imported into shared_config.
         worker_name: The name of the worker to insert.
 
     Returns: Copy of the dict with newly inserted worker name
     """
-    dict_to_edit = existing_dict.copy()
+    dict_to_edit = dataclasses.asdict(existing_template)
     for k, v in dict_to_edit["shared_extra_conf"].items():
         # Only proceed if it's the placeholder name string
         if v == WORKER_PLACEHOLDER_NAME:
```
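Two points about the rewritten `merge_worker_template_configs`. First, `dataclasses.replace(existing_template)` is a shallow copy, which is safe here only because every mutable field is subsequently rebound rather than mutated in place. Second, `list(set(...))` still deduplicates but does not preserve order. A small usage sketch, assuming the definitions above are in scope:

```python
# Merging the "to_device" and "typing" templates mirrors what the
# helper does when one worker combines both worker types.
merged = merge_worker_template_configs(
    WORKERS_CONFIG["to_device"], WORKERS_CONFIG["typing"]
)
assert sorted(merged.listener_resources) == ["client", "replication"]

# The originals are untouched: replace() copied the template, and each
# field was rebound to a new list/dict/str rather than mutated.
assert WORKERS_CONFIG["typing"].worker_extra_conf == ""
```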
```diff
@@ -793,27 +805,27 @@ def generate_worker_files(
     # Map locations to upstreams (corresponding to worker types) in Nginx
     # but only if we use the appropriate worker type
     for worker_type in all_worker_types_in_use:
-        for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
+        for endpoint_pattern in WORKERS_CONFIG[worker_type].endpoint_patterns:
             nginx_locations[endpoint_pattern] = f"http://{worker_type}"
 
     # For each worker type specified by the user, create config values and write it's
     # yaml config file
     for worker_name, worker_types_set in requested_worker_types.items():
         # The collected and processed data will live here.
-        worker_config: Dict[str, Any] = {}
+        worker_template: WorkerTemplate = WorkerTemplate()
 
         # Merge all worker config templates for this worker into a single config
         for worker_type in worker_types_set:
-            copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
-
             # Merge worker type template configuration data. It's a combination of lists
             # and dicts, so use this helper.
-            worker_config = merge_worker_template_configs(
-                worker_config, copy_of_template_config
+            worker_template = merge_worker_template_configs(
+                worker_template, WORKERS_CONFIG[worker_type]
             )
 
         # Replace placeholder names in the config template with the actual worker name.
-        worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
+        worker_config: Dict[str, Any] = insert_worker_name_for_worker_config(
+            worker_template, worker_name
+        )
 
         worker_config.update(
             {"name": worker_name, "port": str(worker_port), "config_path": config_path}
```
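Taken together, the per-worker assembly now starts from a blank `WorkerTemplate()`, folds in each requested worker type with `merge_worker_template_configs`, and only converts back to a plain dict (via `dataclasses.asdict` inside `insert_worker_name_for_worker_config`) when the YAML-facing config is built. A reduced sketch of that flow; the worker name and type set below are made up for illustration:

```python
worker_name = "worker1"  # hypothetical example values
worker_types_set = {"account_data", "presence"}

# Fold all requested templates into one, starting from a blank template.
worker_template = WorkerTemplate()
for worker_type in worker_types_set:
    worker_template = merge_worker_template_configs(
        worker_template, WORKERS_CONFIG[worker_type]
    )

# Swap the placeholder for the real worker name; the result is a plain
# dict again (dataclasses.asdict), ready for YAML rendering.
worker_config = insert_worker_name_for_worker_config(worker_template, worker_name)
worker_config.update({"name": worker_name})
```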