From e4f545c452df817daa2f22dfda906f3451d98351 Mon Sep 17 00:00:00 2001 From: Jason Little Date: Thu, 11 May 2023 05:30:56 -0500 Subject: Remove `worker_replication_*` settings (#15491) * Add master to the instance_map as part of Complement, have ReplicationEndpoint look at instance_map for master. * Fix typo in drive by. * Remove unnecessary worker_replication_* bits from unit tests and add master to instance_map(hopefully in the right place) * Several updates: 1. Switch from master to main for naming the main process in the instance_map. Add useful constants for easier adjustment of names in the future. 2. Add backwards compatibility for worker_replication_* to allow time to transition to new style. Make sure to prioritize declaring main directly on the instance_map. 3. Clean up old comments/commented out code. 4. Adjust unit tests to match with new code. 5. Adjust Complement setup infrastructure to only add main to the instance_map if workers are used and remove now unused options from the worker.yaml template. * Initial Docs upload * Changelog * Missed some commented out code that can go now * Remove TODO comment that no longer holds true. * Fix links in docs * More docs * Remove debug logging * Apply suggestions from code review Co-authored-by: reivilibre * Apply suggestions from code review Co-authored-by: reivilibre * Update version to latest, include completeish before/after examples in upgrade notes. * Fix up and docs too --------- Co-authored-by: reivilibre --- changelog.d/15491.misc | 1 + docker/conf-workers/worker.yaml.j2 | 4 -- docker/configure_workers_and_start.py | 15 ++++- .../workers/generic_worker.yaml | 4 -- docs/upgrade.md | 78 ++++++++++++++++++++++ docs/usage/configuration/config_documentation.md | 20 ++++-- docs/workers.md | 41 ++++++++---- synapse/config/workers.py | 78 +++++++++++++++++----- synapse/replication/http/_base.py | 16 ++--- tests/module_api/test_api.py | 1 + tests/replication/_base.py | 8 +-- tests/replication/test_auth.py | 3 - tests/replication/test_client_reader_shard.py | 2 - tests/replication/test_sharded_event_persister.py | 1 + 14 files changed, 206 insertions(+), 66 deletions(-) create mode 100644 changelog.d/15491.misc diff --git a/changelog.d/15491.misc b/changelog.d/15491.misc new file mode 100644 index 0000000000..98f88dbf19 --- /dev/null +++ b/changelog.d/15491.misc @@ -0,0 +1 @@ +Remove need for `worker_replication_*` based settings in worker configuration yaml by placing this data directly on the `instance_map` instead. diff --git a/docker/conf-workers/worker.yaml.j2 b/docker/conf-workers/worker.yaml.j2 index 42131afc95..44c6e413cf 100644 --- a/docker/conf-workers/worker.yaml.j2 +++ b/docker/conf-workers/worker.yaml.j2 @@ -6,10 +6,6 @@ worker_app: "{{ app }}" worker_name: "{{ name }}" -# The replication listener on the main synapse process. -worker_replication_host: 127.0.0.1 -worker_replication_http_port: 9093 - worker_listeners: - type: http port: {{ port }} diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 4beec3daaf..79b5b87397 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -69,6 +69,9 @@ import yaml from jinja2 import Environment, FileSystemLoader MAIN_PROCESS_HTTP_LISTENER_PORT = 8080 +MAIN_PROCESS_INSTANCE_NAME = "main" +MAIN_PROCESS_LOCALHOST_ADDRESS = "127.0.0.1" +MAIN_PROCESS_REPLICATION_PORT = 9093 # A simple name used as a placeholder in the WORKERS_CONFIG below. This will be replaced # during processing with the name of the worker. @@ -719,8 +722,8 @@ def generate_worker_files( # shared config file. listeners = [ { - "port": 9093, - "bind_address": "127.0.0.1", + "port": MAIN_PROCESS_REPLICATION_PORT, + "bind_address": MAIN_PROCESS_LOCALHOST_ADDRESS, "type": "http", "resources": [{"names": ["replication"]}], } @@ -870,6 +873,14 @@ def generate_worker_files( workers_in_use = len(requested_worker_types) > 0 + # If there are workers, add the main process to the instance_map too. + if workers_in_use: + instance_map = shared_config.setdefault("instance_map", {}) + instance_map[MAIN_PROCESS_INSTANCE_NAME] = { + "host": MAIN_PROCESS_LOCALHOST_ADDRESS, + "port": MAIN_PROCESS_REPLICATION_PORT, + } + # Shared homeserver config convert( "/conf/shared.yaml.j2", diff --git a/docs/systemd-with-workers/workers/generic_worker.yaml b/docs/systemd-with-workers/workers/generic_worker.yaml index a858f99ed1..db6436ee6e 100644 --- a/docs/systemd-with-workers/workers/generic_worker.yaml +++ b/docs/systemd-with-workers/workers/generic_worker.yaml @@ -1,10 +1,6 @@ worker_app: synapse.app.generic_worker worker_name: generic_worker1 -# The replication listener on the main synapse process. -worker_replication_host: 127.0.0.1 -worker_replication_http_port: 9093 - worker_listeners: - type: http port: 8083 diff --git a/docs/upgrade.md b/docs/upgrade.md index 0886b03115..0625de8afb 100644 --- a/docs/upgrade.md +++ b/docs/upgrade.md @@ -88,6 +88,84 @@ process, for example: dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb ``` +# Upgrading to v1.84.0 + +## Deprecation of `worker_replication_*` configuration settings + +When using workers, +* `worker_replication_host` +* `worker_replication_http_port` +* `worker_replication_http_tls` + +can now be removed from individual worker YAML configuration ***if*** you add the main process to the `instance_map` in the shared YAML configuration, +using the name `main`. + +### Before: +Shared YAML +```yaml +instance_map: + generic_worker1: + host: localhost + port: 5678 + tls: false +``` +Worker YAML +```yaml +worker_app: synapse.app.generic_worker +worker_name: generic_worker1 + +worker_replication_host: localhost +worker_replication_http_port: 3456 +worker_replication_http_tls: false + +worker_listeners: + - type: http + port: 1234 + resources: + - names: [client, federation] + - type: http + port: 5678 + resources: + - names: [replication] + +worker_log_config: /etc/matrix-synapse/generic-worker-log.yaml +``` +### After: +Shared YAML +```yaml +instance_map: + main: + host: localhost + port: 3456 + tls: false + generic_worker1: + host: localhost + port: 5678 + tls: false +``` +Worker YAML +```yaml +worker_app: synapse.app.generic_worker +worker_name: generic_worker1 + +worker_listeners: + - type: http + port: 1234 + resources: + - names: [client, federation] + - type: http + port: 5678 + resources: + - names: [replication] + +worker_log_config: /etc/matrix-synapse/generic-worker-log.yaml + +``` +Notes: +* `tls` is optional but mirrors the functionality of `worker_replication_http_tls` + + + # Upgrading to v1.81.0 ## Application service path & authentication deprecations diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 6dd1a639ed..dc965b4119 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3884,15 +3884,20 @@ federation_sender_instances: ### `instance_map` When using workers this should be a map from [`worker_name`](#worker_name) to the -HTTP replication listener of the worker, if configured. +HTTP replication listener of the worker, if configured, and to the main process. Each worker declared under [`stream_writers`](../../workers.md#stream-writers) needs a HTTP replication listener, and that listener should be included in the `instance_map`. -(The main process also needs an HTTP replication listener, but it should not be -listed in the `instance_map`.) +The main process also needs an entry on the `instance_map`, and it should be listed under +`main` **if even one other worker exists**. Ensure the port matches with what is declared +inside the `listener` block for a `replication` listener. + Example configuration: ```yaml instance_map: + main: + host: localhost + port: 8030 worker1: host: localhost port: 8034 @@ -4024,6 +4029,7 @@ worker_name: generic_worker1 ``` --- ### `worker_replication_host` +*Deprecated as of version 1.84.0. Place `host` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.* The HTTP replication endpoint that it should talk to on the main Synapse process. The main Synapse process defines this with a `replication` resource in @@ -4035,6 +4041,7 @@ worker_replication_host: 127.0.0.1 ``` --- ### `worker_replication_http_port` +*Deprecated as of version 1.84.0. Place `port` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.* The HTTP replication port that it should talk to on the main Synapse process. The main Synapse process defines this with a `replication` resource in @@ -4046,6 +4053,7 @@ worker_replication_http_port: 9093 ``` --- ### `worker_replication_http_tls` +*Deprecated as of version 1.84.0. Place `tls` under `main` entry on the [`instance_map`](#instance_map) in your shared yaml configuration instead.* Whether TLS should be used for talking to the HTTP replication port on the main Synapse process. @@ -4071,9 +4079,9 @@ A worker can handle HTTP requests. To do so, a `worker_listeners` option must be declared, in the same way as the [`listeners` option](#listeners) in the shared config. -Workers declared in [`stream_writers`](#stream_writers) will need to include a -`replication` listener here, in order to accept internal HTTP requests from -other workers. +Workers declared in [`stream_writers`](#stream_writers) and [`instance_map`](#instance_map) + will need to include a `replication` listener here, in order to accept internal HTTP +requests from other workers. Example configuration: ```yaml diff --git a/docs/workers.md b/docs/workers.md index 765f03c263..991814c0bc 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -87,12 +87,18 @@ shared configuration file. ### Shared configuration -Normally, only a couple of changes are needed to make an existing configuration -file suitable for use with workers. First, you need to enable an +Normally, only a few changes are needed to make an existing configuration +file suitable for use with workers: +* First, you need to enable an ["HTTP replication listener"](usage/configuration/config_documentation.md#listeners) -for the main process; and secondly, you need to enable -[redis-based replication](usage/configuration/config_documentation.md#redis). -Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret) +for the main process +* Secondly, you need to enable +[redis-based replication](usage/configuration/config_documentation.md#redis) +* You will need to add an [`instance_map`](usage/configuration/config_documentation.md#instance_map) +with the `main` process defined, as well as the relevant connection information from +it's HTTP `replication` listener (defined in step 1 above). Note that the `host` defined +is the address the worker needs to look for the `main` process at, not necessarily the same address that is bound to. +* Optionally, a [shared secret](usage/configuration/config_documentation.md#worker_replication_secret) can be used to authenticate HTTP traffic between workers. For example: ```yaml @@ -111,6 +117,11 @@ worker_replication_secret: "" redis: enabled: true + +instance_map: + main: + host: 'localhost' + port: 9093 ``` See the [configuration manual](usage/configuration/config_documentation.md) @@ -130,13 +141,13 @@ In the config file for each worker, you must specify: * The type of worker ([`worker_app`](usage/configuration/config_documentation.md#worker_app)). The currently available worker applications are listed [below](#available-worker-applications). * A unique name for the worker ([`worker_name`](usage/configuration/config_documentation.md#worker_name)). - * The HTTP replication endpoint that it should talk to on the main synapse process - ([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and - [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). * If handling HTTP requests, a [`worker_listeners`](usage/configuration/config_documentation.md#worker_listeners) option with an `http` listener. * **Synapse 1.72 and older:** if handling the `^/_matrix/client/v3/keys/upload` endpoint, the HTTP URI for the main process (`worker_main_http_uri`). This config option is no longer required and is ignored when running Synapse 1.73 and newer. + * **Synapse 1.83 and older:** The HTTP replication endpoint that the worker should talk to on the main synapse process + ([`worker_replication_host`](usage/configuration/config_documentation.md#worker_replication_host) and + [`worker_replication_http_port`](usage/configuration/config_documentation.md#worker_replication_http_port)). If using Synapse 1.84 and newer, these are not needed if `main` is defined on the [shared configuration](#shared-configuration) `instance_map` For example: @@ -417,11 +428,14 @@ effects of bursts of events from that bridge on events sent by normal users. Additionally, the writing of specific streams (such as events) can be moved off of the main process to a particular worker. -To enable this, the worker must have a -[HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured, -have a [`worker_name`](usage/configuration/config_documentation.md#worker_name) +To enable this, the worker must have: +* An [HTTP `replication` listener](usage/configuration/config_documentation.md#listeners) configured, +* Have a [`worker_name`](usage/configuration/config_documentation.md#worker_name) and be listed in the [`instance_map`](usage/configuration/config_documentation.md#instance_map) -config. The same worker can handle multiple streams, but unless otherwise documented, +config. +* Have the main process declared on the [`instance_map`](usage/configuration/config_documentation.md#instance_map) as well. + +Note: The same worker can handle multiple streams, but unless otherwise documented, each stream can only have a single writer. For example, to move event persistence off to a dedicated worker, the shared @@ -429,6 +443,9 @@ configuration would include: ```yaml instance_map: + main: + host: localhost + port: 8030 event_persister1: host: localhost port: 8034 diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 95b4047f1d..d2311cc857 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -39,6 +39,19 @@ The '%s' configuration option is deprecated and will be removed in a future Synapse version. Please use ``%s: name_of_worker`` instead. """ +_MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA = """ +Missing data for a worker to connect to main process. Please include '%s' in the +`instance_map` declared in your shared yaml configuration, or optionally(as a deprecated +solution) in every worker's yaml as various `worker_replication_*` settings as defined +in workers documentation here: +`https://matrix-org.github.io/synapse/latest/workers.html#worker-configuration` +""" +# This allows for a handy knob when it's time to change from 'master' to +# something with less 'history' +MAIN_PROCESS_INSTANCE_NAME = "master" +# Use this to adjust what the main process is known as in the yaml instance_map +MAIN_PROCESS_INSTANCE_MAP_NAME = "main" + logger = logging.getLogger(__name__) @@ -161,27 +174,15 @@ class WorkerConfig(Config): raise ConfigError("worker_log_config must be a string") self.worker_log_config = worker_log_config - # The host used to connect to the main synapse - self.worker_replication_host = config.get("worker_replication_host", None) - # The port on the main synapse for TCP replication if "worker_replication_port" in config: raise ConfigError(DIRECT_TCP_ERROR, ("worker_replication_port",)) - # The port on the main synapse for HTTP replication endpoint - self.worker_replication_http_port = config.get("worker_replication_http_port") - - # The tls mode on the main synapse for HTTP replication endpoint. - # For backward compatibility this defaults to False. - self.worker_replication_http_tls = config.get( - "worker_replication_http_tls", False - ) - # The shared secret used for authentication when connecting to the main synapse. self.worker_replication_secret = config.get("worker_replication_secret", None) self.worker_name = config.get("worker_name", self.worker_app) - self.instance_name = self.worker_name or "master" + self.instance_name = self.worker_name or MAIN_PROCESS_INSTANCE_NAME # FIXME: Remove this check after a suitable amount of time. self.worker_main_http_uri = config.get("worker_main_http_uri", None) @@ -215,12 +216,55 @@ class WorkerConfig(Config): ) # A map from instance name to host/port of their HTTP replication endpoint. + # Check if the main process is declared. Inject it into the map if it's not, + # based first on if a 'main' block is declared then on 'worker_replication_*' + # data. If both are available, default to instance_map. The main process + # itself doesn't need this data as it would never have to talk to itself. + instance_map: Dict[str, Any] = config.get("instance_map", {}) + + if instance_map and self.instance_name is not MAIN_PROCESS_INSTANCE_NAME: + # The host used to connect to the main synapse + main_host = config.get("worker_replication_host", None) + + # The port on the main synapse for HTTP replication endpoint + main_port = config.get("worker_replication_http_port") + + # The tls mode on the main synapse for HTTP replication endpoint. + # For backward compatibility this defaults to False. + main_tls = config.get("worker_replication_http_tls", False) + + # For now, accept 'main' in the instance_map, but the replication system + # expects 'master', force that into being until it's changed later. + if MAIN_PROCESS_INSTANCE_MAP_NAME in instance_map: + instance_map[MAIN_PROCESS_INSTANCE_NAME] = instance_map[ + MAIN_PROCESS_INSTANCE_MAP_NAME + ] + del instance_map[MAIN_PROCESS_INSTANCE_MAP_NAME] + + # This is the backwards compatibility bit that handles the + # worker_replication_* bits using setdefault() to not overwrite anything. + elif main_host is not None and main_port is not None: + instance_map.setdefault( + MAIN_PROCESS_INSTANCE_NAME, + { + "host": main_host, + "port": main_port, + "tls": main_tls, + }, + ) + + else: + # If we've gotten here, it means that the main process is not on the + # instance_map and that not enough worker_replication_* variables + # were declared in the worker's yaml. + raise ConfigError( + _MISSING_MAIN_PROCESS_INSTANCE_MAP_DATA + % MAIN_PROCESS_INSTANCE_MAP_NAME + ) + self.instance_map: Dict[ str, InstanceLocationConfig - ] = parse_and_validate_mapping( - config.get("instance_map", {}), - InstanceLocationConfig, - ) + ] = parse_and_validate_mapping(instance_map, InstanceLocationConfig) # Map from type of streams to source, c.f. WriterLocations. writers = config.get("stream_writers") or {} diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 23129962e9..dc7820f963 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -25,6 +25,7 @@ from twisted.internet.error import ConnectError, DNSLookupError from twisted.web.server import Request from synapse.api.errors import HttpResponseException, SynapseError +from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME from synapse.http import RequestTimedOutError from synapse.http.server import HttpServer from synapse.http.servlet import parse_json_object_from_request @@ -197,11 +198,6 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): client = hs.get_replication_client() local_instance_name = hs.get_instance_name() - # The value of these option should match the replication listener settings - master_host = hs.config.worker.worker_replication_host - master_port = hs.config.worker.worker_replication_http_port - master_tls = hs.config.worker.worker_replication_http_tls - instance_map = hs.config.worker.instance_map outgoing_gauge = _pending_outgoing_requests.labels(cls.NAME) @@ -213,7 +209,9 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): ) @trace_with_opname("outgoing_replication_request") - async def send_request(*, instance_name: str = "master", **kwargs: Any) -> Any: + async def send_request( + *, instance_name: str = MAIN_PROCESS_INSTANCE_NAME, **kwargs: Any + ) -> Any: # We have to pull these out here to avoid circular dependencies... streams = hs.get_replication_command_handler().get_streams_to_replicate() replication = hs.get_replication_data_handler() @@ -221,11 +219,7 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): with outgoing_gauge.track_inprogress(): if instance_name == local_instance_name: raise Exception("Trying to send HTTP request to self") - if instance_name == "master": - host = master_host - port = master_port - tls = master_tls - elif instance_name in instance_map: + if instance_name in instance_map: host = instance_map[instance_name].host port = instance_map[instance_name].port tls = instance_map[instance_name].tls diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 758b4bc38b..bff7114cd8 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -837,6 +837,7 @@ class ModuleApiWorkerTestCase(BaseModuleApiTestCase, BaseMultiWorkerStreamTestCa conf = super().default_config() conf["stream_writers"] = {"presence": ["presence_writer"]} conf["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, "presence_writer": {"host": "testserv", "port": 1001}, } return conf diff --git a/tests/replication/_base.py b/tests/replication/_base.py index 0f1a8a145f..eb9b1f1cd9 100644 --- a/tests/replication/_base.py +++ b/tests/replication/_base.py @@ -110,8 +110,7 @@ class BaseStreamTestCase(unittest.HomeserverTestCase): def _get_worker_hs_config(self) -> dict: config = self.default_config() config["worker_app"] = "synapse.app.generic_worker" - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" + config["instance_map"] = {"main": {"host": "testserv", "port": 8765}} return config def _build_replication_data_handler(self) -> "TestReplicationDataHandler": @@ -249,6 +248,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): """ base = super().default_config() base["redis"] = {"enabled": True} + base["instance_map"] = {"main": {"host": "testserv", "port": 8765}} return base def setUp(self) -> None: @@ -310,7 +310,7 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): def make_worker_hs( self, worker_app: str, extra_config: Optional[dict] = None, **kwargs: Any ) -> HomeServer: - """Make a new worker HS instance, correctly connecting replcation + """Make a new worker HS instance, correctly connecting replication stream to the master HS. Args: @@ -388,8 +388,6 @@ class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase): def _get_worker_hs_config(self) -> dict: config = self.default_config() - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" return config def replicate(self) -> None: diff --git a/tests/replication/test_auth.py b/tests/replication/test_auth.py index 98602371e4..f7bca0063d 100644 --- a/tests/replication/test_auth.py +++ b/tests/replication/test_auth.py @@ -43,9 +43,6 @@ class WorkerAuthenticationTestCase(BaseMultiWorkerStreamTestCase): def _get_worker_hs_config(self) -> dict: config = self.default_config() config["worker_app"] = "synapse.app.generic_worker" - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" - return config def _test_register(self) -> FakeChannel: diff --git a/tests/replication/test_client_reader_shard.py b/tests/replication/test_client_reader_shard.py index eca5033761..a18859099f 100644 --- a/tests/replication/test_client_reader_shard.py +++ b/tests/replication/test_client_reader_shard.py @@ -29,8 +29,6 @@ class ClientReaderTestCase(BaseMultiWorkerStreamTestCase): def _get_worker_hs_config(self) -> dict: config = self.default_config() config["worker_app"] = "synapse.app.generic_worker" - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" return config def test_register_single_worker(self) -> None: diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index 7f9cc67e73..4623d737fb 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py @@ -50,6 +50,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase): conf = super().default_config() conf["stream_writers"] = {"events": ["worker1", "worker2"]} conf["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, "worker1": {"host": "testserv", "port": 1001}, "worker2": {"host": "testserv", "port": 1002}, } -- cgit 1.4.1