summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/9466.bugfix1
-rw-r--r--synapse/app/admin_cmd.py2
-rw-r--r--synapse/app/generic_worker.py32
-rw-r--r--synapse/config/_base.py36
-rw-r--r--synapse/config/_base.pyi2
-rw-r--r--synapse/config/push.py5
-rw-r--r--synapse/config/server.py1
-rw-r--r--synapse/config/workers.py93
-rw-r--r--synapse/push/pusherpool.py4
-rw-r--r--synapse/server.py7
-rw-r--r--tests/replication/tcp/streams/test_federation.py2
-rw-r--r--tests/replication/test_federation_ack.py2
-rw-r--r--tests/replication/test_federation_sender_shard.py2
-rw-r--r--tests/replication/test_pusher_shard.py2
14 files changed, 128 insertions, 63 deletions
diff --git a/changelog.d/9466.bugfix b/changelog.d/9466.bugfix
new file mode 100644
index 0000000000..2ab4f315c1
--- /dev/null
+++ b/changelog.d/9466.bugfix
@@ -0,0 +1 @@
+Fix deleting pushers when using sharded pushers.
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index b4bd4d8e7a..9f99651aa2 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -210,7 +210,9 @@ def start(config_options):
     config.update_user_directory = False
     config.run_background_tasks = False
     config.start_pushers = False
+    config.pusher_shard_config.instances = []
     config.send_federation = False
+    config.federation_shard_config.instances = []
 
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 6ed405e66b..dc0d3eb725 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -919,22 +919,6 @@ def start(config_options):
         # For other worker types we force this to off.
         config.appservice.notify_appservices = False
 
