summary refs log tree commit diff
path: root/synapse/config/workers.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/config/workers.py')
-rw-r--r--synapse/config/workers.py30
1 files changed, 28 insertions, 2 deletions
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index c80c338584..ed06b91a54 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -15,7 +15,7 @@
 
 import attr
 
-from ._base import Config
+from ._base import Config, ConfigError
 
 
 @attr.s
@@ -27,6 +27,17 @@ class InstanceLocationConfig:
     port = attr.ib(type=int)
 
 
+@attr.s
+class WriterLocations:
+    """Specifies the instances that write various streams.
+
+    Attributes:
+        events: The instance that writes to the event and backfill streams.
+    """
+
+    events = attr.ib(default="master", type=str)
+
+
 class WorkerConfig(Config):
     """The workers are processes run separately to the main synapse process.
     They have their own pid_file and listener configuration. They use the
@@ -83,11 +94,26 @@ class WorkerConfig(Config):
                     bind_addresses.append("")
 
         # A map from instance name to host/port of their HTTP replication endpoint.
-        instance_map = config.get("instance_map", {}) or {}
+        instance_map = config.get("instance_map") or {}
         self.instance_map = {
             name: InstanceLocationConfig(**c) for name, c in instance_map.items()
         }
 
+        # Map from type of streams to source, c.f. WriterLocations.
+        writers = config.get("stream_writers") or {}
+        self.writers = WriterLocations(**writers)
+
+        # Check that the configured writer for events also appears in
+        # `instance_map`.
+        if (
+            self.writers.events != "master"
+            and self.writers.events not in self.instance_map
+        ):
+            raise ConfigError(
+                "Instance %r is configured to write events but does not appear in `instance_map` config."
+                % (self.writers.events,)
+            )
+
     def read_arguments(self, args):
         # We support a bunch of command line arguments that override options in
         # the config. A lot of these options have a worker_* prefix when running