Add `instance_map` config and route replication calls (#7495)
3 files changed, 33 insertions, 6 deletions
diff --git a/changelog.d/7495.feature b/changelog.d/7495.feature
new file mode 100644
index 0000000000..1150e714bd
--- /dev/null
+++ b/changelog.d/7495.feature
@@ -0,0 +1 @@
+Add `instance_map` config and route replication calls.
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index fef72ed974..c80c338584 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -13,9 +13,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import attr
+
from ._base import Config
+@attr.s
+class InstanceLocationConfig:
+ """The host and port to talk to an instance via HTTP replication.
+ """
+
+ host = attr.ib(type=str)
+ port = attr.ib(type=int)
+
+
class WorkerConfig(Config):
"""The workers are processes run separately to the main synapse process.
They have their own pid_file and listener configuration. They use the
@@ -71,6 +82,12 @@ class WorkerConfig(Config):
elif not bind_addresses:
bind_addresses.append("")
+ # A map from instance name to host/port of their HTTP replication endpoint.
+ instance_map = config.get("instance_map", {}) or {}
+ self.instance_map = {
+ name: InstanceLocationConfig(**c) for name, c in instance_map.items()
+ }
+
def read_arguments(self, args):
# We support a bunch of command line arguments that override options in
# the config. A lot of these options have a worker_* prefix when running
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index f88c80ae84..c3136a4eb9 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -141,17 +141,26 @@ class ReplicationEndpoint(object):
Returns a callable that accepts the same parameters as `_serialize_payload`.
"""
clock = hs.get_clock()
- host = hs.config.worker_replication_host
- port = hs.config.worker_replication_http_port
-
client = hs.get_simple_http_client()
+ master_host = hs.config.worker_replication_host
+ master_port = hs.config.worker_replication_http_port
+
+ instance_map = hs.config.worker.instance_map
+
@trace(opname="outgoing_replication_request")
@defer.inlineCallbacks
def send_request(instance_name="master", **kwargs):
- # Currently we only support sending requests to master process.
- if instance_name != "master":
- raise Exception("Unknown instance")
+ if instance_name == "master":
+ host = master_host
+ port = master_port
+ elif instance_name in instance_map:
+ host = instance_map[instance_name].host
+ port = instance_map[instance_name].port
+ else:
+ raise Exception(
+ "Instance %r not in 'instance_map' config" % (instance_name,)
+ )
data = yield cls._serialize_payload(**kwargs)
|