diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index e89decda34..4026966711 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -844,22 +844,23 @@ class ShardedWorkerHandlingConfig:
def should_handle(self, instance_name: str, key: str) -> bool:
"""Whether this instance is responsible for handling the given key."""
- # If multiple instances are not defined we always return true
- if not self.instances or len(self.instances) == 1:
- return True
+ # If no instances are defined we assume some other worker is handling
+ # this.
+ if not self.instances:
+ return False
- return self.get_instance(key) == instance_name
+ return self._get_instance(key) == instance_name
- def get_instance(self, key: str) -> str:
+ def _get_instance(self, key: str) -> str:
"""Get the instance responsible for handling the given key.
- Note: For things like federation sending the config for which instance
- is sending is known only to the sender instance if there is only one.
- Therefore `should_handle` should be used where possible.
+ Note: For federation sending and pushers the config for which instance
+ is sending is known only to the sender instance, so we don't expose this
+ method by default.
"""
if not self.instances:
- return "master"
+ raise Exception("Unknown worker")
if len(self.instances) == 1:
return self.instances[0]
@@ -876,4 +877,21 @@ class ShardedWorkerHandlingConfig:
return self.instances[remainder]
+@attr.s
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
+ """A version of `ShardedWorkerHandlingConfig` that is used for config
+ options where all instances know which instances are responsible for the
+ sharded work.
+ """
+
+ def __attrs_post_init__(self):
+ # We require that `self.instances` is non-empty.
+ if not self.instances:
+ raise Exception("Got empty list of instances for shard config")
+
+ def get_instance(self, key: str) -> str:
+ """Get the instance responsible for handling the given key."""
+ return self._get_instance(key)
+
+
__all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 70025b5d60..db16c86f50 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -149,4 +149,6 @@ class ShardedWorkerHandlingConfig:
instances: List[str]
def __init__(self, instances: List[str]) -> None: ...
def should_handle(self, instance_name: str, key: str) -> bool: ...
+
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
def get_instance(self, key: str) -> str: ...
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 3adbfb73e6..7831a2ef79 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -14,7 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from ._base import Config, ShardedWorkerHandlingConfig
+from ._base import Config
class PushConfig(Config):
@@ -27,9 +27,6 @@ class PushConfig(Config):
"group_unread_count_by_room", True
)
- pusher_instances = config.get("pusher_instances") or []
- self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
-
# There was a a 'redact_content' setting but mistakenly read from the
# 'email'section'. Check for the flag in the 'push' section, and log,
# but do not honour it to avoid nasty surprises when people upgrade.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 0bfd4398e2..2afca36e7d 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -397,7 +397,6 @@ class ServerConfig(Config):
if self.public_baseurl is not None:
if self.public_baseurl[-1] != "/":
self.public_baseurl += "/"
- self.start_pushers = config.get("start_pushers", True)
# (undocumented) option for torturing the worker-mode replication a bit,
# for testing. The value defines the number of milliseconds to pause before
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 7a0ca16da8..ac92375a85 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -17,9 +17,28 @@ from typing import List, Union
import attr
-from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
+from ._base import (
+ Config,
+ ConfigError,
+ RoutableShardedWorkerHandlingConfig,
+ ShardedWorkerHandlingConfig,
+)
from .server import ListenerConfig, parse_listener_def
+_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
+The send_federation config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``send_federation: false`` to the main config
+"""
+
+_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
+The start_pushers config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``start_pushers: false`` to the main config
+"""
+
def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
"""Helper for allowing parsing a string or list of strings to a config
@@ -103,6 +122,7 @@ class WorkerConfig(Config):
self.worker_replication_secret = config.get("worker_replication_secret", None)
self.worker_name = config.get("worker_name", self.worker_app)
+ self.instance_name = self.worker_name or "master"
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
@@ -118,12 +138,41 @@ class WorkerConfig(Config):
)
)
- # Whether to send federation traffic out in this process. This only
- # applies to some federation traffic, and so shouldn't be used to
- # "disable" federation
- self.send_federation = config.get("send_federation", True)
+ # Handle federation sender configuration.
+ #
+ # There are two ways of configuring which instances handle federation
+ # sending:
+ # 1. The old way where "send_federation" is set to false and running a
+ # `synapse.app.federation_sender` worker app.
+ # 2. Specifying the workers sending federation in
+ # `federation_sender_instances`.
+ #
+
+ send_federation = config.get("send_federation", True)
+
+ federation_sender_instances = config.get("federation_sender_instances")
+ if federation_sender_instances is None:
+ # Default to an empty list, which means "another, unknown, worker is
+ # responsible for it".
+ federation_sender_instances = []
- federation_sender_instances = config.get("federation_sender_instances") or []
+ # If no federation sender instances are set we check if
+ # `send_federation` is set, which means use master
+ if send_federation:
+ federation_sender_instances = ["master"]
+
+ if self.worker_app == "synapse.app.federation_sender":
+ if send_federation:
+ # If we're running federation senders, and not using
+ # `federation_sender_instances`, then we should have
+ # explicitly set `send_federation` to false.
+ raise ConfigError(
+ _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
+ )
+
+ federation_sender_instances = [self.worker_name]
+
+ self.send_federation = self.instance_name in federation_sender_instances
self.federation_shard_config = ShardedWorkerHandlingConfig(
federation_sender_instances
)
@@ -164,7 +213,37 @@ class WorkerConfig(Config):
"Must only specify one instance to handle `receipts` messages."
)
- self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
+ if len(self.writers.events) == 0:
+ raise ConfigError("Must specify at least one instance to handle `events`.")
+
+ self.events_shard_config = RoutableShardedWorkerHandlingConfig(
+ self.writers.events
+ )
+
+ # Handle sharded push
+ start_pushers = config.get("start_pushers", True)
+ pusher_instances = config.get("pusher_instances")
+ if pusher_instances is None:
+ # Default to an empty list, which means "another, unknown, worker is
+ # responsible for it".
+ pusher_instances = []
+
+ # If no pushers instances are set we check if `start_pushers` is
+ # set, which means use master
+ if start_pushers:
+ pusher_instances = ["master"]
+
+ if self.worker_app == "synapse.app.pusher":
+ if start_pushers:
+ # If we're running pushers, and not using
+ # `pusher_instances`, then we should have explicitly set
+ # `start_pushers` to false.
+ raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
+
+ pusher_instances = [self.instance_name]
+
+ self.start_pushers = self.instance_name in pusher_instances
+ self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
# Whether this worker should run background tasks or not.
#
|