summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--.github/workflows/tests.yml49
-rw-r--r--changelog.d/13127.misc1
-rwxr-xr-xdocker/complement/conf/start_for_complement.sh3
-rw-r--r--docker/conf-workers/synapse.supervisord.conf.j226
-rw-r--r--docker/conf/log.config4
-rwxr-xr-xdocker/configure_workers_and_start.py7
-rwxr-xr-xdocker/start.py6
-rw-r--r--synapse/app/_base.py8
-rw-r--r--synapse/app/complement_fork_starter.py190
9 files changed, 243 insertions, 51 deletions
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 2e4ee723d3..a775f70c4e 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -328,51 +328,8 @@ jobs:
           - arrangement: monolith
             database: Postgres
 
-    steps:
-      # The path is set via a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on the path to run Complement.
-      # See https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#adding-a-system-path
-      - name: "Set Go Version"
-        run: |
-          # Add Go 1.17 to the PATH: see https://github.com/actions/virtual-environments/blob/main/images/linux/Ubuntu2004-Readme.md#environment-variables-2
-          echo "$GOROOT_1_17_X64/bin" >> $GITHUB_PATH
-          # Add the Go path to the PATH: We need this so we can call gotestfmt
-          echo "~/go/bin" >> $GITHUB_PATH
-
-      - name: "Install Complement Dependencies"
-        run: |
-          sudo apt-get update && sudo apt-get install -y libolm3 libolm-dev
-          go get -v github.com/haveyoudebuggedit/gotestfmt/v2/cmd/gotestfmt@latest
-
-      - name: Run actions/checkout@v2 for synapse
-        uses: actions/checkout@v2
-        with:
-          path: synapse
-
-      - name: "Install custom gotestfmt template"
-        run: |
-          mkdir .gotestfmt/github -p
-          cp synapse/.ci/complement_package.gotpl .gotestfmt/github/package.gotpl
-
-      # Attempt to check out the same branch of Complement as the PR. If it
-      # doesn't exist, fallback to HEAD.
-      - name: Checkout complement
-        run: synapse/.ci/scripts/checkout_complement.sh
-
-      - run: |
-          set -o pipefail
-          POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
-        shell: bash
-        name: Run Complement Tests
-
-  # We only run the workers tests on `develop` for now, because they're too slow to wait for on PRs.
-  # Sadly, you can't have an `if` condition on the value of a matrix, so this is a temporary, separate job for now.
-  # GitHub Actions doesn't support YAML anchors, so it's full-on duplication for now.
-  complement-developonly:
-    if: "${{ !failure() && !cancelled() && (github.ref == 'refs/heads/develop') }}"
-    needs: linting-done
-    runs-on: ubuntu-latest
-    
-    name: "Complement Workers (develop only)"
+          - arrangement: workers
+            database: Postgres
 
     steps:
       # The path is set via a file given by $GITHUB_PATH. We need both Go 1.17 and GOPATH on the path to run Complement.
@@ -406,7 +363,7 @@ jobs:
 
       - run: |
           set -o pipefail
-          WORKERS=1 COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
+          POSTGRES=${{ (matrix.database == 'Postgres') && 1 || '' }} WORKERS=${{ (matrix.arrangement == 'workers') && 1 || '' }} COMPLEMENT_DIR=`pwd`/complement synapse/scripts-dev/complement.sh -json 2>&1 | gotestfmt
         shell: bash
         name: Run Complement Tests
 
diff --git a/changelog.d/13127.misc b/changelog.d/13127.misc
new file mode 100644
index 0000000000..1414811e0a
--- /dev/null
+++ b/changelog.d/13127.misc
@@ -0,0 +1 @@
+Improve startup times in Complement test runs against workers, particularly in CPU-constrained environments.
\ No newline at end of file
diff --git a/docker/complement/conf/start_for_complement.sh b/docker/complement/conf/start_for_complement.sh
index 773c7db22f..cc6482f763 100755
--- a/docker/complement/conf/start_for_complement.sh
+++ b/docker/complement/conf/start_for_complement.sh
@@ -59,6 +59,9 @@ if [[ -n "$SYNAPSE_COMPLEMENT_USE_WORKERS" ]]; then
       synchrotron, \
       appservice, \
       pusher"
