summary refs log tree commit diff
path: root/synapse/config/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/config/workers.py')
-rw-r--r--synapse/config/workers.py93
1 files changed, 86 insertions, 7 deletions
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 7a0ca16da8..ac92375a85 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -17,9 +17,28 @@ from typing import List, Union
 
 import attr
 
-from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
+from ._base import (
+    Config,
+    ConfigError,
+    RoutableShardedWorkerHandlingConfig,
+    ShardedWorkerHandlingConfig,
+)
 from .server import ListenerConfig, parse_listener_def
 
+_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
+The send_federation config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``send_federation: false`` to the main config
+"""
+
+_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
+The start_pushers config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``start_pushers: false`` to the main config
+"""
+
 
 def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
     """Helper for allowing parsing a string or list of strings to a config
@@ -103,6 +122,7 @@ class WorkerConfig(Config):
         self.worker_replication_secret = config.get("worker_replication_secret", None)
 
         self.worker_name = config.get("worker_name", self.worker_app)
+        self.instance_name = self.worker_name or "master"
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
 
@@ -118,12 +138,41 @@ class WorkerConfig(Config):
                 )
             )
 
-        # Whether to send federation traffic out in this process. This only
-        # applies to some federation traffic, and so shouldn't be used to
-        # "disable" federation
-        self.send_federation = config.get("send_federation", True)
+        # Handle federation sender configuration.
+        #
+        # There are two ways of configuring which instances handle federation
+        # sending:
+        #   1. The old way where "send_federation" is set to false and running a
+        #      `synapse.app.federation_sender` worker app.
+        #   2. Specifying the workers sending federation in
+        #      `federation_sender_instances`.
+        #
+
+        send_federation = config.get("send_federation", True)
+
+        federation_sender_instances = config.get("federation_sender_instances")
+        if federation_sender_instances is None:
+            # Default to an empty list, which means "another, unknown, worker is
+            # responsible for it".
+            federation_sender_instances = []
 
-        federation_sender_instances = config.get("federation_sender_instances") or []
+            # If no federation sender instances are set we check if
+            # `send_federation` is set, which means use master
+            if send_federation:
+                federation_sender_instances = ["master"]
+
+            if self.worker_app == "synapse.app.federation_sender":
+                if send_federation:
+                    # If we're running federation senders, and not using
+                    # `federation_sender_instances`, then we should have
+                    # explicitly set `send_federation` to false.
+                    raise ConfigError(
+                        _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
+                    )
+
+                federation_sender_instances = [self.worker_name]
+
+        self.send_federation = self.instance_name in federation_sender_instances
         self.federation_shard_config = ShardedWorkerHandlingConfig(
             federation_sender_instances
         )
@@ -164,7 +213,37 @@ class WorkerConfig(Config):
                 "Must only specify one instance to handle `receipts` messages."
             )
 
-        self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
+        if len(self.writers.events) == 0:
+            raise ConfigError("Must specify at least one instance to handle `events`.")
+
+        self.events_shard_config = RoutableShardedWorkerHandlingConfig(
+            self.writers.events
+        )
+
+        # Handle sharded push
+        start_pushers = config.get("start_pushers", True)
+        pusher_instances = config.get("pusher_instances")
+        if pusher_instances is None:
+            # Default to an empty list, which means "another, unknown, worker is
+            # responsible for it".
+            pusher_instances = []
+
+            # If no pushers instances are set we check if `start_pushers` is
+            # set, which means use master
+            if start_pushers:
+                pusher_instances = ["master"]
+
+            if self.worker_app == "synapse.app.pusher":
+                if start_pushers:
+                    # If we're running pushers, and not using
+                    # `pusher_instances`, then we should have explicitly set
+                    # `start_pushers` to false.
+                    raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
+
+                pusher_instances = [self.instance_name]
+
+        self.start_pushers = self.instance_name in pusher_instances
+        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
 
         # Whether this worker should run background tasks or not.
         #