summary refs log tree commit diff
path: root/synapse/config/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/config/workers.py')
-rw-r--r--synapse/config/workers.py139
1 files changed, 70 insertions, 69 deletions
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 913b83e174..2580660b6c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -29,20 +29,6 @@ from ._base import (
 )
 from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def
 
-_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
-The send_federation config option must be disabled in the main
-synapse process before they can be run in a separate worker.
-
-Please add ``send_federation: false`` to the main config
-"""
-
-_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
-The start_pushers config option must be disabled in the main
-synapse process before they can be run in a separate worker.
-
-Please add ``start_pushers: false`` to the main config
-"""
-
 _DEPRECATED_WORKER_DUTY_OPTION_USED = """
 The '%s' configuration option is deprecated and will be removed in a future
 Synapse version. Please use ``%s: name_of_worker`` instead.
@@ -182,40 +168,12 @@ class WorkerConfig(Config):
                 )
             )
 
-        # Handle federation sender configuration.
-        #
-        # There are two ways of configuring which instances handle federation
-        # sending:
-        #   1. The old way where "send_federation" is set to false and running a
-        #      `synapse.app.federation_sender` worker app.
-        #   2. Specifying the workers sending federation in
-        #      `federation_sender_instances`.
-        #
-
-        send_federation = config.get("send_federation", True)
-
-        federation_sender_instances = config.get("federation_sender_instances")
-        if federation_sender_instances is None:
-            # Default to an empty list, which means "another, unknown, worker is
-            # responsible for it".
-            federation_sender_instances = []
-
-            # If no federation sender instances are set we check if
-            # `send_federation` is set, which means use master
-            if send_federation:
-                federation_sender_instances = ["master"]
-
-            if self.worker_app == "synapse.app.federation_sender":
-                if send_federation:
-                    # If we're running federation senders, and not using
-                    # `federation_sender_instances`, then we should have
-                    # explicitly set `send_federation` to false.
-                    raise ConfigError(
-                        _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
-                    )
-
-                federation_sender_instances = [self.worker_name]
-
+        federation_sender_instances = self._worker_names_performing_this_duty(
+            config,
+            "send_federation",
+            "synapse.app.federation_sender",
+            "federation_sender_instances",
+        )
         self.send_federation = self.instance_name in federation_sender_instances
         self.federation_shard_config = ShardedWorkerHandlingConfig(
             federation_sender_instances
@@ -282,27 +240,12 @@ class WorkerConfig(Config):
         )
 
         # Handle sharded push
-        start_pushers = config.get("start_pushers", True)
-        pusher_instances = config.get("pusher_instances")
-        if pusher_instances is None:
-            # Default to an empty list, which means "another, unknown, worker is
-            # responsible for it".
-            pusher_instances = []
-
-            # If no pushers instances are set we check if `start_pushers` is
-            # set, which means use master
-            if start_pushers:
-                pusher_instances = ["master"]
-
-            if self.worker_app == "synapse.app.pusher":
-                if start_pushers:
-                    # If we're running pushers, and not using
-                    # `pusher_instances`, then we should have explicitly set
-                    # `start_pushers` to false.
-                    raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
-
-                pusher_instances = [self.instance_name]
-
+        pusher_instances = self._worker_names_performing_this_duty(
+            config,
+            "start_pushers",
+            "synapse.app.pusher",
+            "pusher_instances",
+        )
         self.start_pushers = self.instance_name in pusher_instances
         self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
 
@@ -425,6 +368,64 @@ class WorkerConfig(Config):
         # (By this point, these are either the same value or only one is not None.)
         return bool(new_option_should_run_here or legacy_option_should_run_here)
 
+    def _worker_names_performing_this_duty(
+        self,
+        config: Dict[str, Any],
+        legacy_option_name: str,
+        legacy_app_name: str,
+        modern_instance_list_name: str,
+    ) -> List[str]:
+        """
+        Retrieves the names of the workers handling a given duty, by either legacy
+        option or instance list.
+
+        There are two ways of configuring which instances handle a given duty, e.g.
+        for configuring pushers:
+
+        1. The old way where "start_pushers" is set to false and running a
+          `synapse.app.pusher'` worker app.
+        2. Specifying the workers sending federation in `pusher_instances`.
+
+        Args:
+            config: settings read from yaml.
+            legacy_option_name: the old way of enabling options. e.g. 'start_pushers'
+            legacy_app_name: The historical app name. e.g. 'synapse.app.pusher'
+            modern_instance_list_name: the string name of the new instance_list. e.g.
+            'pusher_instances'
+
+        Returns:
+            A list of worker instance names handling the given duty.
+        """
+
+        legacy_option = config.get(legacy_option_name, True)
+
+        worker_instances = config.get(modern_instance_list_name)
+        if worker_instances is None:
+            # Default to an empty list, which means "another, unknown, worker is
+            # responsible for it".
+            worker_instances = []
+
+            # If no worker instances are set we check if the legacy option
+            # is set, which means use the main process.
+            if legacy_option:
+                worker_instances = ["master"]
+
+            if self.worker_app == legacy_app_name:
+                if legacy_option:
+                    # If we're using `legacy_app_name`, and not using
+                    # `modern_instance_list_name`, then we should have
+                    # explicitly set `legacy_option_name` to false.
+                    raise ConfigError(
+                        f"The '{legacy_option_name}' config option must be disabled in "
+                        "the main synapse process before they can be run in a separate "
+                        "worker.\n"
+                        f"Please add `{legacy_option_name}: false` to the main config.\n",
+                    )
+
+                worker_instances = [self.worker_name]
+
+        return worker_instances
+
     def read_arguments(self, args: argparse.Namespace) -> None:
         # We support a bunch of command line arguments that override options in
         # the config. A lot of these options have a worker_* prefix when running