summary refs log tree commit diff
path: root/docker
diff options
context:
space:
mode:
authorrealtyem <realtyem@gmail.com>2022-11-08 07:14:00 -0600
committerGitHub <noreply@github.com>2022-11-08 13:14:00 +0000
commitd85cba1aa0a2f6dbb988b81d331b5ba9487fe1ac (patch)
treeb2bd58254c582936cde2dc7c92a7e6866abd8b68 /docker
parentMerge branch 'master' into develop (diff)
downloadsynapse-d85cba1aa0a2f6dbb988b81d331b5ba9487fe1ac.tar.xz
Add all Stream Writer worker types to configure_workers_and_start.py (#14197)
Co-authored-by: reivilibre <oliverw@matrix.org>
Diffstat (limited to 'docker')
-rwxr-xr-xdocker/configure_workers_and_start.py76
1 files changed, 69 insertions, 7 deletions
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 1ea456b2f8..da259129d1 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -50,7 +50,12 @@ from jinja2 import Environment, FileSystemLoader
 
 MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
 
-
+# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
+# Watching /_matrix/client needs a "client" listener
+# Watching /_matrix/federation needs a "federation" listener
+# Watching /_matrix/media and related needs a "media" listener
+# Stream Writers require "client" and "replication" listeners because they
+#   have to attach by instance_map to the master process and have client endpoints.
 WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
     "pusher": {
         "app": "synapse.app.pusher",
@@ -209,6 +214,49 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
             % (MAIN_PROCESS_HTTP_LISTENER_PORT,)
         ),
     },
+    "account_data": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": [
+            "^/_matrix/client/(r0|v3|unstable)/.*/tags",
+            "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
+        ],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "presence": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "receipts": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": [
+            "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
+            "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
+        ],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "to_device": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "typing": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": [
+            "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
+        ],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
 }
 
 # Templates for sections that may be inserted multiple times in config files
@@ -271,7 +319,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
         outfile.write(rendered)
 
 
-def add_sharding_to_shared_config(
+def add_worker_roles_to_shared_config(
     shared_config: dict,
     worker_type: str,
     worker_name: str,
@@ -309,6 +357,20 @@ def add_sharding_to_shared_config(
             "port": worker_port,
         }
 
+    elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
+        # Update the list of stream writers
+        # It's convienent that the name of the worker type is the same as the event stream
+        shared_config.setdefault("stream_writers", {}).setdefault(
+            worker_type, []
+        ).append(worker_name)
+
+        # Map of stream writer instance names to host/ports combos
+        # For now, all stream writers need http replication ports
+        instance_map[worker_name] = {
+            "host": "localhost",
+            "port": worker_port,
+        }
+
     elif worker_type == "media_repository":
         # The first configured media worker will run the media background jobs
         shared_config.setdefault("media_instance_running_background_jobs", worker_name)
@@ -441,11 +503,11 @@ def generate_worker_files(
 
         # Check if more than one instance of this worker type has been specified
         worker_type_total_count = worker_types.count(worker_type)
-        if worker_type_total_count > 1:
-            # Update the shared config with sharding-related options if necessary
-            add_sharding_to_shared_config(
-                shared_config, worker_type, worker_name, worker_port
-            )
+
+        # Update the shared config with sharding-related options if necessary
+        add_worker_roles_to_shared_config(
+            shared_config, worker_type, worker_name, worker_port
+        )
 
         # Enable the worker in supervisord
         worker_descriptors.append(worker_config)