diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 2e4ee723d3..a775f70c4e 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -328,51 +328,8 @@ jobs:
- arrangement: monolith
database: Postgres
- steps:
- # The path is set via a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on the path to run Complement.
- # See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-system-path
- - name: "Set Go Version"
- run: |
- # Add Go 1.17 to the PATH: see https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-Readme.md#environment-variables-2
- echo "$GOROOT_1_17_X64/bin" >> $GITHUB_PATH
- # Add the Go path to the PATH: We need this so we can call gotestfmt
- echo "~/go/bin" >> $GITHUB_PATH
-
- - name: "Install Complement Dependencies"
- run: |
- sudo apt-get update && sudo apt-get install -y libolm3 libolm-dev
- go get -v github.com/haveyoudebuggedit/gotestfmt/v2/cmd/gotestfmt@latest
-
- - name: Run actions/checkout@v2 for synapse
- uses: actions/checkout@v2
- with:
- path: synapse
-
- - name: "Install custom gotestfmt template"
- run: |
- mkdir .gotestfmt/github -p
- cp synapse/.ci/complement_package.gotpl .gotestfmt/github/package.gotpl
-
- # Attempt to check out the same branch of Complement as the PR. If it
- # doesn't exist, fallback to HEAD.
- - name: Checkout complement
- run: synapse/.ci/scripts/checkout_complement.sh
-
- - run: |
- set -o pipefail
- POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
- shell: bash
- name: Run Complement Tests
-
- # We only run the workers tests on `develop` for now, because they're too slow to wait for on PRs.
- # Sadly, you can't have an `if` condition on the value of a matrix, so this is a temporary, separate job for now.
- # GitHub Actions doesn't support YAML anchors, so it's full-on duplication for now.
- complement-developonly:
- if: "${{ !failure() && !cancelled() && (github.ref == 'refs/heads/develop') }}"
- needs: linting-done
- runs-on: ubuntu-latest
-
- name: "Complement Workers (develop only)"
+ - arrangement: workers
+ database: Postgres
steps:
# The path is set via a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on the path to run Complement.
@@ -406,7 +363,7 @@ jobs:
- run: |
set -o pipefail
- WORKERS=1 COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
+ POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} WORKERS=${{ (matrix.arrangement == 'workers') && 1 || '' }} COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
shell: bash
name: Run Complement Tests
diff --git a/changelog.d/13127.misc b/changelog.d/13127.misc
new file mode 100644
index 0000000000..1414811e0a
--- /dev/null
+++ b/changelog.d/13127.misc
@@ -0,0 +1 @@
+Improve startup times in Complement test runs against workers, particularly in CPU-constrained environments.
\ No newline at end of file
diff --git a/docker/complement/conf/start_for_complement.sh b/docker/complement/conf/start_for_complement.sh
index 773c7db22f..cc6482f763 100755
--- a/docker/complement/conf/start_for_complement.sh
+++ b/docker/complement/conf/start_for_complement.sh
@@ -59,6 +59,9 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
synchrotron, \
appservice, \
pusher"
+
+ # Improve startup times by using a launcher based on fork()
+ export SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER=1
else
# Empty string here means 'main process only'
export SYNAPSE_WORKER_TYPES=""
diff --git a/docker/conf-workers/synapse.supervisord.conf.j2 b/docker/conf-workers/synapse.supervisord.conf.j2
index 6443450491..481eb4fc92 100644
--- a/docker/conf-workers/synapse.supervisord.conf.j2
+++ b/docker/conf-workers/synapse.supervisord.conf.j2
@@ -1,3 +1,24 @@
+{% if use_forking_launcher %}
+[program:synapse_fork]
+command=/usr/local/bin/python -m synapse.app.complement_fork_starter
+ {{ main_config_path }}
+ synapse.app.homeserver
+ --config-path="{{ main_config_path }}"
+ --config-path=/conf/workers/shared.yaml
+ {%- for worker in workers %}
+ -- {{ worker.app }}
+ --config-path="{{ main_config_path }}"
+ --config-path=/conf/workers/shared.yaml
+ --config-path=/conf/workers/{{ worker.name }}.yaml
+ {%- endfor %}
+stdout_logfile=/dev/stdout
+stdout_logfile_maxbytes=0
+stderr_logfile=/dev/stderr
+stderr_logfile_maxbytes=0
+autorestart=unexpected
+exitcodes=0
+
+{% else %}
[program:synapse_main]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver
--config-path="{{ main_config_path }}"
@@ -13,7 +34,7 @@ autorestart=unexpected
exitcodes=0
-{% for worker in workers %}
+ {% for worker in workers %}
[program:synapse_{{ worker.name }}]
command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
--config-path="{{ main_config_path }}"
@@ -27,4 +48,5 @@ stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
-{% endfor %}
+ {% endfor %}
+{% endif %}
diff --git a/docker/conf/log.config b/docker/conf/log.config
index dc8c70befd..d9e85aa533 100644
--- a/docker/conf/log.config
+++ b/docker/conf/log.config
@@ -2,7 +2,11 @@ version: 1
formatters:
precise:
+ {% if include_worker_name_in_log_line %}
+ format: '{{ worker_name }} | %(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+ {% else %}
format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+ {% endif %}
handlers:
{% if LOG_FILE_PATH %}
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 2134b648d5..4521f99eb4 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -26,6 +26,9 @@
# * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
# * SYNAPSE_TLS_KEY: Path to a TLS key. If this and SYNAPSE_TLS_CERT are specified,
# Nginx will be configured to serve TLS on port 8448.
+# * SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER: Whether to use the forking launcher,
+# only intended for usage in Complement at the moment.
+# No stability guarantees are provided.
#
# NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
# in the project's README), this script may be run multiple times, and functionality should
@@ -525,6 +528,7 @@ def generate_worker_files(
"/etc/supervisor/conf.d/synapse.conf",
workers=worker_descriptors,
main_config_path=config_path,
+ use_forking_launcher=environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"),
)
# healthcheck config
@@ -560,6 +564,9 @@ def generate_worker_log_config(
log_config_filepath,
worker_name=worker_name,
**extra_log_template_args,
+ include_worker_name_in_log_line=environ.get(
+ "SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"
+ ),
)
return log_config_filepath
diff --git a/docker/start.py b/docker/start.py
index 4ac8f03477..5a98dce551 100755
--- a/docker/start.py
+++ b/docker/start.py
@@ -110,7 +110,11 @@ def generate_config_from_template(
log_config_file = environ["SYNAPSE_LOG_CONFIG"]
log("Generating log config file " + log_config_file)
- convert("/conf/log.config", log_config_file, environ)
+ convert(
+ "/conf/log.config",
+ log_config_file,
+ {**environ, "include_worker_name_in_log_line": False},
+ )
# Hopefully we already have a signing key, but generate one if not.
args = [
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 363ac98ea9..923891ae0d 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -106,7 +106,9 @@ def register_sighup(func: Callable[P, None], *args: P.args, **kwargs: P.kwargs)
def start_worker_reactor(
appname: str,
config: HomeServerConfig,
- run_command: Callable[[], None] = reactor.run,
+ # Use a lambda to avoid binding to a given reactor at import time.
+ # (needed when synapse.app.complement_fork_starter is being used)
+ run_command: Callable[[], None] = lambda: reactor.run(),
) -> None:
"""Run the reactor in the main process
@@ -141,7 +143,9 @@ def start_reactor(
daemonize: bool,
print_pidfile: bool,
logger: logging.Logger,
- run_command: Callable[[], None] = reactor.run,
+ # Use a lambda to avoid binding to a given reactor at import time.
+ # (needed when synapse.app.complement_fork_starter is being used)
+ run_command: Callable[[], None] = lambda: reactor.run(),
) -> None:
"""Run the reactor in the main process
diff --git a/synapse/app/complement_fork_starter.py b/synapse/app/complement_fork_starter.py
new file mode 100644
index 0000000000..89eb07df27
--- /dev/null
+++ b/synapse/app/complement_fork_starter.py
@@ -0,0 +1,190 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ## What this script does
+#
+# This script spawns multiple workers, whilst only going through the code loading
+# process once. The net effect is that start-up time for a swarm of workers is
+# reduced, particularly in CPU-constrained environments.
+#
+# Before the workers are spawned, the database is prepared in order to avoid the
+# workers racing.
+#
+# ## Stability
+#
+# This script is only intended for use within the Synapse images for the
+# Complement test suite.
+# There are currently no stability guarantees whatsoever; especially not about:
+# - whether it will continue to exist in future versions;
+# - the format of its command-line arguments; or
+# - any details about its behaviour or principles of operation.
+#
+# ## Usage
+#
+# The first argument should be the path to the database configuration, used to
+# set up the database. The rest of the arguments are used as follows:
+# Each worker is specified as an argument group (each argument group is
+# separated by '--').
+# The first argument in each argument group is the Python module name of the application
+# to start. Further arguments are then passed to that module as-is.
+#
+# ## Example
+#
+# python -m synapse.app.complement_fork_starter path_to_db_config.yaml \
+# synapse.app.homeserver [args..] -- \
+# synapse.app.generic_worker [args..] -- \
+# ...
+# synapse.app.generic_worker [args..]
+#
+import argparse
+import importlib
+import itertools
+import multiprocessing
+import sys
+from typing import Any, Callable, List
+
+from twisted.internet.main import installReactor
+
+
+class ProxiedReactor:
+ """
+ Twisted tracks the 'installed' reactor as a global variable.
+ (Actually, it does some module trickery, but the effect is similar.)
+
+ The default EpollReactor is buggy if it's created before a process is
+ forked, then used in the child.
+ See https://twistedmatrix.com/trac/ticket/4759#comment:17.
+
+ However, importing certain Twisted modules will automatically create and
+ install a reactor if one hasn't already been installed.
+ It's not normally possible to re-install a reactor.
+
+ Given the goal of launching workers with fork() to only import the code once,
+ this presents a conflict.
+ Our work around is to 'install' this ProxiedReactor which prevents Twisted
+ from creating and installing one, but which lets us replace the actual reactor
+ in use later on.
+ """
+
+ def __init__(self) -> None:
+ self.___reactor_target: Any = None
+
+ def _install_real_reactor(self, new_reactor: Any) -> None:
+ """
+ Install a real reactor for this ProxiedReactor to forward lookups onto.
+
+ This method is specific to our ProxiedReactor and should not clash with
+ any names used on an actual Twisted reactor.
+ """
+ self.___reactor_target = new_reactor
+
+ def __getattr__(self, attr_name: str) -> Any:
+ return getattr(self.___reactor_target, attr_name)
+
+
+def _worker_entrypoint(
+ func: Callable[[], None], proxy_reactor: ProxiedReactor, args: List[str]
+) -> None:
+ """
+ Entrypoint for a forked worker process.
+
+ We just need to set up the command-line arguments, create our real reactor
+ and then kick off the worker's main() function.
+ """
+
+ sys.argv = args
+
+ from twisted.internet.epollreactor import EPollReactor
+
+ proxy_reactor._install_real_reactor(EPollReactor())
+ func()
+
+
+def main() -> None:
+ """
+ Entrypoint for the forking launcher.
+ """
+ parser = argparse.ArgumentParser()
+ parser.add_argument("db_config", help="Path to database config file")
+ parser.add_argument(
+ "args",
+ nargs="...",
+ help="Argument groups separated by `--`. "
+ "The first argument of each group is a Synapse app name. "
+ "Subsequent arguments are passed through.",
+ )
+ ns = parser.parse_args()
+
+ # Split up the subsequent arguments into each workers' arguments;
+ # `--` is our delimiter of choice.
+ args_by_worker: List[List[str]] = [
+ list(args)
+ for cond, args in itertools.groupby(ns.args, lambda ele: ele != "--")
+ if cond and args
+ ]
+
+ # Prevent Twisted from installing a shared reactor that all the workers will
+ # inherit when we fork(), by installing our own beforehand.
+ proxy_reactor = ProxiedReactor()
+ installReactor(proxy_reactor)
+
+ # Import the entrypoints for all the workers.
+ worker_functions = []
+ for worker_args in args_by_worker:
+ worker_module = importlib.import_module(worker_args[0])
+ worker_functions.append(worker_module.main)
+
+ # We need to prepare the database first as otherwise all the workers will
+ # try to create a schema version table and some will crash out.
+ from synapse._scripts import update_synapse_database
+
+ update_proc = multiprocessing.Process(
+ target=_worker_entrypoint,
+ args=(
+ update_synapse_database.main,
+ proxy_reactor,
+ [
+ "update_synapse_database",
+ "--database-config",
+ ns.db_config,
+ "--run-background-updates",
+ ],
+ ),
+ )
+ print("===== PREPARING DATABASE =====", file=sys.stderr)
+ update_proc.start()
+ update_proc.join()
+ print("===== PREPARED DATABASE =====", file=sys.stderr)
+
+ # At this point, we've imported all the main entrypoints for all the workers.
+ # Now we basically just fork() out to create the workers we need.
+ # Because we're using fork(), all the workers get a clone of this launcher's
+ # memory space and don't need to repeat the work of loading the code!
+ # Instead of using fork() directly, we use the multiprocessing library,
+ # which uses fork() on Unix platforms.
+ processes = []
+ for (func, worker_args) in zip(worker_functions, args_by_worker):
+ process = multiprocessing.Process(
+ target=_worker_entrypoint, args=(func, proxy_reactor, worker_args)
+ )
+ process.start()
+ processes.append(process)
+
+ # Be a good parent and wait for our children to die before exiting.
+ for process in processes:
+ process.join()
+
+
+if __name__ == "__main__":
+ main()
|