diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/workers.py | 78 | ||||
-rw-r--r-- | synapse/replication/http/_base.py | 16 |
2 files changed, 66 insertions, 28 deletions
diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 95b4047f1d..d2311cc857 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -39,6 +39,19 @@ The '%s' configuration option is deprecated and will be removed in a future Synapse version. Please use ``%s: name_of_worker`` instead. """ +_MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA = """ +Missing data for a worker to connect to main process. Please include '%s' in the +`instance_map` declared in your shared yaml configuration, or optionally(as a deprecated +solution) in every worker's yaml as various `worker_replication_*` settings as defined +in workers documentation here: +`https://matrix-org.github.io/synapse/latest/workers.html#worker-configuration` +""" +# This allows for a handy knob when it's time to change from 'master' to +# something with less 'history' +MAIN_PROCESS_INSTANCE_NAME = "master" +# Use this to adjust what the main process is known as in the yaml instance_map +MAIN_PROCESS_INSTANCE_MAP_NAME = "main" + logger = logging.getLogger(__name__) @@ -161,27 +174,15 @@ class WorkerConfig(Config): raise ConfigError("worker_log_config must be a string") self.worker_log_config = worker_log_config - # The host used to connect to the main synapse - self.worker_replication_host = config.get("worker_replication_host", None) - # The port on the main synapse for TCP replication if "worker_replication_port" in config: raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",)) - # The port on the main synapse for HTTP replication endpoint - self.worker_replication_http_port = config.get("worker_replication_http_port") - - # The tls mode on the main synapse for HTTP replication endpoint. - # For backward compatibility this defaults to False. - self.worker_replication_http_tls = config.get( - "worker_replication_http_tls", False - ) - # The shared secret used for authentication when connecting to the main synapse. self.worker_replication_secret = config.get("worker_replication_secret", None) self.worker_name = config.get("worker_name", self.worker_app) - self.instance_name = self.worker_name or "master" + self.instance_name = self.worker_name or MAIN_PROCESS_INSTANCE_NAME # FIXME: Remove this check after a suitable amount of time. self.worker_main_http_uri = config.get("worker_main_http_uri", None) @@ -215,12 +216,55 @@ class WorkerConfig(Config): ) # A map from instance name to host/port of their HTTP replication endpoint. + # Check if the main process is declared. Inject it into the map if it's not, + # based first on if a 'main' block is declared then on 'worker_replication_*' + # data. If both are available, default to instance_map. The main process + # itself doesn't need this data as it would never have to talk to itself. + instance_map: Dict[str, Any] = config.get("instance_map", {}) + + if instance_map and self.instance_name is not MAIN_PROCESS_INSTANCE_NAME: + # The host used to connect to the main synapse + main_host = config.get("worker_replication_host", None) + + # The port on the main synapse for HTTP replication endpoint + main_port = config.get("worker_replication_http_port") + + # The tls mode on the main synapse for HTTP replication endpoint. + # For backward compatibility this defaults to False. + main_tls = config.get("worker_replication_http_tls", False) + + # For now, accept 'main' in the instance_map, but the replication system + # expects 'master', force that into being until it's changed later. + if MAIN_PROCESS_INSTANCE_MAP_NAME in instance_map: + instance_map[MAIN_PROCESS_INSTANCE_NAME] = instance_map[ + MAIN_PROCESS_INSTANCE_MAP_NAME + ] + del instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME] + + # This is the backwards compatibility bit that handles the + # worker_replication_* bits using setdefault() to not overwrite anything. + elif main_host is not None and main_port is not None: + instance_map.setdefault( + MAIN_PROCESS_INSTANCE_NAME, + { + "host": main_host, + "port": main_port, + "tls": main_tls, + }, + ) + + else: + # If we've gotten here, it means that the main process is not on the + # instance_map and that not enough worker_replication_* variables + # were declared in the worker's yaml. + raise ConfigError( + _MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA + % MAIN_PROCESS_INSTANCE_MAP_NAME + ) + self.instance_map: Dict[ str, InstanceLocationConfig - ] = parse_and_validate_mapping( - config.get("instance_map", {}), - InstanceLocationConfig, - ) + ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig) # Map from type of streams to source, c.f. WriterLocations. writers = config.get("stream_writers") or {} diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 23129962e9..dc7820f963 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -25,6 +25,7 @@ from twisted.internet.error import ConnectError, DNSLookupError from twisted.web.server import Request from synapse.api.errors import HttpResponseException, SynapseError +from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME from synapse.http import RequestTimedOutError from synapse.http.server import HttpServer from synapse.http.servlet import parse_json_object_from_request @@ -197,11 +198,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): client = hs.get_replication_client() local_instance_name = hs.get_instance_name() - # The value of these option should match the replication listener settings - master_host = hs.config.worker.worker_replication_host - master_port = hs.config.worker.worker_replication_http_port - master_tls = hs.config.worker.worker_replication_http_tls - instance_map = hs.config.worker.instance_map outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME) @@ -213,7 +209,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): ) @trace_with_opname("outgoing_replication_request") - async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any: + async def send_request( + *, instance_name: str = MAIN_PROCESS_INSTANCE_NAME, **kwargs: Any + ) -> Any: # We have to pull these out here to avoid circular dependencies... streams = hs.get_replication_command_handler().get_streams_to_replicate() replication = hs.get_replication_data_handler() @@ -221,11 +219,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): with outgoing_gauge.track_inprogress(): if instance_name == local_instance_name: raise Exception("Trying to send HTTP request to self") - if instance_name == "master": - host = master_host - port = master_port - tls = master_tls - elif instance_name in instance_map: + if instance_name in instance_map: host = instance_map[instance_name].host port = instance_map[instance_name].port tls = instance_map[instance_name].tls |