summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7495.feature1
-rw-r--r--synapse/config/workers.py17
-rw-r--r--synapse/replication/http/_base.py21
3 files changed, 33 insertions, 6 deletions
diff --git a/changelog.d/7495.feature b/changelog.d/7495.feature
new file mode 100644
index 0000000000..1150e714bd
--- /dev/null
+++ b/changelog.d/7495.feature
@@ -0,0 +1 @@
+Add `instance_map` config and route replication calls.
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index fef72ed974..c80c338584 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -13,9 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import attr
+
 from ._base import Config
 
 
+@attr.s
+class InstanceLocationConfig:
+    """The host and port to talk to an instance via HTTP replication.
+    """
+
+    host = attr.ib(type=str)
+    port = attr.ib(type=int)
+
+
 class WorkerConfig(Config):
     """The workers are processes run separately to the main synapse process.
     They have their own pid_file and listener configuration. They use the
@@ -71,6 +82,12 @@ class WorkerConfig(Config):
                 elif not bind_addresses:
                     bind_addresses.append("")
 
+        # A map from instance name to host/port of their HTTP replication endpoint.
+        instance_map = config.get("instance_map", {}) or {}
+        self.instance_map = {
+            name: InstanceLocationConfig(**c) for name, c in instance_map.items()
+        }
+
     def read_arguments(self, args):
         # We support a bunch of command line arguments that override options in
         # the config. A lot of these options have a worker_* prefix when running
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index f88c80ae84..c3136a4eb9 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -141,17 +141,26 @@ class ReplicationEndpoint(object):
         Returns a callable that accepts the same parameters as `_serialize_payload`.
         """
         clock = hs.get_clock()
-        host = hs.config.worker_replication_host
-        port = hs.config.worker_replication_http_port
-
         client = hs.get_simple_http_client()
 
+        master_host = hs.config.worker_replication_host
+        master_port = hs.config.worker_replication_http_port
+
+        instance_map = hs.config.worker.instance_map
+
         @trace(opname="outgoing_replication_request")
         @defer.inlineCallbacks
         def send_request(instance_name="master", **kwargs):
-            # Currently we only support sending requests to master process.
-            if instance_name != "master":
-                raise Exception("Unknown instance")
+            if instance_name == "master":
+                host = master_host
+                port = master_port
+            elif instance_name in instance_map:
+                host = instance_map[instance_name].host
+                port = instance_map[instance_name].port
+            else:
+                raise Exception(
+                    "Instance %r not in 'instance_map' config" % (instance_name,)
+                )
 
             data = yield cls._serialize_payload(**kwargs)