diff options
-rwxr-xr-x | docker/configure_workers_and_start.py | 90 |
1 files changed, 44 insertions, 46 deletions
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index b6428601cc..47292eecc1 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -90,8 +90,8 @@ MAIN_PROCESS_UNIX_SOCKET_PRIVATE_PATH = "/run/main_private.sock" # have to attach by instance_map to the master process and have client endpoints. @dataclass class WorkerTemplate: - listener_resources: List[str] = field(default_factory=list) - endpoint_patterns: List[str] = field(default_factory=list) + listener_resources: Set[str] = field(default_factory=set) + endpoint_patterns: Set[str] = field(default_factory=set) # (worker_name) -> {} shared_extra_conf: Callable[[str], Dict[str, Any]] = lambda _worker_name: {} worker_extra_conf: str = "" @@ -100,24 +100,24 @@ class WorkerTemplate: WORKERS_CONFIG: Dict[str, WorkerTemplate] = { "pusher": WorkerTemplate(), "user_dir": WorkerTemplate( - listener_resources=["client"], - endpoint_patterns=[ + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(api/v1|r0|v3|unstable)/user_directory/search$" - ], + }, shared_extra_conf=lambda worker_name: { "update_user_directory_from_worker": worker_name }, ), "media_repository": WorkerTemplate( - listener_resources=["media"], - endpoint_patterns=[ + listener_resources={"media"}, + endpoint_patterns={ "^/_matrix/media/", "^/_synapse/admin/v1/purge_media_cache$", "^/_synapse/admin/v1/room/.*/media.*$", "^/_synapse/admin/v1/user/.*/media.*$", "^/_synapse/admin/v1/media/.*$", "^/_synapse/admin/v1/quarantine_media/.*$", - ], + }, # The first configured media worker will run the media background jobs shared_extra_conf=lambda worker_name: { "enable_media_repo": False, @@ -132,17 +132,17 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = { ), "federation_sender": WorkerTemplate(), "synchrotron": WorkerTemplate( - listener_resources=["client"], - endpoint_patterns=[ + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(v2_alpha|r0|v3)/sync$", "^/_matrix/client/(api/v1|v2_alpha|r0|v3)/events$", "^/_matrix/client/(api/v1|r0|v3)/initialSync$", "^/_matrix/client/(api/v1|r0|v3)/rooms/[^/]+/initialSync$", - ], + }, ), "client_reader": WorkerTemplate( - listener_resources=["client"], - endpoint_patterns=[ + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(api/v1|r0|v3|unstable)/publicRooms$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/joined_members$", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/context/.*$", @@ -170,11 +170,11 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = { "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$", "^/_matrix/client/(r0|v3|unstable)/capabilities$", "^/_matrix/client/(r0|v3|unstable)/notifications$", - ], + }, ), "federation_reader": WorkerTemplate( - listener_resources=["federation"], - endpoint_patterns=[ + listener_resources={"federation"}, + endpoint_patterns={ "^/_matrix/federation/(v1|v2)/event/", "^/_matrix/federation/(v1|v2)/state/", "^/_matrix/federation/(v1|v2)/state_ids/", @@ -194,14 +194,14 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = { "^/_matrix/federation/(v1|v2)/user/devices/", "^/_matrix/federation/(v1|v2)/get_groups_publicised$", "^/_matrix/key/v2/query", - ], + }, ), "federation_inbound": WorkerTemplate( - listener_resources=["federation"], - endpoint_patterns=["/_matrix/federation/(v1|v2)/send/"], + listener_resources={"federation"}, + endpoint_patterns={"/_matrix/federation/(v1|v2)/send/"}, ), "event_persister": WorkerTemplate( - listener_resources=["replication"], + listener_resources={"replication"}, ), "background_worker": WorkerTemplate( # This worker cannot be sharded. Therefore, there should only ever be one @@ -209,45 +209,45 @@ WORKERS_CONFIG: Dict[str, WorkerTemplate] = { shared_extra_conf=lambda worker_name: {"run_background_tasks_on": worker_name}, ), "event_creator": WorkerTemplate( - listener_resources=["client"], - endpoint_patterns=[ + listener_resources={"client"}, + endpoint_patterns={ "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/redact", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/send", "^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$", "^/_matrix/client/(api/v1|r0|v3|unstable)/join/", "^/_matrix/client/(api/v1|r0|v3|unstable)/knock/", "^/_matrix/client/(api/v1|r0|v3|unstable)/profile/", - ], + }, ), "frontend_proxy": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"], + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/keys/upload"}, ), "account_data": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=[ + listener_resources={"client", "replication"}, + endpoint_patterns={ "^/_matrix/client/(r0|v3|unstable)/.*/tags", "^/_matrix/client/(r0|v3|unstable)/.*/account_data", - ], + }, ), "presence": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"], + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/presence/"}, ), "receipts": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=[ + listener_resources={"client", "replication"}, + endpoint_patterns={ "^/_matrix/client/(r0|v3|unstable)/rooms/.*/receipt", "^/_matrix/client/(r0|v3|unstable)/rooms/.*/read_markers", - ], + }, ), "to_device": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=["^/_matrix/client/(r0|v3|unstable)/sendToDevice/"], + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(r0|v3|unstable)/sendToDevice/"}, ), "typing": WorkerTemplate( - listener_resources=["client", "replication"], - endpoint_patterns=["^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"], + listener_resources={"client", "replication"}, + endpoint_patterns={"^/_matrix/client/(api/v1|r0|v3|unstable)/rooms/.*/typing"}, ), } @@ -406,15 +406,11 @@ def merge_worker_template_configs( # copy existing_template without any replacements new_template: WorkerTemplate = dataclasses.replace(existing_template) - # merge the two lists, remove duplicates - new_template.listener_resources = list( - set(new_template.listener_resources + to_be_merged_template.listener_resources) - ) + # add listener resources from the other set + new_template.listener_resources |= to_be_merged_template.listener_resources - # merge the two lists, remove duplicates - new_template.endpoint_patterns = list( - set(new_template.endpoint_patterns + to_be_merged_template.endpoint_patterns) - ) + # add endpoint patterns from the other set + new_template.endpoint_patterns |= to_be_merged_template.endpoint_patterns # merge dictionaries; the worker name will be replaced later new_template.shared_extra_conf = lambda worker_name: { @@ -444,6 +440,8 @@ def insert_worker_name_for_worker_config( """ dict_to_edit = dataclasses.asdict(existing_template) dict_to_edit["shared_extra_conf"] = existing_template.shared_extra_conf(worker_name) + dict_to_edit["endpoint_patterns"] = sorted(existing_template.endpoint_patterns) + dict_to_edit["listener_resources"] = sorted(existing_template.listener_resources) return dict_to_edit @@ -760,7 +758,7 @@ def generate_worker_files( # Map locations to upstreams (corresponding to worker types) in Nginx # but only if we use the appropriate worker type for worker_type in all_worker_types_in_use: - for endpoint_pattern in WORKERS_CONFIG[worker_type].endpoint_patterns: + for endpoint_pattern in sorted(WORKERS_CONFIG[worker_type].endpoint_patterns): nginx_locations[endpoint_pattern] = f"http://{worker_type}" # For each worker type specified by the user, create config values and write it's |