diff --git a/changelog.d/14197.docker b/changelog.d/14197.docker
new file mode 100644
index 0000000000..529ccd99c5
--- /dev/null
+++ b/changelog.d/14197.docker
@@ -0,0 +1 @@
+Add all Stream Writer worker types to configure_workers_and_start.py.
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 1ea456b2f8..da259129d1 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -50,7 +50,12 @@ from jinja2 import Environment, FileSystemLoader
MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
-
+# Workers with exposed endpoints needs either "client", "federation", or "media" listener_resources
+# Watching /_matrix/client needs a "client" listener
+# Watching /_matrix/federation needs a "federation" listener
+# Watching /_matrix/media and related needs a "media" listener
+# Stream Writers require "client" and "replication" listeners because they
+# have to attach by instance_map to the master process and have client endpoints.
WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
"pusher": {
"app": "synapse.app.pusher",
@@ -209,6 +214,49 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
% (MAIN_PROCESS_HTTP_LISTENER_PORT,)
),
},
+ "account_data": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": [
+ "^/_matrix/client/(r0|v3|unstable)/.*/tags",
+ "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
+ ],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
+ "presence": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
+ "receipts": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": [
+ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
+ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
+ ],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
+ "to_device": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": ["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
+ "typing": {
+ "app": "synapse.app.generic_worker",
+ "listener_resources": ["client", "replication"],
+ "endpoint_patterns": [
+ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"
+ ],
+ "shared_extra_conf": {},
+ "worker_extra_conf": "",
+ },
}
# Templates for sections that may be inserted multiple times in config files
@@ -271,7 +319,7 @@ def convert(src: str, dst: str, **template_vars: object) -> None:
outfile.write(rendered)
-def add_sharding_to_shared_config(
+def add_worker_roles_to_shared_config(
shared_config: dict,
worker_type: str,
worker_name: str,
@@ -309,6 +357,20 @@ def add_sharding_to_shared_config(
"port": worker_port,
}
+ elif worker_type in ["account_data", "presence", "receipts", "to_device", "typing"]:
+ # Update the list of stream writers
+ # It's convienent that the name of the worker type is the same as the event stream
+ shared_config.setdefault("stream_writers", {}).setdefault(
+ worker_type, []
+ ).append(worker_name)
+
+ # Map of stream writer instance names to host/ports combos
+ # For now, all stream writers need http replication ports
+ instance_map[worker_name] = {
+ "host": "localhost",
+ "port": worker_port,
+ }
+
elif worker_type == "media_repository":
# The first configured media worker will run the media background jobs
shared_config.setdefault("media_instance_running_background_jobs", worker_name)
@@ -441,11 +503,11 @@ def generate_worker_files(
# Check if more than one instance of this worker type has been specified
worker_type_total_count = worker_types.count(worker_type)
- if worker_type_total_count > 1:
- # Update the shared config with sharding-related options if necessary
- add_sharding_to_shared_config(
- shared_config, worker_type, worker_name, worker_port
- )
+
+ # Update the shared config with sharding-related options if necessary
+ add_worker_roles_to_shared_config(
+ shared_config, worker_type, worker_name, worker_port
+ )
# Enable the worker in supervisord
worker_descriptors.append(worker_config)
|