diff options
Diffstat (limited to 'synapse/config/workers.py')
-rw-r--r-- | synapse/config/workers.py | 93 |
1 files changed, 86 insertions, 7 deletions
diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 7a0ca16da8..ac92375a85 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -17,9 +17,28 @@ from typing import List, Union import attr -from ._base import Config, ConfigError, ShardedWorkerHandlingConfig +from ._base import ( + Config, + ConfigError, + RoutableShardedWorkerHandlingConfig, + ShardedWorkerHandlingConfig, +) from .server import ListenerConfig, parse_listener_def +_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """ +The send_federation config option must be disabled in the main +synapse process before they can be run in a separate worker. + +Please add ``send_federation: false`` to the main config +""" + +_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """ +The start_pushers config option must be disabled in the main +synapse process before they can be run in a separate worker. + +Please add ``start_pushers: false`` to the main config +""" + def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]: """Helper for allowing parsing a string or list of strings to a config @@ -103,6 +122,7 @@ class WorkerConfig(Config): self.worker_replication_secret = config.get("worker_replication_secret", None) self.worker_name = config.get("worker_name", self.worker_app) + self.instance_name = self.worker_name or "master" self.worker_main_http_uri = config.get("worker_main_http_uri", None) @@ -118,12 +138,41 @@ class WorkerConfig(Config): ) ) - # Whether to send federation traffic out in this process. This only - # applies to some federation traffic, and so shouldn't be used to - # "disable" federation - self.send_federation = config.get("send_federation", True) + # Handle federation sender configuration. + # + # There are two ways of configuring which instances handle federation + # sending: + # 1. The old way where "send_federation" is set to false and running a + # `synapse.app.federation_sender` worker app. + # 2. Specifying the workers sending federation in + # `federation_sender_instances`. + # + + send_federation = config.get("send_federation", True) + + federation_sender_instances = config.get("federation_sender_instances") + if federation_sender_instances is None: + # Default to an empty list, which means "another, unknown, worker is + # responsible for it". + federation_sender_instances = [] - federation_sender_instances = config.get("federation_sender_instances") or [] + # If no federation sender instances are set we check if + # `send_federation` is set, which means use master + if send_federation: + federation_sender_instances = ["master"] + + if self.worker_app == "synapse.app.federation_sender": + if send_federation: + # If we're running federation senders, and not using + # `federation_sender_instances`, then we should have + # explicitly set `send_federation` to false. + raise ConfigError( + _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR + ) + + federation_sender_instances = [self.worker_name] + + self.send_federation = self.instance_name in federation_sender_instances self.federation_shard_config = ShardedWorkerHandlingConfig( federation_sender_instances ) @@ -164,7 +213,37 @@ class WorkerConfig(Config): "Must only specify one instance to handle `receipts` messages." ) - self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events) + if len(self.writers.events) == 0: + raise ConfigError("Must specify at least one instance to handle `events`.") + + self.events_shard_config = RoutableShardedWorkerHandlingConfig( + self.writers.events + ) + + # Handle sharded push + start_pushers = config.get("start_pushers", True) + pusher_instances = config.get("pusher_instances") + if pusher_instances is None: + # Default to an empty list, which means "another, unknown, worker is + # responsible for it". + pusher_instances = [] + + # If no pushers instances are set we check if `start_pushers` is + # set, which means use master + if start_pushers: + pusher_instances = ["master"] + + if self.worker_app == "synapse.app.pusher": + if start_pushers: + # If we're running pushers, and not using + # `pusher_instances`, then we should have explicitly set + # `start_pushers` to false. + raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR) + + pusher_instances = [self.instance_name] + + self.start_pushers = self.instance_name in pusher_instances + self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances) # Whether this worker should run background tasks or not. # |