diff options
author | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2024-01-17 14:39:57 +0000 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2024-01-17 14:39:57 +0000 |
commit | 29541fd994200eebeba7d22dae9f428a2a7c2a27 (patch) | |
tree | 25f2033c6df2c83b53ea63df91fcf9a668062126 | |
parent | Newsfile (diff) | |
download | synapse-29541fd994200eebeba7d22dae9f428a2a7c2a27.tar.xz |
Move `stream_writers` to their own field in the WorkerTemplate
-rwxr-xr-x | docker/configure_workers_and_start.py | 33 |
1 files changed, 14 insertions, 19 deletions
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 0dba889f96..b2a03f075a 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -103,6 +103,8 @@ class WorkerTemplate: shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {} worker_extra_conf: str = "" + stream_writers: Set[str] = field(default_factory=set) + # True if and only if multiple of this worker type are allowed. sharding_allowed: bool = True @@ -226,9 +228,7 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = { ), "event_persister": WorkerTemplate( listener_resources={"replication"}, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"events": [worker_name]} - }, + stream_writers={"events"}, ), "background_worker": WorkerTemplate( # This worker cannot be sharded. Therefore, there should only ever be one @@ -257,17 +257,13 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = { "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", }, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"account_data": [worker_name]} - }, + stream_writers={"account_data"}, sharding_allowed=False, ), "presence": WorkerTemplate( listener_resources={"client", "replication"}, endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"}, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"presence": [worker_name]} - }, + stream_writers={"presence"}, sharding_allowed=False, ), "receipts": WorkerTemplate( @@ -276,25 +272,19 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = { "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", }, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"receipts": [worker_name]} - }, + stream_writers={"receipts"}, sharding_allowed=False, ), "to_device": WorkerTemplate( listener_resources={"client", "replication"}, endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"}, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"to_device": [worker_name]} - }, + stream_writers={"to_device"}, sharding_allowed=False, ), "typing": WorkerTemplate( listener_resources={"client", "replication"}, endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"}, - shared_extra_conf=lambda worker_name: { - "stream_writers": {"typing": [worker_name]} - }, + stream_writers={"typing"}, sharding_allowed=False, ), } @@ -447,6 +437,8 @@ def merge_worker_template_configs( # (This is unused, but in principle sharding this hybrid worker type # would be allowed if both constituent types are shardable) sharding_allowed=left.sharding_allowed and right.sharding_allowed, + # include stream writers from both + stream_writers=left.stream_writers | right.stream_writers, ) @@ -462,7 +454,10 @@ def instantiate_worker_template( Returns: worker configuration dictionary """ worker_config_dict = dataclasses.asdict(template) - worker_config_dict["shared_extra_conf"] = template.shared_extra_conf(worker_name) + stream_writers_dict = { + writer: worker_name for writer in template.stream_writers + } + worker_config_dict["shared_extra_conf"] = merged(template.shared_extra_conf(worker_name), stream_writers_dict) worker_config_dict["endpoint_patterns"] = sorted(template.endpoint_patterns) worker_config_dict["listener_resources"] = sorted(template.listener_resources) return worker_config_dict |