diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index f2716422b5..2580660b6c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -27,21 +27,7 @@ from ._base import (
RoutableShardedWorkerHandlingConfig,
ShardedWorkerHandlingConfig,
)
-from .server import ListenerConfig, parse_listener_def
-
-_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
-The send_federation config option must be disabled in the main
-synapse process before they can be run in a separate worker.
-
-Please add ``send_federation: false`` to the main config
-"""
-
-_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
-The start_pushers config option must be disabled in the main
-synapse process before they can be run in a separate worker.
-
-Please add ``start_pushers: false`` to the main config
-"""
+from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def
_DEPRECATED_WORKER_DUTY_OPTION_USED = """
The '%s' configuration option is deprecated and will be removed in a future
@@ -67,6 +53,7 @@ class InstanceLocationConfig:
host: str
port: int
+ tls: bool = False
@attr.s
@@ -128,7 +115,8 @@ class WorkerConfig(Config):
self.worker_app = None
self.worker_listeners = [
- parse_listener_def(x) for x in config.get("worker_listeners", [])
+ parse_listener_def(i, x)
+ for i, x in enumerate(config.get("worker_listeners", []))
]
self.worker_daemonize = bool(config.get("worker_daemonize"))
self.worker_pid_file = config.get("worker_pid_file")
@@ -142,18 +130,31 @@ class WorkerConfig(Config):
self.worker_replication_host = config.get("worker_replication_host", None)
# The port on the main synapse for TCP replication
- self.worker_replication_port = config.get("worker_replication_port", None)
+ if "worker_replication_port" in config:
+ raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",))
# The port on the main synapse for HTTP replication endpoint
self.worker_replication_http_port = config.get("worker_replication_http_port")
+ # The tls mode on the main synapse for HTTP replication endpoint.
+ # For backward compatibility this defaults to False.
+ self.worker_replication_http_tls = config.get(
+ "worker_replication_http_tls", False
+ )
+
# The shared secret used for authentication when connecting to the main synapse.
self.worker_replication_secret = config.get("worker_replication_secret", None)
self.worker_name = config.get("worker_name", self.worker_app)
self.instance_name = self.worker_name or "master"
+ # FIXME: Remove this check after a suitable amount of time.
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
+ if self.worker_main_http_uri is not None:
+ logger.warning(
+ "The config option worker_main_http_uri is unused since Synapse 1.73. "
+ "It can be safely removed from your configuration."
+ )
# This option is really only here to support `--manhole` command line
# argument.
@@ -167,40 +168,12 @@ class WorkerConfig(Config):
)
)
- # Handle federation sender configuration.
- #
- # There are two ways of configuring which instances handle federation
- # sending:
- # 1. The old way where "send_federation" is set to false and running a
- # `synapse.app.federation_sender` worker app.
- # 2. Specifying the workers sending federation in
- # `federation_sender_instances`.
- #
-
- send_federation = config.get("send_federation", True)
-
- federation_sender_instances = config.get("federation_sender_instances")
- if federation_sender_instances is None:
- # Default to an empty list, which means "another, unknown, worker is
- # responsible for it".
- federation_sender_instances = []
-
- # If no federation sender instances are set we check if
- # `send_federation` is set, which means use master
- if send_federation:
- federation_sender_instances = ["master"]
-
- if self.worker_app == "synapse.app.federation_sender":
- if send_federation:
- # If we're running federation senders, and not using
- # `federation_sender_instances`, then we should have
- # explicitly set `send_federation` to false.
- raise ConfigError(
- _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
- )
-
- federation_sender_instances = [self.worker_name]
-
+ federation_sender_instances = self._worker_names_performing_this_duty(
+ config,
+ "send_federation",
+ "synapse.app.federation_sender",
+ "federation_sender_instances",
+ )
self.send_federation = self.instance_name in federation_sender_instances
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
@@ -267,27 +240,12 @@ class WorkerConfig(Config):
)
# Handle sharded push
- start_pushers = config.get("start_pushers", True)
- pusher_instances = config.get("pusher_instances")
- if pusher_instances is None:
- # Default to an empty list, which means "another, unknown, worker is
- # responsible for it".
- pusher_instances = []
-
- # If no pushers instances are set we check if `start_pushers` is
- # set, which means use master
- if start_pushers:
- pusher_instances = ["master"]
-
- if self.worker_app == "synapse.app.pusher":
- if start_pushers:
- # If we're running pushers, and not using
- # `pusher_instances`, then we should have explicitly set
- # `start_pushers` to false.
- raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
-
- pusher_instances = [self.instance_name]
-
+ pusher_instances = self._worker_names_performing_this_duty(
+ config,
+ "start_pushers",
+ "synapse.app.pusher",
+ "pusher_instances",
+ )
self.start_pushers = self.instance_name in pusher_instances
self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
@@ -410,6 +368,64 @@ class WorkerConfig(Config):
# (By this point, these are either the same value or only one is not None.)
return bool(new_option_should_run_here or legacy_option_should_run_here)
+ def _worker_names_performing_this_duty(
+ self,
+ config: Dict[str, Any],
+ legacy_option_name: str,
+ legacy_app_name: str,
+ modern_instance_list_name: str,
+ ) -> List[str]:
+ """
+ Retrieves the names of the workers handling a given duty, by either legacy
+ option or instance list.
+
+ There are two ways of configuring which instances handle a given duty, e.g.
+ for configuring pushers:
+
+ 1. The old way where "start_pushers" is set to false and running a
+ `synapse.app.pusher'` worker app.
+ 2. Specifying the workers sending federation in `pusher_instances`.
+
+ Args:
+ config: settings read from yaml.
+ legacy_option_name: the old way of enabling options. e.g. 'start_pushers'
+ legacy_app_name: The historical app name. e.g. 'synapse.app.pusher'
+ modern_instance_list_name: the string name of the new instance_list. e.g.
+ 'pusher_instances'
+
+ Returns:
+ A list of worker instance names handling the given duty.
+ """
+
+ legacy_option = config.get(legacy_option_name, True)
+
+ worker_instances = config.get(modern_instance_list_name)
+ if worker_instances is None:
+ # Default to an empty list, which means "another, unknown, worker is
+ # responsible for it".
+ worker_instances = []
+
+ # If no worker instances are set we check if the legacy option
+ # is set, which means use the main process.
+ if legacy_option:
+ worker_instances = ["master"]
+
+ if self.worker_app == legacy_app_name:
+ if legacy_option:
+ # If we're using `legacy_app_name`, and not using
+ # `modern_instance_list_name`, then we should have
+ # explicitly set `legacy_option_name` to false.
+ raise ConfigError(
+ f"The '{legacy_option_name}' config option must be disabled in "
+ "the main synapse process before they can be run in a separate "
+ "worker.\n"
+ f"Please add `{legacy_option_name}: false` to the main config.\n",
+ )
+
+ worker_instances = [self.worker_name]
+
+ return worker_instances
+
def read_arguments(self, args: argparse.Namespace) -> None:
# We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running
|