summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2023-11-16 15:00:48 +0000
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2023-11-17 12:03:56 +0000
commita22eb7dc1563f4dce521dd4b42fb53ba8ede3b28 (patch)
tree10fa9c0dc95ef6ab4c15a98fcea67ac2c6b93ff6
parentRemove obsolete `"app"` from worker templates (diff)
downloadsynapse-a22eb7dc1563f4dce521dd4b42fb53ba8ede3b28.tar.xz
Convert worker templates into dataclass
-rwxr-xr-xdocker/configure_workers_and_start.py312
1 files changed, 162 insertions, 150 deletions
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 48abdae674..8329aab6dc 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -47,12 +47,14 @@
 # in the project's README), this script may be run multiple times, and functionality should
 # continue to work if so.
 
+import dataclasses
 import os
 import platform
 import re
 import subprocess
 import sys
 from collections import defaultdict
+from dataclasses import dataclass, field
 from itertools import chain
 from pathlib import Path
 from typing import (
@@ -82,32 +84,41 @@ MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock"
 # during processing with the name of the worker.
 WORKER_PLACEHOLDER_NAME = "placeholder_name"
 
+
 # Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
 # Watching /_matrix/client needs a "client" listener
 # Watching /_matrix/federation needs a "federation" listener
 # Watching /_matrix/media and related needs a "media" listener
 # Stream Writers require "client" and "replication" listeners because they
 #   have to attach by instance_map to the master process and have client endpoints.
-WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
-    "pusher": {
-        "listener_resources": [],
-        "endpoint_patterns": [],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "user_dir": {
-        "listener_resources": ["client"],
-        "endpoint_patterns": [
+@dataclass
+class WorkerTemplate:
+    listener_resources: List[str] = field(default_factory=list)
+    endpoint_patterns: List[str] = field(default_factory=list)
+    shared_extra_conf: Dict[str, Any] = field(default_factory=dict)
+    worker_extra_conf: str = ""
+
+
+WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
+    "pusher": WorkerTemplate(
+        listener_resources=[],
+        endpoint_patterns=[],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "user_dir": WorkerTemplate(
+        listener_resources=["client"],
+        endpoint_patterns=[
             "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$"
         ],
-        "shared_extra_conf": {
+        shared_extra_conf={
             "update_user_directory_from_worker": WORKER_PLACEHOLDER_NAME
         },
-        "worker_extra_conf": "",
-    },
-    "media_repository": {
-        "listener_resources": ["media"],
-        "endpoint_patterns": [
+        worker_extra_conf="",
+    ),
+    "media_repository": WorkerTemplate(
+        listener_resources=["media"],
+        endpoint_patterns=[
             "^/_matrix/media/",
             "^/_synapse/admin/v1/purge_media_cache$",
             "^/_synapse/admin/v1/room/.*/media.*$",
@@ -116,40 +127,38 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
             "^/_synapse/admin/v1/quarantine_media/.*$",
         ],
         # The first configured media worker will run the media background jobs
-        "shared_extra_conf": {
+        shared_extra_conf={
             "enable_media_repo": False,
             "media_instance_running_background_jobs": WORKER_PLACEHOLDER_NAME,
         },
-        "worker_extra_conf": "enable_media_repo: true",
-    },
-    "appservice": {
-        "listener_resources": [],
-        "endpoint_patterns": [],
-        "shared_extra_conf": {
-            "notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME
-        },
-        "worker_extra_conf": "",
-    },
-    "federation_sender": {
-        "listener_resources": [],
-        "endpoint_patterns": [],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "synchrotron": {
-        "listener_resources": ["client"],
-        "endpoint_patterns": [
+        worker_extra_conf="enable_media_repo: true",
+    ),
+    "appservice": WorkerTemplate(
+        listener_resources=[],
+        endpoint_patterns=[],
+        shared_extra_conf={"notify_appservices_from_worker": WORKER_PLACEHOLDER_NAME},
+        worker_extra_conf="",
+    ),
+    "federation_sender": WorkerTemplate(
+        listener_resources=[],
+        endpoint_patterns=[],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "synchrotron": WorkerTemplate(
+        listener_resources=["client"],
+        endpoint_patterns=[
             "^/_matrix/client/(v2_alpha|r0|v3)/sync$",
             "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$",
             "^/_matrix/client/(api/v1|r0|v3)/initialSync$",
             "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$",
         ],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "client_reader": {
-        "listener_resources": ["client"],
-        "endpoint_patterns": [
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "client_reader": WorkerTemplate(
+        listener_resources=["client"],
+        endpoint_patterns=[
             "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$",
@@ -178,12 +187,12 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
             "^/_matrix/client/(r0|v3|unstable)/capabilities$",
             "^/_matrix/client/(r0|v3|unstable)/notifications$",
         ],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "federation_reader": {
-        "listener_resources": ["federation"],
-        "endpoint_patterns": [
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "federation_reader": WorkerTemplate(
+        listener_resources=["federation"],
+        endpoint_patterns=[
             "^/_matrix/federation/(v1|v2)/event/",
             "^/_matrix/federation/(v1|v2)/state/",
             "^/_matrix/federation/(v1|v2)/state_ids/",
@@ -204,32 +213,32 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
             "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
             "^/_matrix/key/v2/query",
         ],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "federation_inbound": {
-        "listener_resources": ["federation"],
-        "endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "event_persister": {
-        "listener_resources": ["replication"],
-        "endpoint_patterns": [],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "background_worker": {
-        "listener_resources": [],
-        "endpoint_patterns": [],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "federation_inbound": WorkerTemplate(
+        listener_resources=["federation"],
+        endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "event_persister": WorkerTemplate(
+        listener_resources=["replication"],
+        endpoint_patterns=[],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "background_worker": WorkerTemplate(
+        listener_resources=[],
+        endpoint_patterns=[],
         # This worker cannot be sharded. Therefore, there should only ever be one
         # background worker. This is enforced for the safety of your database.
-        "shared_extra_conf": {"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
-        "worker_extra_conf": "",
-    },
-    "event_creator": {
-        "listener_resources": ["client"],
-        "endpoint_patterns": [
+        shared_extra_conf={"run_background_tasks_on": WORKER_PLACEHOLDER_NAME},
+        worker_extra_conf="",
+    ),
+    "event_creator": WorkerTemplate(
+        listener_resources=["client"],
+        endpoint_patterns=[
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
@@ -237,53 +246,51 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
             "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/",
             "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/",
         ],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "frontend_proxy": {
-        "listener_resources": ["client", "replication"],
-        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "account_data": {
-        "listener_resources": ["client", "replication"],
-        "endpoint_patterns": [
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "frontend_proxy": WorkerTemplate(
+        listener_resources=["client", "replication"],
+        endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "account_data": WorkerTemplate(
+        listener_resources=["client", "replication"],
+        endpoint_patterns=[
             "^/_matrix/client/(r0|v3|unstable)/.*/tags",
             "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
         ],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "presence": {
-        "listener_resources": ["client", "replication"],
-        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "receipts": {
-        "listener_resources": ["client", "replication"],
-        "endpoint_patterns": [
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "presence": WorkerTemplate(
+        listener_resources=["client", "replication"],
+        endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "receipts": WorkerTemplate(
+        listener_resources=["client", "replication"],
+        endpoint_patterns=[
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
         ],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "to_device": {
-        "listener_resources": ["client", "replication"],
-        "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
-    "typing": {
-        "listener_resources": ["client", "replication"],
-        "endpoint_patterns": [
-            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
-        ],
-        "shared_extra_conf": {},
-        "worker_extra_conf": "",
-    },
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "to_device": WorkerTemplate(
+        listener_resources=["client", "replication"],
+        endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
+    "typing": WorkerTemplate(
+        listener_resources=["client", "replication"],
+        endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"],
+        shared_extra_conf={},
+        worker_extra_conf="",
+    ),
 }
 
 # Templates for sections that may be inserted multiple times in config files
@@ -425,54 +432,59 @@ def add_worker_roles_to_shared_config(
 
 
 def merge_worker_template_configs(
-    existing_dict: Optional[Dict[str, Any]],
-    to_be_merged_dict: Dict[str, Any],
-) -> Dict[str, Any]:
+    existing_template: WorkerTemplate,
+    to_be_merged_template: WorkerTemplate,
+) -> WorkerTemplate:
     """When given an existing dict of worker template configuration consisting with both
         dicts and lists, merge new template data from WORKERS_CONFIG(or create) and
         return new dict.
 
     Args:
-        existing_dict: Either an existing worker template or a fresh blank one.
-        to_be_merged_dict: The template from WORKERS_CONFIGS to be merged into
+        existing_template: Either an existing worker template or a fresh blank one.
+        to_be_merged_template: The template from WORKERS_CONFIGS to be merged into
             existing_dict.
     Returns: The newly merged together dict values.
     """
-    new_dict: Dict[str, Any] = {}
-    if not existing_dict:
-        # It doesn't exist yet, just use the new dict(but take a copy not a reference)
-        new_dict = to_be_merged_dict.copy()
-    else:
-        for i in to_be_merged_dict.keys():
-            if (i == "endpoint_patterns") or (i == "listener_resources"):
-                # merge the two lists, remove duplicates
-                new_dict[i] = list(set(existing_dict[i] + to_be_merged_dict[i]))
-            elif i == "shared_extra_conf":
-                # merge dictionary's, the worker name will be replaced later
-                new_dict[i] = {**existing_dict[i], **to_be_merged_dict[i]}
-            elif i == "worker_extra_conf":
-                # There is only one worker type that has a 'worker_extra_conf' and it is
-                # the media_repo. Since duplicate worker types on the same worker don't
-                # work, this is fine.
-                new_dict[i] = existing_dict[i] + to_be_merged_dict[i]
-            else:
-                # Everything else should be identical, like "app", which only works
-                # because all apps are now generic_workers.
-                new_dict[i] = to_be_merged_dict[i]
-    return new_dict
+    # copy existing_template without any replacements
+    new_template: WorkerTemplate = dataclasses.replace(existing_template)
+
+    # merge the two lists, remove duplicates
+    new_template.listener_resources = list(
+        set(new_template.listener_resources + to_be_merged_template.listener_resources)
+    )
+
+    # merge the two lists, remove duplicates
+    new_template.endpoint_patterns = list(
+        set(new_template.endpoint_patterns + to_be_merged_template.endpoint_patterns)
+    )
+
+    # merge dictionaries; the worker name will be replaced later
+    new_template.shared_extra_conf = {
+        **new_template.shared_extra_conf,
+        **to_be_merged_template.shared_extra_conf,
+    }
+
+    # There is only one worker type that has a 'worker_extra_conf' and it is
+    # the media_repo. Since duplicate worker types on the same worker don't
+    # work, this is fine.
+    new_template.worker_extra_conf = (
+        new_template.worker_extra_conf + to_be_merged_template.worker_extra_conf
+    )
+
+    return new_template
 
 
 def insert_worker_name_for_worker_config(
-    existing_dict: Dict[str, Any], worker_name: str
+    existing_template: WorkerTemplate, worker_name: str
 ) -> Dict[str, Any]:
     """Insert a given worker name into the worker's configuration dict.
 
     Args:
-        existing_dict: The worker_config dict that is imported into shared_config.
+        existing_template: The WorkerTemplate that is imported into shared_config.
         worker_name: The name of the worker to insert.
     Returns: Copy of the dict with newly inserted worker name
     """
-    dict_to_edit = existing_dict.copy()
+    dict_to_edit = dataclasses.asdict(existing_template)
     for k, v in dict_to_edit["shared_extra_conf"].items():
         # Only proceed if it's the placeholder name string
         if v == WORKER_PLACEHOLDER_NAME:
@@ -793,27 +805,27 @@ def generate_worker_files(
     # Map locations to upstreams (corresponding to worker types) in Nginx
     # but only if we use the appropriate worker type
     for worker_type in all_worker_types_in_use:
-        for endpoint_pattern in WORKERS_CONFIG[worker_type]["endpoint_patterns"]:
+        for endpoint_pattern in WORKERS_CONFIG[worker_type].endpoint_patterns:
             nginx_locations[endpoint_pattern] = f"http://{worker_type}"
 
     # For each worker type specified by the user, create config values and write it's
     # yaml config file
     for worker_name, worker_types_set in requested_worker_types.items():
         # The collected and processed data will live here.
-        worker_config: Dict[str, Any] = {}
+        worker_template: WorkerTemplate = WorkerTemplate()
 
         # Merge all worker config templates for this worker into a single config
         for worker_type in worker_types_set:
-            copy_of_template_config = WORKERS_CONFIG[worker_type].copy()
-
             # Merge worker type template configuration data. It's a combination of lists
             # and dicts, so use this helper.
-            worker_config = merge_worker_template_configs(
-                worker_config, copy_of_template_config
+            worker_template = merge_worker_template_configs(
+                worker_template, WORKERS_CONFIG[worker_type]
             )
 
         # Replace placeholder names in the config template with the actual worker name.
-        worker_config = insert_worker_name_for_worker_config(worker_config, worker_name)
+        worker_config: Dict[str, Any] = insert_worker_name_for_worker_config(
+            worker_template, worker_name
+        )
 
         worker_config.update(
             {"name": worker_name, "port": str(worker_port), "config_path": config_path}