-    if config.worker_app == "synapse.app.pusher":
-        if config.server.start_pushers:
-            sys.stderr.write(
-                "\nThe pushers must be disabled in the main synapse process"
-                "\nbefore they can be run in a separate worker."
-                "\nPlease add ``start_pushers: false`` to the main config"
-                "\n"
-            )
-            sys.exit(1)
-
-        # Force the pushers to start since they will be disabled in the main config
-        config.server.start_pushers = True
-    else:
-        # For other worker types we force this to off.
-        config.server.start_pushers = False
-
     if config.worker_app == "synapse.app.user_dir":
         if config.server.update_user_directory:
             sys.stderr.write(
@@ -951,22 +935,6 @@ def start(config_options):
         # For other worker types we force this to off.
         config.server.update_user_directory = False
 
-    if config.worker_app == "synapse.app.federation_sender":
-        if config.worker.send_federation:
-            sys.stderr.write(
-                "\nThe send_federation must be disabled in the main synapse process"
-                "\nbefore they can be run in a separate worker."
-                "\nPlease add ``send_federation: false`` to the main config"
-                "\n"
-            )
-            sys.exit(1)
-
-        # Force the pushers to start since they will be disabled in the main config
-        config.worker.send_federation = True
-    else:
-        # For other worker types we force this to off.
-        config.worker.send_federation = False
-
     synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     hs = GenericWorkerServer(
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index e89decda34..4026966711 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -844,22 +844,23 @@ class ShardedWorkerHandlingConfig:
 
     def should_handle(self, instance_name: str, key: str) -> bool:
         """Whether this instance is responsible for handling the given key."""
-        # If multiple instances are not defined we always return true
-        if not self.instances or len(self.instances) == 1:
-            return True
+        # If no instances are defined we assume some other worker is handling
+        # this.
+        if not self.instances:
+            return False
 
-        return self.get_instance(key) == instance_name
+        return self._get_instance(key) == instance_name
 
-    def get_instance(self, key: str) -> str:
+    def _get_instance(self, key: str) -> str:
         """Get the instance responsible for handling the given key.
 
-        Note: For things like federation sending the config for which instance
-        is sending is known only to the sender instance if there is only one.
-        Therefore `should_handle` should be used where possible.
+        Note: For federation sending and pushers the config for which instance
+        is sending is known only to the sender instance, so we don't expose this
+        method by default.
         """
 
         if not self.instances:
-            return "master"
+            raise Exception("Unknown worker")
 
         if len(self.instances) == 1:
             return self.instances[0]
@@ -876,4 +877,21 @@ class ShardedWorkerHandlingConfig:
         return self.instances[remainder]
 
 
+@attr.s
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
+    """A version of `ShardedWorkerHandlingConfig` that is used for config
+    options where all instances know which instances are responsible for the
+    sharded work.
+    """
+
+    def __attrs_post_init__(self):
+        # We require that `self.instances` is non-empty.
+        if not self.instances:
+            raise Exception("Got empty list of instances for shard config")
+
+    def get_instance(self, key: str) -> str:
+        """Get the instance responsible for handling the given key."""
+        return self._get_instance(key)
+
+
 __all__ = ["Config", "RootConfig", "ShardedWorkerHandlingConfig"]
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index 70025b5d60..db16c86f50 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -149,4 +149,6 @@ class ShardedWorkerHandlingConfig:
     instances: List[str]
     def __init__(self, instances: List[str]) -> None: ...
     def should_handle(self, instance_name: str, key: str) -> bool: ...
+
+class RoutableShardedWorkerHandlingConfig(ShardedWorkerHandlingConfig):
     def get_instance(self, key: str) -> str: ...
diff --git a/synapse/config/push.py b/synapse/config/push.py
index 3adbfb73e6..7831a2ef79 100644
--- a/synapse/config/push.py
+++ b/synapse/config/push.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ._base import Config, ShardedWorkerHandlingConfig
+from ._base import Config
 
 
 class PushConfig(Config):
@@ -27,9 +27,6 @@ class PushConfig(Config):
             "group_unread_count_by_room", True
         )
 
-        pusher_instances = config.get("pusher_instances") or []
-        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
-
         # There was a a 'redact_content' setting but mistakenly read from the
         # 'email'section'. Check for the flag in the 'push' section, and log,
         # but do not honour it to avoid nasty surprises when people upgrade.
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 0bfd4398e2..2afca36e7d 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -397,7 +397,6 @@ class ServerConfig(Config):
         if self.public_baseurl is not None:
             if self.public_baseurl[-1] != "/":
                 self.public_baseurl += "/"
-        self.start_pushers = config.get("start_pushers", True)
 
         # (undocumented) option for torturing the worker-mode replication a bit,
         # for testing. The value defines the number of milliseconds to pause before
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 7a0ca16da8..ac92375a85 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -17,9 +17,28 @@ from typing import List, Union
 
 import attr
 
-from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
+from ._base import (
+    Config,
+    ConfigError,
+    RoutableShardedWorkerHandlingConfig,
+    ShardedWorkerHandlingConfig,
+)
 from .server import ListenerConfig, parse_listener_def
 
+_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
+The send_federation config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``send_federation: false`` to the main config
+"""
+
+_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
+The start_pushers config option must be disabled in the main
+synapse process before they can be run in a separate worker.
+
+Please add ``start_pushers: false`` to the main config
+"""
+
 
 def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
     """Helper for allowing parsing a string or list of strings to a config
@@ -103,6 +122,7 @@ class WorkerConfig(Config):
         self.worker_replication_secret = config.get("worker_replication_secret", None)
 
         self.worker_name = config.get("worker_name", self.worker_app)
+        self.instance_name = self.worker_name or "master"
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)
 
@@ -118,12 +138,41 @@ class WorkerConfig(Config):
                 )
             )
 
-        # Whether to send federation traffic out in this process. This only
-        # applies to some federation traffic, and so shouldn't be used to
-        # "disable" federation
-        self.send_federation = config.get("send_federation", True)
+        # Handle federation sender configuration.
+        #
+        # There are two ways of configuring which instances handle federation
+        # sending:
+        #   1. The old way where "send_federation" is set to false and running a
+        #      `synapse.app.federation_sender` worker app.
+        #   2. Specifying the workers sending federation in
+        #      `federation_sender_instances`.
+        #
+
+        send_federation = config.get("send_federation", True)
+
+        federation_sender_instances = config.get("federation_sender_instances")
+        if federation_sender_instances is None:
+            # Default to an empty list, which means "another, unknown, worker is
+            # responsible for it".
+            federation_sender_instances = []
 
-        federation_sender_instances = config.get("federation_sender_instances") or []
+            # If no federation sender instances are set we check if
+            # `send_federation` is set, which means use master
+            if send_federation:
+                federation_sender_instances = ["master"]
+
+            if self.worker_app == "synapse.app.federation_sender":
+                if send_federation:
+                    # If we're running federation senders, and not using
+                    # `federation_sender_instances`, then we should have
+                    # explicitly set `send_federation` to false.
+                    raise ConfigError(
+                        _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
+                    )
+
+                federation_sender_instances = [self.worker_name]
+
+        self.send_federation = self.instance_name in federation_sender_instances
         self.federation_shard_config = ShardedWorkerHandlingConfig(
             federation_sender_instances
         )
@@ -164,7 +213,37 @@ class WorkerConfig(Config):
                 "Must only specify one instance to handle `receipts` messages."
             )
 
-        self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
+        if len(self.writers.events) == 0:
+            raise ConfigError("Must specify at least one instance to handle `events`.")
+
+        self.events_shard_config = RoutableShardedWorkerHandlingConfig(
+            self.writers.events
+        )
+
+        # Handle sharded push
+        start_pushers = config.get("start_pushers", True)
+        pusher_instances = config.get("pusher_instances")
+        if pusher_instances is None:
+            # Default to an empty list, which means "another, unknown, worker is
+            # responsible for it".
+            pusher_instances = []
+
+            # If no pushers instances are set we check if `start_pushers` is
+            # set, which means use master
+            if start_pushers:
+                pusher_instances = ["master"]
+
+            if self.worker_app == "synapse.app.pusher":
+                if start_pushers:
+                    # If we're running pushers, and not using
+                    # `pusher_instances`, then we should have explicitly set
+                    # `start_pushers` to false.
+                    raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
+
+                pusher_instances = [self.instance_name]
+
+        self.start_pushers = self.instance_name in pusher_instances
+        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
 
         # Whether this worker should run background tasks or not.
         #
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 3936bf8784..21f14f05f0 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -59,7 +59,6 @@ class PusherPool:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.pusher_factory = PusherFactory(hs)
-        self._should_start_pushers = hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
 
@@ -68,6 +67,9 @@ class PusherPool:
         # We shard the handling of push notifications by user ID.
         self._pusher_shard_config = hs.config.push.pusher_shard_config
         self._instance_name = hs.get_instance_name()
+        self._should_start_pushers = (
+            self._instance_name in self._pusher_shard_config.instances
+        )
 
         # We can only delete pushers on master.
         self._remove_pusher_client = None
diff --git a/synapse/server.py b/synapse/server.py
index 5de8782000..4b9ec7f0ae 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -248,7 +248,7 @@ class HomeServer(metaclass=abc.ABCMeta):
         self.start_time = None  # type: Optional[int]
 
         self._instance_id = random_string(5)
-        self._instance_name = config.worker_name or "master"
+        self._instance_name = config.worker.instance_name
 
         self.version_string = version_string
 
@@ -760,7 +760,4 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     def should_send_federation(self) -> bool:
         "Should this server be sending federation traffic directly?"
-        return self.config.send_federation and (
-            not self.config.worker_app
-            or self.config.worker_app == "synapse.app.federation_sender"
-        )
+        return self.config.send_federation
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
index 2babea4e3e..aa4bf1c7e3 100644
--- a/tests/replication/tcp/streams/test_federation.py
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -24,7 +24,7 @@ class FederationStreamTestCase(BaseStreamTestCase):
         # enable federation sending on the worker
         config = super()._get_worker_hs_config()
         # TODO: make it so we don't need both of these
-        config["send_federation"] = True
+        config["send_federation"] = False
         config["worker_app"] = "synapse.app.federation_sender"
         return config
 
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
index 1853667558..f235f1bd83 100644
--- a/tests/replication/test_federation_ack.py
+++ b/tests/replication/test_federation_ack.py
@@ -27,7 +27,7 @@ class FederationAckTestCase(HomeserverTestCase):
     def default_config(self) -> dict:
         config = super().default_config()
         config["worker_app"] = "synapse.app.federation_sender"
-        config["send_federation"] = True
+        config["send_federation"] = False
         return config
 
     def make_homeserver(self, reactor, clock):
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index fffdb742c8..2f2d117858 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -49,7 +49,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
 
         self.make_worker_hs(
             "synapse.app.federation_sender",
-            {"send_federation": True},
+            {"send_federation": False},
             federation_http_client=mock_client,
         )
 
diff --git a/tests/replication/test_pusher_shard.py b/tests/replication/test_pusher_shard.py
index f118fe32af..ab2988a6ba 100644
--- a/tests/replication/test_pusher_shard.py
+++ b/tests/replication/test_pusher_shard.py
@@ -95,7 +95,7 @@ class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
 
         self.make_worker_hs(
             "synapse.app.pusher",
-            {"start_pushers": True},
+            {"start_pushers": False},
             proxied_blacklisted_http_client=http_client_mock,
         )