summary refs log tree commit diff
path: root/docker/configure_workers_and_start.py
diff options
context:
space:
mode:
Diffstat (limited to 'docker/configure_workers_and_start.py')
-rwxr-xr-xdocker/configure_workers_and_start.py558
1 files changed, 558 insertions, 0 deletions
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
new file mode 100755
index 0000000000..4be6afc65d
--- /dev/null
+++ b/docker/configure_workers_and_start.py
@@ -0,0 +1,558 @@
+#!/usr/bin/env python
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script reads environment variables and generates a shared Synapse worker,
+# nginx and supervisord configs depending on the workers requested.
+#
+# The environment variables it reads are:
+#   * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
+#   * SYNAPSE_REPORT_STATS: Whether to report stats.
+#   * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
+#         below. Leave empty for no workers, or set to '*' for all possible workers.
+#
+# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
+# in the project's README), this script may be run multiple times, and functionality should
+# continue to work if so.
+
+import os
+import subprocess
+import sys
+
+import jinja2
+import yaml
+
+MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
+
+
+WORKERS_CONFIG = {
+    "pusher": {
+        "app": "synapse.app.pusher",
+        "listener_resources": [],
+        "endpoint_patterns": [],
+        "shared_extra_conf": {"start_pushers": False},
+        "worker_extra_conf": "",
+    },
+    "user_dir": {
+        "app": "synapse.app.user_dir",
+        "listener_resources": ["client"],
+        "endpoint_patterns": [
+            "^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$"
+        ],
+        "shared_extra_conf": {"update_user_directory": False},
+        "worker_extra_conf": "",
+    },
+    "media_repository": {
+        "app": "synapse.app.media_repository",
+        "listener_resources": ["media"],
+        "endpoint_patterns": [
+            "^/_matrix/media/",
+            "^/_synapse/admin/v1/purge_media_cache$",
+            "^/_synapse/admin/v1/room/.*/media.*$",
+            "^/_synapse/admin/v1/user/.*/media.*$",
+            "^/_synapse/admin/v1/media/.*$",
+            "^/_synapse/admin/v1/quarantine_media/.*$",
+        ],
+        "shared_extra_conf": {"enable_media_repo": False},
+        "worker_extra_conf": "enable_media_repo: true",
+    },
+    "appservice": {
+        "app": "synapse.app.appservice",
+        "listener_resources": [],
+        "endpoint_patterns": [],
+        "shared_extra_conf": {"notify_appservices": False},
+        "worker_extra_conf": "",
+    },
+    "federation_sender": {
+        "app": "synapse.app.federation_sender",
+        "listener_resources": [],
+        "endpoint_patterns": [],
+        "shared_extra_conf": {"send_federation": False},
+        "worker_extra_conf": "",
+    },
+    "synchrotron": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client"],
+        "endpoint_patterns": [
+            "^/_matrix/client/(v2_alpha|r0)/sync$",
+            "^/_matrix/client/(api/v1|v2_alpha|r0)/events$",
+            "^/_matrix/client/(api/v1|r0)/initialSync$",
+            "^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$",
+        ],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "federation_reader": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["federation"],
+        "endpoint_patterns": [
+            "^/_matrix/federation/(v1|v2)/event/",
+            "^/_matrix/federation/(v1|v2)/state/",
+            "^/_matrix/federation/(v1|v2)/state_ids/",
+            "^/_matrix/federation/(v1|v2)/backfill/",
+            "^/_matrix/federation/(v1|v2)/get_missing_events/",
+            "^/_matrix/federation/(v1|v2)/publicRooms",
+            "^/_matrix/federation/(v1|v2)/query/",
+            "^/_matrix/federation/(v1|v2)/make_join/",
+            "^/_matrix/federation/(v1|v2)/make_leave/",
+            "^/_matrix/federation/(v1|v2)/send_join/",
+            "^/_matrix/federation/(v1|v2)/send_leave/",
+            "^/_matrix/federation/(v1|v2)/invite/",
+            "^/_matrix/federation/(v1|v2)/query_auth/",
+            "^/_matrix/federation/(v1|v2)/event_auth/",
+            "^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
+            "^/_matrix/federation/(v1|v2)/user/devices/",
+            "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
+            "^/_matrix/key/v2/query",
+        ],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "federation_inbound": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["federation"],
+        "endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "event_persister": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["replication"],
+        "endpoint_patterns": [],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "background_worker": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": [],
+        "endpoint_patterns": [],
+        # This worker cannot be sharded. Therefore there should only ever be one background
+        # worker, and it should be named background_worker1
+        "shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
+        "worker_extra_conf": "",
+    },
+    "event_creator": {
+        "app": "synapse.app.generic_worker",
+        "listener_resources": ["client"],
+        "endpoint_patterns": [
+            "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/redact",
+            "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send",
+            "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
+            "^/_matrix/client/(api/v1|r0|unstable)/join/",
+            "^/_matrix/client/(api/v1|r0|unstable)/profile/",
+        ],
+        "shared_extra_conf": {},
+        "worker_extra_conf": "",
+    },
+    "frontend_proxy": {
+        "app": "synapse.app.frontend_proxy",
+        "listener_resources": ["client", "replication"],
+        "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|unstable)/keys/upload"],
+        "shared_extra_conf": {},
+        "worker_extra_conf": (
+            "worker_main_http_uri: http://127.0.0.1:%d"
+            % (MAIN_PROCESS_HTTP_LISTENER_PORT,),
+        ),
+    },
+}
+
+# Templates for sections that may be inserted multiple times in config files
+SUPERVISORD_PROCESS_CONFIG_BLOCK = """
+[program:synapse_{name}]
+command=/usr/local/bin/python -m {app} \
+    --config-path="{config_path}" \
+    --config-path=/conf/workers/shared.yaml \
+    --config-path=/conf/workers/{name}.yaml
+autorestart=unexpected
+priority=500
+exitcodes=0
+stdout_logfile=/dev/stdout
+stdout_logfile_maxbytes=0
+stderr_logfile=/dev/stderr
+stderr_logfile_maxbytes=0
+"""
+
+NGINX_LOCATION_CONFIG_BLOCK = """
+    location ~* {endpoint} {
+        proxy_pass {upstream};
+        proxy_set_header X-Forwarded-For $remote_addr;
+        proxy_set_header X-Forwarded-Proto $scheme;
+        proxy_set_header Host $host;
+    }
+"""
+
+NGINX_UPSTREAM_CONFIG_BLOCK = """
+upstream {upstream_worker_type} {
+{body}
+}
+"""
+
+
+# Utility functions
+def log(txt: str):
+    """Log something to the stdout.
+
+    Args:
+        txt: The text to log.
+    """
+    print(txt)
+
+
+def error(txt: str):
+    """Log something and exit with an error code.
+
+    Args:
+        txt: The text to log in error.
+    """
+    log(txt)
+    sys.exit(2)
+
+
+def convert(src: str, dst: str, **template_vars):
+    """Generate a file from a template
+
+    Args:
+        src: Path to the input file.
+        dst: Path to write to.
+        template_vars: The arguments to replace placeholder variables in the template with.
+    """
+    # Read the template file
+    with open(src) as infile:
+        template = infile.read()
+
+    # Generate a string from the template. We disable autoescape to prevent template
+    # variables from being escaped.
+    rendered = jinja2.Template(template, autoescape=False).render(**template_vars)
+
+    # Write the generated contents to a file
+    #
+    # We use append mode in case the files have already been written to by something else
+    # (for instance, as part of the instructions in a dockerfile).
+    with open(dst, "a") as outfile:
+        # In case the existing file doesn't end with a newline
+        outfile.write("\n")
+
+        outfile.write(rendered)
+
+
+def add_sharding_to_shared_config(
+    shared_config: dict,
+    worker_type: str,
+    worker_name: str,
+    worker_port: int,
+) -> None:
+    """Given a dictionary representing a config file shared across all workers,
+    append sharded worker information to it for the current worker_type instance.
+
+    Args:
+        shared_config: The config dict that all worker instances share (after being converted to YAML)
+        worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
+        worker_name: The name of the worker instance.
+        worker_port: The HTTP replication port that the worker instance is listening on.
+    """
+    # The instance_map config field marks the workers that write to various replication streams
+    instance_map = shared_config.setdefault("instance_map", {})
+
+    # Worker-type specific sharding config
+    if worker_type == "pusher":
+        shared_config.setdefault("pusher_instances", []).append(worker_name)
+
+    elif worker_type == "federation_sender":
+        shared_config.setdefault("federation_sender_instances", []).append(worker_name)
+
+    elif worker_type == "event_persister":
+        # Event persisters write to the events stream, so we need to update
+        # the list of event stream writers
+        shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
+            worker_name
+        )
+
+        # Map of stream writer instance names to host/ports combos
+        instance_map[worker_name] = {
+            "host": "localhost",
+            "port": worker_port,
+        }
+
+    elif worker_type == "media_repository":
+        # The first configured media worker will run the media background jobs
+        shared_config.setdefault("media_instance_running_background_jobs", worker_name)
+
+
+def generate_base_homeserver_config():
+    """Starts Synapse and generates a basic homeserver config, which will later be
+    modified for worker support.
+
+    Raises: CalledProcessError if calling start.py returned a non-zero exit code.
+    """
+    # start.py already does this for us, so just call that.
+    # note that this script is copied in in the official, monolith dockerfile
+    os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
+    subprocess.check_output(["/usr/local/bin/python", "/start.py", "migrate_config"])
+
+
+def generate_worker_files(environ, config_path: str, data_dir: str):
+    """Read the desired list of workers from environment variables and generate
+    shared homeserver, nginx and supervisord configs.
+
+    Args:
+        environ: _Environ[str]
+        config_path: Where to output the generated Synapse main worker config file.
+        data_dir: The location of the synapse data directory. Where log and
+            user-facing config files live.
+    """
+    # Note that yaml cares about indentation, so care should be taken to insert lines
+    # into files at the correct indentation below.
+
+    # shared_config is the contents of a Synapse config file that will be shared amongst
+    # the main Synapse process as well as all workers.
+    # It is intended mainly for disabling functionality when certain workers are spun up,
+    # and adding a replication listener.
+
+    # First read the original config file and extract the listeners block. Then we'll add
+    # another listener for replication. Later we'll write out the result.
+    listeners = [
+        {
+            "port": 9093,
+            "bind_address": "127.0.0.1",
+            "type": "http",
+            "resources": [{"names": ["replication"]}],
+        }
+    ]
+    with open(config_path) as file_stream:
+        original_config = yaml.safe_load(file_stream)
+        original_listeners = original_config.get("listeners")
+        if original_listeners:
+            listeners += original_listeners
+
+    # The shared homeserver config. The contents of which will be inserted into the
+    # base shared worker jinja2 template.
+    #
+    # This config file will be passed to all workers, included Synapse's main process.
+    shared_config = {"listeners": listeners}
+
+    # The supervisord config. The contents of which will be inserted into the
+    # base supervisord jinja2 template.
+    #
+    # Supervisord will be in charge of running everything, from redis to nginx to Synapse
+    # and all of its worker processes. Load the config template, which defines a few
+    # services that are necessary to run.
+    supervisord_config = ""
+
+    # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
+    # ports of each worker. For example:
+    # {
+    #   worker_type: {1234, 1235, ...}}
+    # }
+    # and will be used to construct 'upstream' nginx directives.
+    nginx_upstreams = {}
+
+    # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
+    # placed after the proxy_pass directive. The main benefit to representing this data as a
+    # dict over a str is that we can easily deduplicate endpoints across multiple instances
+    # of the same worker.
+    #
+    # An nginx site config that will be amended to depending on the workers that are
+    # spun up. To be placed in /etc/nginx/conf.d.
+    nginx_locations = {}
+
+    # Read the desired worker configuration from the environment
+    worker_types = environ.get("SYNAPSE_WORKER_TYPES")
+    if worker_types is None:
+        # No workers, just the main process
+        worker_types = []
+    else:
+        # Split type names by comma
+        worker_types = worker_types.split(",")
+
+    # Create the worker configuration directory if it doesn't already exist
+    os.makedirs("/conf/workers", exist_ok=True)
+
+    # Start worker ports from this arbitrary port
+    worker_port = 18009
+
+    # A counter of worker_type -> int. Used for determining the name for a given
+    # worker type when generating its config file, as each worker's name is just
+    # worker_type + instance #
+    worker_type_counter = {}
+
+    # For each worker type specified by the user, create config values
+    for worker_type in worker_types:
+        worker_type = worker_type.strip()
+
+        worker_config = WORKERS_CONFIG.get(worker_type)
+        if worker_config:
+            worker_config = worker_config.copy()
+        else:
+            log(worker_type + " is an unknown worker type! It will be ignored")
+            continue
+
+        new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
+        worker_type_counter[worker_type] = new_worker_count
+
+        # Name workers by their type concatenated with an incrementing number
+        # e.g. federation_reader1
+        worker_name = worker_type + str(new_worker_count)
+        worker_config.update(
+            {"name": worker_name, "port": worker_port, "config_path": config_path}
+        )
+
+        # Update the shared config with any worker-type specific options
+        shared_config.update(worker_config["shared_extra_conf"])
+
+        # Check if more than one instance of this worker type has been specified
+        worker_type_total_count = worker_types.count(worker_type)
+        if worker_type_total_count > 1:
+            # Update the shared config with sharding-related options if necessary
+            add_sharding_to_shared_config(
+                shared_config, worker_type, worker_name, worker_port
+            )
+
+        # Enable the worker in supervisord
+        supervisord_config += SUPERVISORD_PROCESS_CONFIG_BLOCK.format_map(worker_config)
+
+        # Add nginx location blocks for this worker's endpoints (if any are defined)
+        for pattern in worker_config["endpoint_patterns"]:
+            # Determine whether we need to load-balance this worker
+            if worker_type_total_count > 1:
+                # Create or add to a load-balanced upstream for this worker
+                nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
+
+                # Upstreams are named after the worker_type
+                upstream = "http://" + worker_type
+            else:
+                upstream = "http://localhost:%d" % (worker_port,)
+
+            # Note that this endpoint should proxy to this upstream
+            nginx_locations[pattern] = upstream
+
+        # Write out the worker's logging config file
+
+        # Check whether we should write worker logs to disk, in addition to the console
+        extra_log_template_args = {}
+        if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
+            extra_log_template_args["LOG_FILE_PATH"] = "{dir}/logs/{name}.log".format(
+                dir=data_dir, name=worker_name
+            )
+
+        # Render and write the file
+        log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
+        convert(
+            "/conf/log.config",
+            log_config_filepath,
+            worker_name=worker_name,
+            **extra_log_template_args,
+        )
+
+        # Then a worker config file
+        convert(
+            "/conf/worker.yaml.j2",
+            "/conf/workers/{name}.yaml".format(name=worker_name),
+            **worker_config,
+            worker_log_config_filepath=log_config_filepath,
+        )
+
+        worker_port += 1
+
+    # Build the nginx location config blocks
+    nginx_location_config = ""
+    for endpoint, upstream in nginx_locations.items():
+        nginx_location_config += NGINX_LOCATION_CONFIG_BLOCK.format(
+            endpoint=endpoint,
+            upstream=upstream,
+        )
+
+    # Determine the load-balancing upstreams to configure
+    nginx_upstream_config = ""
+    for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
+        body = ""
+        for port in upstream_worker_ports:
+            body += "    server localhost:%d;\n" % (port,)
+
+        # Add to the list of configured upstreams
+        nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
+            upstream_worker_type=upstream_worker_type,
+            body=body,
+        )
+
+    # Finally, we'll write out the config files.
+
+    # Shared homeserver config
+    convert(
+        "/conf/shared.yaml.j2",
+        "/conf/workers/shared.yaml",
+        shared_worker_config=yaml.dump(shared_config),
+    )
+
+    # Nginx config
+    convert(
+        "/conf/nginx.conf.j2",
+        "/etc/nginx/conf.d/matrix-synapse.conf",
+        worker_locations=nginx_location_config,
+        upstream_directives=nginx_upstream_config,
+    )
+
+    # Supervisord config
+    convert(
+        "/conf/supervisord.conf.j2",
+        "/etc/supervisor/conf.d/supervisord.conf",
+        main_config_path=config_path,
+        worker_config=supervisord_config,
+    )
+
+    # Ensure the logging directory exists
+    log_dir = data_dir + "/logs"
+    if not os.path.exists(log_dir):
+        os.mkdir(log_dir)
+
+
+def start_supervisord():
+    """Starts up supervisord which then starts and monitors all other necessary processes
+
+    Raises: CalledProcessError if calling start.py return a non-zero exit code.
+    """
+    subprocess.run(["/usr/bin/supervisord"], stdin=subprocess.PIPE)
+
+
+def main(args, environ):
+    config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
+    config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
+    data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
+
+    # override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
+    # this needs to be handled by a frontend proxy
+    environ["SYNAPSE_NO_TLS"] = "yes"
+
+    # Generate the base homeserver config if one does not yet exist
+    if not os.path.exists(config_path):
+        log("Generating base homeserver config")
+        generate_base_homeserver_config()
+
+    # This script may be run multiple times (mostly by Complement, see note at top of file).
+    # Don't re-configure workers in this instance.
+    mark_filepath = "/conf/workers_have_been_configured"
+    if not os.path.exists(mark_filepath):
+        # Always regenerate all other config files
+        generate_worker_files(environ, config_path, data_dir)
+
+        # Mark workers as being configured
+        with open(mark_filepath, "w") as f:
+            f.write("")
+
+    # Start supervisord, which will start Synapse, all of the configured worker
+    # processes, redis, nginx etc. according to the config we created above.
+    start_supervisord()
+
+
+if __name__ == "__main__":
+    main(sys.argv, os.environ)