summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2024-01-17 14:39:57 +0000
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2024-01-17 14:39:57 +0000
commit29541fd994200eebeba7d22dae9f428a2a7c2a27 (patch)
tree25f2033c6df2c83b53ea63df91fcf9a668062126
parentNewsfile (diff)
downloadsynapse-29541fd994200eebeba7d22dae9f428a2a7c2a27.tar.xz
Move `stream_writers` to their own field in the WorkerTemplate
-rwxr-xr-xdocker/configure_workers_and_start.py33
1 files changed, 14 insertions, 19 deletions
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 0dba889f96..b2a03f075a 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -103,6 +103,8 @@ class WorkerTemplate:
     shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {}
     worker_extra_conf: str = ""
 
+    stream_writers: Set[str] = field(default_factory=set)
+
     # True if and only if multiple of this worker type are allowed.
     sharding_allowed: bool = True
 
@@ -226,9 +228,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
     ),
     "event_persister": WorkerTemplate(
         listener_resources={"replication"},
-        shared_extra_conf=lambda worker_name: {
-            "stream_writers": {"events": [worker_name]}
-        },
+        stream_writers={"events"},
     ),
     "background_worker": WorkerTemplate(
         # This worker cannot be sharded. Therefore, there should only ever be one
@@ -257,17 +257,13 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
             "^/_matrix/client/(r0|v3|unstable)/.*/tags",
             "^/_matrix/client/(r0|v3|unstable)/.*/account_data",
         },
-        shared_extra_conf=lambda worker_name: {
-            "stream_writers": {"account_data": [worker_name]}
-        },
+        stream_writers={"account_data"},
         sharding_allowed=False,
     ),
     "presence": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"},
-        shared_extra_conf=lambda worker_name: {
-            "stream_writers": {"presence": [worker_name]}
-        },
+        stream_writers={"presence"},
         sharding_allowed=False,
     ),
     "receipts": WorkerTemplate(
@@ -276,25 +272,19 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = {
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt",
             "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers",
         },
-        shared_extra_conf=lambda worker_name: {
-            "stream_writers": {"receipts": [worker_name]}
-        },
+        stream_writers={"receipts"},
         sharding_allowed=False,
     ),
     "to_device": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"},
-        shared_extra_conf=lambda worker_name: {
-            "stream_writers": {"to_device": [worker_name]}
-        },
+        stream_writers={"to_device"},
         sharding_allowed=False,
     ),
     "typing": WorkerTemplate(
         listener_resources={"client", "replication"},
         endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"},
-        shared_extra_conf=lambda worker_name: {
-            "stream_writers": {"typing": [worker_name]}
-        },
+        stream_writers={"typing"},
         sharding_allowed=False,
     ),
 }
@@ -447,6 +437,8 @@ def merge_worker_template_configs(
         # (This is unused, but in principle sharding this hybrid worker type
         # would be allowed if both constituent types are shardable)
         sharding_allowed=left.sharding_allowed and right.sharding_allowed,
+        # include stream writers from both
+        stream_writers=left.stream_writers | right.stream_writers,
     )
 
 
@@ -462,7 +454,10 @@ def instantiate_worker_template(
     Returns: worker configuration dictionary
     """
     worker_config_dict = dataclasses.asdict(template)
-    worker_config_dict["shared_extra_conf"] = template.shared_extra_conf(worker_name)
+    stream_writers_dict = {
+        writer: worker_name for writer in template.stream_writers
+    }
+    worker_config_dict["shared_extra_conf"] = merged(template.shared_extra_conf(worker_name), stream_writers_dict)
     worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns)
     worker_config_dict["listener_resources"] = sorted(template.listener_resources)
     return worker_config_dict