summary refs log tree commit diff
path: root/synapse/config/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/config/workers.py')
-rw-r--r--synapse/config/workers.py88
1 files changed, 63 insertions, 25 deletions
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ed06b91a54..c784a71508 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -15,7 +15,8 @@
 
 import attr
 
-from ._base import Config, ConfigError
+from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
+from .server import ListenerConfig, parse_listener_def
 
 
 @attr.s
@@ -33,9 +34,11 @@ class WriterLocations:
 
     Attributes:
         events: The instance that writes to the event and backfill streams.
+        events: The instance that writes to the typing stream.
     """
 
     events = attr.ib(default="master", type=str)
+    typing = attr.ib(default="master", type=str)
 
 
 class WorkerConfig(Config):
@@ -52,7 +55,9 @@ class WorkerConfig(Config):
         if self.worker_app == "synapse.app.homeserver":
             self.worker_app = None
 
-        self.worker_listeners = config.get("worker_listeners", [])
+        self.worker_listeners = [
+            parse_listener_def(x) for x in config.get("worker_listeners", [])
+        ]
         self.worker_daemonize = config.get("worker_daemonize")
         self.worker_pid_file = config.get("worker_pid_file")
         self.worker_log_config = config.get("worker_log_config")
@@ -75,23 +80,20 @@ class WorkerConfig(Config):
         manhole = config.get("worker_manhole")
         if manhole:
             self.worker_listeners.append(
-                {
-                    "port": manhole,
-                    "bind_addresses": ["127.0.0.1"],
-                    "type": "manhole",
-                    "tls": False,
-                }
+                ListenerConfig(
+                    port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
+                )
             )
 
-        if self.worker_listeners:
-            for listener in self.worker_listeners:
-                bind_address = listener.pop("bind_address", None)
-                bind_addresses = listener.setdefault("bind_addresses", [])
+        # Whether to send federation traffic out in this process. This only
+        # applies to some federation traffic, and so shouldn't be used to
+        # "disable" federation
+        self.send_federation = config.get("send_federation", True)
 
-                if bind_address:
-                    bind_addresses.append(bind_address)
-                elif not bind_addresses:
-                    bind_addresses.append("")
+        federation_sender_instances = config.get("federation_sender_instances") or []
+        self.federation_shard_config = ShardedWorkerHandlingConfig(
+            federation_sender_instances
+        )
 
         # A map from instance name to host/port of their HTTP replication endpoint.
         instance_map = config.get("instance_map") or {}
@@ -103,16 +105,52 @@ class WorkerConfig(Config):
         writers = config.get("stream_writers") or {}
         self.writers = WriterLocations(**writers)
 
-        # Check that the configured writer for events also appears in
+        # Check that the configured writer for events and typing also appears in
         # `instance_map`.
-        if (
-            self.writers.events != "master"
-            and self.writers.events not in self.instance_map
-        ):
-            raise ConfigError(
-                "Instance %r is configured to write events but does not appear in `instance_map` config."
-                % (self.writers.events,)
-            )
+        for stream in ("events", "typing"):
+            instance = getattr(self.writers, stream)
+            if instance != "master" and instance not in self.instance_map:
+                raise ConfigError(
+                    "Instance %r is configured to write %s but does not appear in `instance_map` config."
+                    % (instance, stream)
+                )
+
+    def generate_config_section(self, config_dir_path, server_name, **kwargs):
+        return """\
+        ## Workers ##
+
+        # Disables sending of outbound federation transactions on the main process.
+        # Uncomment if using a federation sender worker.
+        #
+        #send_federation: false
+
+        # It is possible to run multiple federation sender workers, in which case the
+        # work is balanced across them.
+        #
+        # This configuration must be shared between all federation sender workers, and if
+        # changed all federation sender workers must be stopped at the same time and then
+        # started, to ensure that all instances are running with the same config (otherwise
+        # events may be dropped).
+        #
+        #federation_sender_instances:
+        #  - federation_sender1
+
+        # When using workers this should be a map from `worker_name` to the
+        # HTTP replication listener of the worker, if configured.
+        #
+        #instance_map:
+        #  worker1:
+        #    host: localhost
+        #    port: 8034
+
+        # Experimental: When using workers you can define which workers should
+        # handle event persistence and typing notifications. Any worker
+        # specified here must also be in the `instance_map`.
+        #
+        #stream_writers:
+        #  events: worker1
+        #  typing: worker1
+        """
 
     def read_arguments(self, args):
         # We support a bunch of command line arguments that override options in