+
+  # Improve startup times by using a launcher based on fork()
+  export SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER=1
 else
   # Empty string here means 'main process only'
   export SYNAPSE_WORKER_TYPES=""
diff --git a/docker/conf-workers/synapse.supervisord.conf.j2 b/docker/conf-workers/synapse.supervisord.conf.j2
index 6443450491..481eb4fc92 100644
--- a/docker/conf-workers/synapse.supervisord.conf.j2
+++ b/docker/conf-workers/synapse.supervisord.conf.j2
@@ -1,3 +1,24 @@
+{% if use_forking_launcher %}
+[program:synapse_fork]
+command=/usr/local/bin/python -m synapse.app.complement_fork_starter
+  {{ main_config_path }}
+  synapse.app.homeserver
+  --config-path="{{ main_config_path }}"
+  --config-path=/conf/workers/shared.yaml
+  {%- for worker in workers %}
+    -- {{ worker.app }}
+    --config-path="{{ main_config_path }}"
+    --config-path=/conf/workers/shared.yaml
+    --config-path=/conf/workers/{{ worker.name }}.yaml
+  {%- endfor %}
+stdout_logfile=/dev/stdout
+stdout_logfile_maxbytes=0
+stderr_logfile=/dev/stderr
+stderr_logfile_maxbytes=0
+autorestart=unexpected
+exitcodes=0
+
+{% else %}
 [program:synapse_main]
 command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver
   --config-path="{{ main_config_path }}"
@@ -13,7 +34,7 @@ autorestart=unexpected
 exitcodes=0
 
 
-{% for worker in workers %}
+  {% for worker in workers %}
 [program:synapse_{{ worker.name }}]
 command=/usr/local/bin/prefix-log /usr/local/bin/python -m {{ worker.app }}
   --config-path="{{ main_config_path }}"
@@ -27,4 +48,5 @@ stdout_logfile_maxbytes=0
 stderr_logfile=/dev/stderr
 stderr_logfile_maxbytes=0
 
-{% endfor %}
+  {% endfor %}
+{% endif %}
diff --git a/docker/conf/log.config b/docker/conf/log.config
index dc8c70befd..d9e85aa533 100644
--- a/docker/conf/log.config
+++ b/docker/conf/log.config
@@ -2,7 +2,11 @@ version: 1
 
 formatters:
   precise:
+    {% if include_worker_name_in_log_line %}
+    format: '{{ worker_name }} | %(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+    {% else %}
     format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+    {% endif %}
 
 handlers:
 {% if LOG_FILE_PATH %}
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index 2134b648d5..4521f99eb4 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -26,6 +26,9 @@
 #   * SYNAPSE_TLS_CERT: Path to a TLS certificate in PEM format.
 #   * SYNAPSE_TLS_KEY: Path to a TLS key. If this and SYNAPSE_TLS_CERT are specified,
 #         Nginx will be configured to serve TLS on port 8448.
+#   * SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER: Whether to use the forking launcher,
+#         only intended for usage in Complement at the moment.
+#         No stability guarantees are provided.
 #
 # NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
 # in the project's README), this script may be run multiple times, and functionality should
@@ -525,6 +528,7 @@ def generate_worker_files(
         "/etc/supervisor/conf.d/synapse.conf",
         workers=worker_descriptors,
         main_config_path=config_path,
+        use_forking_launcher=environ.get("SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"),
     )
 
     # healthcheck config
@@ -560,6 +564,9 @@ def generate_worker_log_config(
         log_config_filepath,
         worker_name=worker_name,
         **extra_log_template_args,
+        include_worker_name_in_log_line=environ.get(
+            "SYNAPSE_USE_EXPERIMENTAL_FORKING_LAUNCHER"
+        ),
     )
     return log_config_filepath
 
