diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 7a0ca16da8..ac92375a85 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -17,9 +17,28 @@ from typing import List, Union
import attr
-from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
+from ._base import (
+ Config,
+ ConfigError,
+ RoutableShardedWorkerHandlingConfig,
+ ShardedWorkerHandlingConfig,
+)
from .server import ListenerConfig, parse_listener_def
+_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
+The send_federation config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``send_federation: false`` to the main config
+"""
+
+_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
+The start_pushers config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``start_pushers: false`` to the main config
+"""
+
def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
"""Helper for allowing parsing a string or list of strings to a config
@@ -103,6 +122,7 @@ class WorkerConfig(Config):
self.worker_replication_secret = config.get("worker_replication_secret", None)
self.worker_name = config.get("worker_name", self.worker_app)
+ self.instance_name = self.worker_name or "master"
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
@@ -118,12 +138,41 @@ class WorkerConfig(Config):
)
)
- # Whether to send federation traffic out in this process. This only
- # applies to some federation traffic, and so shouldn't be used to
- # "disable" federation
- self.send_federation = config.get("send_federation", True)
+ # Handle federation sender configuration.
+ #
+ # There are two ways of configuring which instances handle federation
+ # sending:
+ # 1. The old way where "send_federation" is set to false and running a
+ # `synapse.app.federation_sender` worker app.
+ # 2. Specifying the workers sending federation in
+ # `federation_sender_instances`.
+ #
+
+ send_federation = config.get("send_federation", True)
+
+ federation_sender_instances = config.get("federation_sender_instances")
+ if federation_sender_instances is None:
+ # Default to an empty list, which means "another, unknown, worker is
+ # responsible for it".
+ federation_sender_instances = []
- federation_sender_instances = config.get("federation_sender_instances") or []
+ # If no federation sender instances are set we check if
+ # `send_federation` is set, which means use master
+ if send_federation:
+ federation_sender_instances = ["master"]
+
+ if self.worker_app == "synapse.app.federation_sender":
+ if send_federation:
+ # If we're running federation senders, and not using
+ # `federation_sender_instances`, then we should have
+ # explicitly set `send_federation` to false.
+ raise ConfigError(
+ _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
+ )
+
+ federation_sender_instances = [self.worker_name]
+
+ self.send_federation = self.instance_name in federation_sender_instances
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
@@ -164,7 +213,37 @@ class WorkerConfig(Config):
"Must only specify one instance to handle `receipts` messages."
)
- self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
+ if len(self.writers.events) == 0:
+ raise ConfigError("Must specify at least one instance to handle `events`.")
+
+ self.events_shard_config = RoutableShardedWorkerHandlingConfig(
+ self.writers.events
+ )
+
+ # Handle sharded push
+ start_pushers = config.get("start_pushers", True)
+ pusher_instances = config.get("pusher_instances")
+ if pusher_instances is None:
+ # Default to an empty list, which means "another, unknown, worker is
+ # responsible for it".
+ pusher_instances = []
+
+ # If no pushers instances are set we check if `start_pushers` is
+ # set, which means use master
+ if start_pushers:
+ pusher_instances = ["master"]
+
+ if self.worker_app == "synapse.app.pusher":
+ if start_pushers:
+ # If we're running pushers, and not using
+ # `pusher_instances`, then we should have explicitly set
+ # `start_pushers` to false.
+ raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
+
+ pusher_instances = [self.instance_name]
+
+ self.start_pushers = self.instance_name in pusher_instances
+ self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
# Whether this worker should run background tasks or not.
#
|