summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/12452.feature1
-rwxr-xr-xdocker/configure_workers_and_start.py4
-rw-r--r--docs/upgrade.md27
-rw-r--r--docs/workers.md20
-rw-r--r--synapse/app/generic_worker.py16
-rw-r--r--synapse/config/appservice.py1
-rw-r--r--synapse/config/workers.py109
-rw-r--r--synapse/handlers/appservice.py2
-rw-r--r--tests/config/test_workers.py288
9 files changed, 447 insertions, 21 deletions
diff --git a/changelog.d/12452.feature b/changelog.d/12452.feature
new file mode 100644
index 0000000000..22f054d344
--- /dev/null
+++ b/changelog.d/12452.feature
@@ -0,0 +1 @@
+Add the `notify_appservices_from_worker` configuration option (superseding `notify_appservices`) to allow a generic worker to be designated as the worker to send traffic to Application Services.
\ No newline at end of file
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 33fc20d218..b2b7938ae8 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -69,10 +69,10 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = {
         "worker_extra_conf": "enable_media_repo: true",
     },
     "appservice": {
-        "app": "synapse.app.appservice",
+        "app": "synapse.app.generic_worker",
         "listener_resources": [],
         "endpoint_patterns": [],
-        "shared_extra_conf": {"notify_appservices": False},
+        "shared_extra_conf": {"notify_appservices_from_worker": "appservice"},
         "worker_extra_conf": "",
     },
     "federation_sender": {
diff --git a/docs/upgrade.md b/docs/upgrade.md
index b40cac86f0..18c33a4198 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -100,6 +100,32 @@ To re-enable this functionality, set the
 [`allow_device_name_lookup_over_federation`](https://matrix-org.github.io/synapse/v1.59/usage/configuration/config_documentation.html#federation)
 homeserver config option to `true`.
 
+
+## Deprecation of the `synapse.app.appservice` worker application type
+
+The `synapse.app.appservice` worker application type allowed you to configure a
+single worker to use to notify application services of new events, as long
+as this functionality was disabled on the main process with `notify_appservices: False`.
+
+To unify Synapse's worker types, the `synapse.app.appservice` worker application
+type and the `notify_appservices` configuration option have been deprecated.
+
+To get the same functionality, it's now recommended that the `synapse.app.generic_worker`
+worker application type is used and that the `notify_appservices_from_worker` option
+is set to the name of a worker.
+
+For the time being, `notify_appservices_from_worker` can be used alongside
+`synapse.app.appservice` and `notify_appservices` to make it easier to transition
+between the two configurations, however please note that:
+
+- the options must not contradict each other (otherwise Synapse won't start); and
+- the `notify_appservices` option will be removed in a future release of Synapse.
+
+Please see [the relevant section of the worker documentation][v1_59_notify_ases_from] for more information.
+
+[v1_59_notify_ases_from]: workers.md#notifying-application-services
+
+
 # Upgrading to v1.58.0
 
 ## Groups/communities feature has been disabled by default
@@ -107,6 +133,7 @@ homeserver config option to `true`.
 The non-standard groups/communities feature in Synapse has been disabled by default
 and will be removed in Synapse v1.61.0.
 
+
 # Upgrading to v1.57.0
 
 ## Changes to database schema for application services
diff --git a/docs/workers.md b/docs/workers.md
index afdcd785e4..1d049b6c4f 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -435,6 +435,23 @@ An example for a dedicated background worker instance:
 {{#include systemd-with-workers/workers/background_worker.yaml}}
 ```
 
+#### Notifying Application Services
+
+You can designate one worker to send output traffic to Application Services.
+
+Specify its name in the shared configuration as follows:
+
+```yaml
+notify_appservices_from_worker: worker_name
+```
+
+This work cannot be load-balanced; please ensure the main process is restarted
+after setting this option in the shared configuration!
+
+This style of configuration supersedes the legacy `synapse.app.appservice`
+worker application type.
+
+
 ### `synapse.app.pusher`
 
 Handles sending push notifications to sygnal and email. Doesn't handle any
@@ -453,6 +470,9 @@ pusher_instances:
 
 ### `synapse.app.appservice`
 
+**Deprecated as of Synapse v1.58.** [Use `synapse.app.generic_worker` with the
+`notify_appservices_from_worker` option instead.](#notifying-application-services)
+
 Handles sending output traffic to Application Services. Doesn't handle any
 REST endpoints itself, but you should set `notify_appservices: False` in the
 shared configuration file to stop the main synapse sending appservice notifications.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 1865c671f4..07dddc0b13 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -441,22 +441,6 @@ def start(config_options: List[str]) -> None:
         "synapse.app.user_dir",
     )
 
-    if config.worker.worker_app == "synapse.app.appservice":
-        if config.appservice.notify_appservices:
-            sys.stderr.write(
-                "\nThe appservices must be disabled in the main synapse process"
-                "\nbefore they can be run in a separate worker."
-                "\nPlease add ``notify_appservices: false`` to the main config"
-                "\n"
-            )
-            sys.exit(1)
-
-        # Force the appservice to start since they will be disabled in the main config
-        config.appservice.notify_appservices = True
-    else:
-        # For other worker types we force this to off.
-        config.appservice.notify_appservices = False
-
     if config.worker.worker_app == "synapse.app.user_dir":
         if config.server.update_user_directory:
             sys.stderr.write(
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index 720b90a283..b13cc6bb6e 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -33,7 +33,6 @@ class AppServiceConfig(Config):
 
     def read_config(self, config: JsonDict, **kwargs: Any) -> None:
         self.app_service_config_files = config.get("app_service_config_files", [])
-        self.notify_appservices = config.get("notify_appservices", True)
         self.track_appservice_user_ips = config.get("track_appservice_user_ips", False)
 
     def generate_config_section(cls, **kwargs: Any) -> str:
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index a5479dfca9..a9dbcc6d3d 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 
 import argparse
-from typing import Any, List, Union
+import logging
+from typing import Any, Dict, List, Union
 
 import attr
 
@@ -42,6 +43,13 @@ synapse process before they can be run in a separate worker.
 Please add ``start_pushers: false`` to the main config
 """
 
+_DEPRECATED_WORKER_DUTY_OPTION_USED = """
+The '%s' configuration option is deprecated and will be removed in a future
+Synapse version. Please use ``%s: name_of_worker`` instead.
+"""
+
+logger = logging.getLogger(__name__)
+
 
 def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
     """Helper for allowing parsing a string or list of strings to a config
@@ -296,6 +304,105 @@ class WorkerConfig(Config):
             self.worker_name is None and background_tasks_instance == "master"
         ) or self.worker_name == background_tasks_instance
 
+        self.should_notify_appservices = self._should_this_worker_perform_duty(
+            config,
+            legacy_master_option_name="notify_appservices",
+            legacy_worker_app_name="synapse.app.appservice",
+            new_option_name="notify_appservices_from_worker",
+        )
+
+    def _should_this_worker_perform_duty(
+        self,
+        config: Dict[str, Any],
+        legacy_master_option_name: str,
+        legacy_worker_app_name: str,
+        new_option_name: str,
+    ) -> bool:
+        """
+        Figures out whether this worker should perform a certain duty.
+
+        This function is temporary and is only to deal with the complexity
+        of allowing old, transitional and new configurations all at once.
+
+        Contradictions between the legacy and new part of a transitional configuration
+        will lead to a ConfigError.
+
+        Parameters:
+            config: The config dictionary
+            legacy_master_option_name: The name of a legacy option, whose value is boolean,
+                specifying whether it's the master that should handle a certain duty.
+                e.g. "notify_appservices"
+            legacy_worker_app_name: The name of a legacy Synapse worker application
+                that would traditionally perform this duty.
+                e.g. "synapse.app.appservice"
+            new_option_name: The name of the new option, whose value is the name of a
+                designated worker to perform the duty.
+                e.g. "notify_appservices_from_worker"
+        """
+
+        # None means 'unspecified'; True means 'run here' and False means
+        # 'don't run here'.
+        new_option_should_run_here = None
+        if new_option_name in config:
+            designated_worker = config[new_option_name] or "master"
+            new_option_should_run_here = (
+                designated_worker == "master" and self.worker_name is None
+            ) or designated_worker == self.worker_name
+
+        legacy_option_should_run_here = None
+        if legacy_master_option_name in config:
+            run_on_master = bool(config[legacy_master_option_name])
+
+            legacy_option_should_run_here = (
+                self.worker_name is None and run_on_master
+            ) or (self.worker_app == legacy_worker_app_name and not run_on_master)
+
+            # Suggest using the new option instead.
+            logger.warning(
+                _DEPRECATED_WORKER_DUTY_OPTION_USED,
+                legacy_master_option_name,
+                new_option_name,
+            )
+
+        if self.worker_app == legacy_worker_app_name and config.get(
+            legacy_master_option_name, True
+        ):
+            # As an extra bit of complication, we need to check that the
+            # specialised worker is only used if the legacy config says the
+            # master isn't performing the duties.
+            raise ConfigError(
+                f"Cannot use deprecated worker app type '{legacy_worker_app_name}' whilst deprecated option '{legacy_master_option_name}' is not set to false.\n"
+                f"Consider setting `worker_app: synapse.app.generic_worker` and using the '{new_option_name}' option instead.\n"
+                f"The '{new_option_name}' option replaces '{legacy_master_option_name}'."
+            )
+
+        if new_option_should_run_here is None and legacy_option_should_run_here is None:
+            # Neither option specified; the fallback behaviour is to run on the main process
+            return self.worker_name is None
+
+        if (
+            new_option_should_run_here is not None
+            and legacy_option_should_run_here is not None
+        ):
+            # Both options specified; ensure they match!
+            if new_option_should_run_here != legacy_option_should_run_here:
+                update_worker_type = (
+                    " and set worker_app: synapse.app.generic_worker"
+                    if self.worker_app == legacy_worker_app_name
+                    else ""
+                )
+                # If the values conflict, we suggest the admin removes the legacy option
+                # for simplicity.
+                raise ConfigError(
+                    f"Conflicting configuration options: {legacy_master_option_name} (legacy), {new_option_name} (new).\n"
+                    f"Suggestion: remove {legacy_master_option_name}{update_worker_type}.\n"
+                )
+
+        # We've already validated that these aren't conflicting; now just see if
+        # either is True.
+        # (By this point, these are either the same value or only one is not None.)
+        return bool(new_option_should_run_here or legacy_option_should_run_here)
+
     def generate_config_section(self, **kwargs: Any) -> str:
         return """\
         ## Workers ##
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index b3894666cc..85bd5e4768 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -59,7 +59,7 @@ class ApplicationServicesHandler:
         self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
         self.clock = hs.get_clock()
-        self.notify_appservices = hs.config.appservice.notify_appservices
+        self.notify_appservices = hs.config.worker.should_notify_appservices
         self.event_sources = hs.get_event_sources()
         self._msc2409_to_device_messages_enabled = (
             hs.config.experimental.msc2409_to_device_messages_enabled
diff --git a/tests/config/test_workers.py b/tests/config/test_workers.py
new file mode 100644
index 0000000000..da81bb9655
--- /dev/null
+++ b/tests/config/test_workers.py
@@ -0,0 +1,288 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import Any, Mapping, Optional
+from unittest.mock import Mock
+
+from frozendict import frozendict
+
+from synapse.config import ConfigError
+from synapse.config.workers import WorkerConfig
+
+from tests.unittest import TestCase
+
+_EMPTY_FROZENDICT: Mapping[str, Any] = frozendict()
+
+
+class WorkerDutyConfigTestCase(TestCase):
+    def _make_worker_config(
+        self,
+        worker_app: str,
+        worker_name: Optional[str],
+        extras: Mapping[str, Any] = _EMPTY_FROZENDICT,
+    ) -> WorkerConfig:
+        root_config = Mock()
+        root_config.worker_app = worker_app
+        root_config.worker_name = worker_name
+        worker_config = WorkerConfig(root_config)
+        worker_config_dict = {
+            "worker_name": worker_name,
+            "worker_app": worker_app,
+            **extras,
+        }
+        worker_config.read_config(worker_config_dict)
+        return worker_config
+
+    def test_old_configs_master(self) -> None:
+        """
+        Tests old (legacy) config options. This is for the master's config.
+        """
+        main_process_config = self._make_worker_config(
+            worker_app="synapse.app.homeserver", worker_name=None
+        )
+
+        self.assertTrue(
+            main_process_config._should_this_worker_perform_duty(
+                {},
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+        self.assertTrue(
+            main_process_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": True,
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+        self.assertFalse(
+            main_process_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": False,
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+    def test_old_configs_appservice_worker(self) -> None:
+        """
+        Tests old (legacy) config options. This is for the worker's config.
+        """
+        appservice_worker_config = self._make_worker_config(
+            worker_app="synapse.app.appservice",
+            worker_name="worker1",
+            extras={
+                # Set notify_appservices to false for the initialiser's config,
+                # so that it doesn't raise an exception here.
+                # (This is not read by `_should_this_worker_perform_duty`.)
+                "notify_appservices": False,
+            },
+        )
+
+        with self.assertRaises(ConfigError):
+            # This raises because you need to set notify_appservices: False
+            # before using the synapse.app.appservice worker type
+            self.assertFalse(
+                appservice_worker_config._should_this_worker_perform_duty(
+                    {},
+                    "notify_appservices",
+                    "synapse.app.appservice",
+                    "notify_appservices_from_worker",
+                )
+            )
+
+        with self.assertRaises(ConfigError):
+            # This also raises because you need to set notify_appservices: False
+            # before using the synapse.app.appservice worker type
+            appservice_worker_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": True,
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+
+        self.assertTrue(
+            appservice_worker_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": False,
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+    def test_transitional_configs_master(self) -> None:
+        """
+        Tests transitional (legacy + new) config options. This is for the master's config.
+        """
+
+        main_process_config = self._make_worker_config(
+            worker_app="synapse.app.homeserver", worker_name=None
+        )
+
+        self.assertTrue(
+            main_process_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": True,
+                    "notify_appservices_from_worker": "master",
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+        self.assertFalse(
+            main_process_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": False,
+                    "notify_appservices_from_worker": "worker1",
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+        with self.assertRaises(ConfigError):
+            # Contradictory because we say the master should notify appservices,
+            # then we say worker1 is the designated worker to do that!
+            main_process_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": True,
+                    "notify_appservices_from_worker": "worker1",
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+
+        with self.assertRaises(ConfigError):
+            # Contradictory because we say the master shouldn't notify appservices,
+            # then we say master is the designated worker to do that!
+            main_process_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": False,
+                    "notify_appservices_from_worker": "master",
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+
+    def test_transitional_configs_appservice_worker(self) -> None:
+        """
+        Tests transitional (legacy + new) config options. This is for the worker's config.
+        """
+        appservice_worker_config = self._make_worker_config(
+            worker_app="synapse.app.appservice",
+            worker_name="worker1",
+            extras={
+                # Set notify_appservices to false for the initialiser's config,
+                # so that it doesn't raise an exception here.
+                # (This is not read by `_should_this_worker_perform_duty`.)
+                "notify_appservices": False,
+            },
+        )
+
+        self.assertTrue(
+            appservice_worker_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": False,
+                    "notify_appservices_from_worker": "worker1",
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+        with self.assertRaises(ConfigError):
+            # This raises because this worker is the appservice app type, yet
+            # another worker is the designated worker!
+            appservice_worker_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices": False,
+                    "notify_appservices_from_worker": "worker2",
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+
+    def test_new_configs_master(self) -> None:
+        """
+        Tests new config options. This is for the master's config.
+        """
+        main_process_config = self._make_worker_config(
+            worker_app="synapse.app.homeserver", worker_name=None
+        )
+
+        self.assertTrue(
+            main_process_config._should_this_worker_perform_duty(
+                {"notify_appservices_from_worker": None},
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+        self.assertFalse(
+            main_process_config._should_this_worker_perform_duty(
+                {"notify_appservices_from_worker": "worker1"},
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+    def test_new_configs_appservice_worker(self) -> None:
+        """
+        Tests new config options. This is for the worker's config.
+        """
+        appservice_worker_config = self._make_worker_config(
+            worker_app="synapse.app.generic_worker", worker_name="worker1"
+        )
+
+        self.assertTrue(
+            appservice_worker_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices_from_worker": "worker1",
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )
+
+        self.assertFalse(
+            appservice_worker_config._should_this_worker_perform_duty(
+                {
+                    "notify_appservices_from_worker": "worker2",
+                },
+                "notify_appservices",
+                "synapse.app.appservice",
+                "notify_appservices_from_worker",
+            )
+        )