diff --git a/docker/start.py b/docker/start.py
index 4ac8f03477..5a98dce551 100755
--- a/docker/start.py
+++ b/docker/start.py
@@ -110,7 +110,11 @@ def generate_config_from_template(
 
     log_config_file = environ["SYNAPSE_LOG_CONFIG"]
     log("Generating log config file " + log_config_file)
-    convert("/conf/log.config", log_config_file, environ)
+    convert(
+        "/conf/log.config",
+        log_config_file,
+        {**environ, "include_worker_name_in_log_line": False},
+    )
 
     # Hopefully we already have a signing key, but generate one if not.
     args = [
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 363ac98ea9..923891ae0d 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -106,7 +106,9 @@ def register_sighup(func: Callable[P, None], *args: P.args, **kwargs: P.kwargs)
 def start_worker_reactor(
     appname: str,
     config: HomeServerConfig,
-    run_command: Callable[[], None] = reactor.run,
+    # Use a lambda to avoid binding to a given reactor at import time.
+    # (needed when synapse.app.complement_fork_starter is being used)
+    run_command: Callable[[], None] = lambda: reactor.run(),
 ) -> None:
     """Run the reactor in the main process
 
@@ -141,7 +143,9 @@ def start_reactor(
     daemonize: bool,
     print_pidfile: bool,
     logger: logging.Logger,
-    run_command: Callable[[], None] = reactor.run,
+    # Use a lambda to avoid binding to a given reactor at import time.
+    # (needed when synapse.app.complement_fork_starter is being used)
+    run_command: Callable[[], None] = lambda: reactor.run(),
 ) -> None:
     """Run the reactor in the main process
 
diff --git a/synapse/app/complement_fork_starter.py b/synapse/app/complement_fork_starter.py
new file mode 100644
index 0000000000..89eb07df27
--- /dev/null
+++ b/synapse/app/complement_fork_starter.py
@@ -0,0 +1,190 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ## What this script does
+#
+# This script spawns multiple workers, whilst only going through the code loading
+# process once. The net effect is that start-up time for a swarm of workers is
+# reduced, particularly in CPU-constrained environments.
+#
+# Before the workers are spawned, the database is prepared in order to avoid the
+# workers racing.
+#
+# ## Stability
+#
+# This script is only intended for use within the Synapse images for the
+# Complement test suite.
+# There are currently no stability guarantees whatsoever; especially not about:
+# - whether it will continue to exist in future versions;
+# - the format of its command-line arguments; or
+# - any details about its behaviour or principles of operation.
+#
+# ## Usage
+#
+# The first argument should be the path to the database configuration, used to
+# set up the database. The rest of the arguments are used as follows:
+# Each worker is specified as an argument group (each argument group is
+# separated by '--').
+# The first argument in each argument group is the Python module name of the application
+# to start. Further arguments are then passed to that module as-is.
+#
+# ## Example
+#
+#   python -m synapse.app.complement_fork_starter path_to_db_config.yaml \
+#     synapse.app.homeserver [args..] -- \
+#     synapse.app.generic_worker [args..] -- \
+#   ...
+#     synapse.app.generic_worker [args..]
+#
+import argparse
+import importlib
+import itertools
+import multiprocessing
+import sys
+from typing import Any, Callable, List
+
+from twisted.internet.main import installReactor
+
+
+class ProxiedReactor:
+    """
+    Twisted tracks the 'installed' reactor as a global variable.
+    (Actually, it does some module trickery, but the effect is similar.)
+
+    The default EpollReactor is buggy if it's created before a process is
+    forked, then used in the child.
+    See https://twistedmatrix.com/trac/ticket/4759#comment:17.
+
+    However, importing certain Twisted modules will automatically create and
+    install a reactor if one hasn't already been installed.
+    It's not normally possible to re-install a reactor.
+
+    Given the goal of launching workers with fork() to only import the code once,
+    this presents a conflict.
+    Our work around is to 'install' this ProxiedReactor which prevents Twisted
+    from creating and installing one, but which lets us replace the actual reactor
+    in use later on.
+    """
+
+    def __init__(self) -> None:
+        self.___reactor_target: Any = None
+
+    def _install_real_reactor(self, new_reactor: Any) -> None:
+        """
+        Install a real reactor for this ProxiedReactor to forward lookups onto.
+
+        This method is specific to our ProxiedReactor and should not clash with
+        any names used on an actual Twisted reactor.
+        """
+        self.___reactor_target = new_reactor
+
+    def __getattr__(self, attr_name: str) -> Any:
+        return getattr(self.___reactor_target, attr_name)
+
+
+def _worker_entrypoint(
+    func: Callable[[], None], proxy_reactor: ProxiedReactor, args: List[str]
+) -> None:
+    """
+    Entrypoint for a forked worker process.
+
+    We just need to set up the command-line arguments, create our real reactor
+    and then kick off the worker's main() function.
+    """
+
+    sys.argv = args
+
+    from twisted.internet.epollreactor import EPollReactor
+
+    proxy_reactor._install_real_reactor(EPollReactor())
+    func()
+
+
+def main() -> None:
+    """
+    Entrypoint for the forking launcher.
+    """
+    parser = argparse.ArgumentParser()
+    parser.add_argument("db_config", help="Path to database config file")
+    parser.add_argument(
+        "args",
+        nargs="...",
+        help="Argument groups separated by `--`. "
+        "The first argument of each group is a Synapse app name. "
+        "Subsequent arguments are passed through.",
+    )
+    ns = parser.parse_args()
+
+    # Split up the subsequent arguments into each workers' arguments;
+    # `--` is our delimiter of choice.
+    args_by_worker: List[List[str]] = [
+        list(args)
+        for cond, args in itertools.groupby(ns.args, lambda ele: ele != "--")
+        if cond and args
+    ]
+
+    # Prevent Twisted from installing a shared reactor that all the workers will
+    # inherit when we fork(), by installing our own beforehand.
+    proxy_reactor = ProxiedReactor()
+    installReactor(proxy_reactor)
+
+    # Import the entrypoints for all the workers.
+    worker_functions = []
+    for worker_args in args_by_worker:
+        worker_module = importlib.import_module(worker_args[0])
+        worker_functions.append(worker_module.main)
+
+    # We need to prepare the database first as otherwise all the workers will
+    # try to create a schema version table and some will crash out.
+    from synapse._scripts import update_synapse_database
+
+    update_proc = multiprocessing.Process(
+        target=_worker_entrypoint,
+        args=(
+            update_synapse_database.main,
+            proxy_reactor,
+            [
+                "update_synapse_database",
+                "--database-config",
+                ns.db_config,
+                "--run-background-updates",
+            ],
+        ),
+    )
+    print("===== PREPARING DATABASE =====", file=sys.stderr)
+    update_proc.start()
+    update_proc.join()
+    print("===== PREPARED DATABASE =====", file=sys.stderr)
+
+    # At this point, we've imported all the main entrypoints for all the workers.
+    # Now we basically just fork() out to create the workers we need.
+    # Because we're using fork(), all the workers get a clone of this launcher's
+    # memory space and don't need to repeat the work of loading the code!
+    # Instead of using fork() directly, we use the multiprocessing library,
+    # which uses fork() on Unix platforms.
+    processes = []
+    for (func, worker_args) in zip(worker_functions, args_by_worker):
+        process = multiprocessing.Process(
+            target=_worker_entrypoint, args=(func, proxy_reactor, worker_args)
+        )
+        process.start()
+        processes.append(process)
+
+    # Be a good parent and wait for our children to die before exiting.
+    for process in processes:
+        process.join()
+
+
+if __name__ == "__main__":
+    main()