summary refs log tree commit diff
path: root/synapse/replication/http/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/http/_base.py')
-rw-r--r--synapse/replication/http/_base.py37
1 files changed, 33 insertions, 4 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py

index 1be1ccbdf3..793cef6c26 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py
@@ -16,6 +16,7 @@ import abc import logging import re +from inspect import signature from typing import Dict, List, Tuple from six import raise_from @@ -60,6 +61,8 @@ class ReplicationEndpoint(object): must call `register` to register the path with the HTTP server. Requests can be sent by calling the client returned by `make_client`. + Requests are sent to master process by default, but can be sent to other + named processes by specifying an `instance_name` keyword argument. Attributes: NAME (str): A name for the endpoint, added to the path as well as used @@ -91,6 +94,16 @@ class ReplicationEndpoint(object): hs, "repl." + self.NAME, timeout_ms=30 * 60 * 1000 ) + # We reserve `instance_name` as a parameter to sending requests, so we + # assert here that sub classes don't try and use the name. + assert ( + "instance_name" not in self.PATH_ARGS + ), "`instance_name` is a reserved paramater name" + assert ( + "instance_name" + not in signature(self.__class__._serialize_payload).parameters + ), "`instance_name` is a reserved paramater name" + assert self.METHOD in ("PUT", "POST", "GET") @abc.abstractmethod @@ -128,14 +141,30 @@ class ReplicationEndpoint(object): Returns a callable that accepts the same parameters as `_serialize_payload`. """ clock = hs.get_clock() - host = hs.config.worker_replication_host - port = hs.config.worker_replication_http_port - client = hs.get_simple_http_client() + local_instance_name = hs.get_instance_name() + + master_host = hs.config.worker_replication_host + master_port = hs.config.worker_replication_http_port + + instance_map = hs.config.worker.instance_map @trace(opname="outgoing_replication_request") @defer.inlineCallbacks - def send_request(**kwargs): + def send_request(instance_name="master", **kwargs): + if instance_name == local_instance_name: + raise Exception("Trying to send HTTP request to self") + if instance_name == "master": + host = master_host + port = master_port + elif instance_name in instance_map: + host = instance_map[instance_name].host + port = instance_map[instance_name].port + else: + raise Exception( + "Instance %r not in 'instance_map' config" % (instance_name,) + ) + data = yield cls._serialize_payload(**kwargs) url_args = [