summary refs log tree commit diff
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-05-27 13:20:39 +0100
committerOlivier Wilkinson (reivilibre) <oliverw@matrix.org>2022-05-27 13:20:43 +0100
commit2102b5d4dc4dbca41b4c61a4eccd14405691e52d (patch)
tree96b9c3d8257d55952381d1db35504633aed23462
parentAdd a primitive forking process 'manager' for Synapse workers (diff)
downloadsynapse-2102b5d4dc4dbca41b4c61a4eccd14405691e52d.tar.xz
SCARY HACKS
-rw-r--r--docker/complement/SynapseWorkers.Dockerfile3
-rw-r--r--docker/complement/conf-workers/synapse_forking.supervisord.conf.j226
-rw-r--r--docker/complement/conf/log_config.yaml.j2 (renamed from docker/complement/conf/log_config.yaml)6
-rw-r--r--docker/conf-workers/supervisord.conf.j224
-rwxr-xr-xdocker/configure_workers_and_start.py17
-rw-r--r--synapse/app/_base.py4
-rw-r--r--synapse/app/_complement_fork_starter.py58
7 files changed, 115 insertions, 23 deletions
diff --git a/docker/complement/SynapseWorkers.Dockerfile b/docker/complement/SynapseWorkers.Dockerfile
index 99a09cbc2b..a5a7dd6f67 100644
--- a/docker/complement/SynapseWorkers.Dockerfile
+++ b/docker/complement/SynapseWorkers.Dockerfile
@@ -26,6 +26,9 @@ COPY conf-workers/workers-shared.yaml /conf/workers/shared.yaml
 WORKDIR /data
 
 COPY conf-workers/postgres.supervisord.conf /etc/supervisor/conf.d/postgres.conf
+COPY conf-workers/synapse_forking.supervisord.conf.j2 /conf/
+
+COPY conf/log_config.yaml.j2 /conf/
 
 # Copy the entrypoint
 COPY conf-workers/start-complement-synapse-workers.sh /
diff --git a/docker/complement/conf-workers/synapse_forking.supervisord.conf.j2 b/docker/complement/conf-workers/synapse_forking.supervisord.conf.j2
new file mode 100644
index 0000000000..9a21530341
--- /dev/null
+++ b/docker/complement/conf-workers/synapse_forking.supervisord.conf.j2
@@ -0,0 +1,26 @@
+[program:synapse_forking]
+# TODO prefix-log will be no good. We'll have to hack around ourselves.
+command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app._complement_fork_starter /data/homeserver.yaml \
+{%- for worker_config in worker_configs %}
+    -- \
+    {{ worker_config.app }}
+    --config-path="{{ worker_config.config_path }}" \
+    --config-path=/conf/workers/shared.yaml \
+    --config-path=/conf/workers/{{ worker_config.name }}.yaml \
+{%- endfor %}
+    -- \
+    synapse.app.homeserver \
+    --config-path="{{ main_config_path }}" \
+    --config-path=/conf/workers/shared.yaml
+
+autorestart=unexpected
+priority=500
+exitcodes=0
+stdout_logfile=/dev/stdout
+stdout_logfile_maxbytes=0
+stderr_logfile=/dev/stderr
+stderr_logfile_maxbytes=0
+
+# Required because the forking launcher creates subprocesses but doesn't
+# handle signals for us.
+stopasgroup=true
diff --git a/docker/complement/conf/log_config.yaml b/docker/complement/conf/log_config.yaml.j2
index c33fd6cd00..94ce49854a 100644
--- a/docker/complement/conf/log_config.yaml
+++ b/docker/complement/conf/log_config.yaml.j2
@@ -2,7 +2,11 @@ version: 1
 
 formatters:
   precise:
-   format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+    {% if worker_name %}
+    format: '{{ worker_name }} | %(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+    {% else %}
+    format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s'
+    {% endif %}
 
 filters:
   context:
diff --git a/docker/conf-workers/supervisord.conf.j2 b/docker/conf-workers/supervisord.conf.j2
index ca1f7aef8e..cd28b45348 100644
--- a/docker/conf-workers/supervisord.conf.j2
+++ b/docker/conf-workers/supervisord.conf.j2
@@ -28,17 +28,17 @@ stderr_logfile_maxbytes=0
 username=redis
 autorestart=true
 
-[program:synapse_main]
-command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml
-priority=10
-# Log startup failures to supervisord's stdout/err
-# Regular synapse logs will still go in the configured data directory
-stdout_logfile=/dev/stdout
-stdout_logfile_maxbytes=0
-stderr_logfile=/dev/stderr
-stderr_logfile_maxbytes=0
-autorestart=unexpected
-exitcodes=0
+## [program:synapse_main]
+## command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml
+## priority=10
+## # Log startup failures to supervisord's stdout/err
+## # Regular synapse logs will still go in the configured data directory
+## stdout_logfile=/dev/stdout
+## stdout_logfile_maxbytes=0
+## stderr_logfile=/dev/stderr
+## stderr_logfile_maxbytes=0
+## autorestart=unexpected
+## exitcodes=0
 
 # Additional process blocks
-{{ worker_config }}
\ No newline at end of file
+{{ worker_config }}
diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py
index f7dac90222..9dbdd8f38b 100755
--- a/docker/configure_workers_and_start.py
+++ b/docker/configure_workers_and_start.py
@@ -401,8 +401,11 @@ def generate_worker_files(
     # which exists even if no workers do.
     healthcheck_urls = ["http://localhost:8080/health"]
 
+    worker_configs: List[Dict[str, Any]] = []
+
     # For each worker type specified by the user, create config values
     for worker_type in worker_types:
+        startup_config: Dict[str, Any] = {}
         worker_type = worker_type.strip()
 
         worker_config = WORKERS_CONFIG.get(worker_type)
@@ -438,6 +441,8 @@ def generate_worker_files(
         # Enable the worker in supervisord
         supervisord_config += SUPERVISORD_PROCESS_CONFIG_BLOCK.format_map(worker_config)
 
+        worker_configs.append(worker_config)
+
         # Add nginx location blocks for this worker's endpoints (if any are defined)
         for pattern in worker_config["endpoint_patterns"]:
             # Determine whether we need to load-balance this worker
@@ -530,7 +535,15 @@ def generate_worker_files(
         "/conf/supervisord.conf.j2",
         "/etc/supervisor/supervisord.conf",
         main_config_path=config_path,
-        worker_config=supervisord_config,
+        #worker_config=supervisord_config,
+        worker_config="",
+    )
+
+    convert(
+        "/conf/synapse_forking.supervisord.conf.j2",
+        "/etc/supervisor/conf.d/synapse_forking.supervisor.conf",
+        worker_configs=worker_configs,
+        main_config_path=config_path,
     )
 
     # healthcheck config
@@ -562,7 +575,7 @@ def generate_worker_log_config(
     # Render and write the file
     log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
     convert(
-        "/conf/log.config",
+        "/conf/log_config.yaml.j2",
         log_config_filepath,
         worker_name=worker_name,
         **extra_log_template_args,
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index a3446ac6e8..e564838180 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -106,7 +106,7 @@ def register_sighup(func: Callable[P, None], *args: P.args, **kwargs: P.kwargs)
 def start_worker_reactor(
     appname: str,
     config: HomeServerConfig,
-    run_command: Callable[[], None] = reactor.run,
+    run_command: Callable[[], None] = lambda: reactor.run(),
 ) -> None:
     """Run the reactor in the main process
 
@@ -141,7 +141,7 @@ def start_reactor(
     daemonize: bool,
     print_pidfile: bool,
     logger: logging.Logger,
-    run_command: Callable[[], None] = reactor.run,
+    run_command: Callable[[], None] = lambda: reactor.run(),
 ) -> None:
     """Run the reactor in the main process
 
diff --git a/synapse/app/_complement_fork_starter.py b/synapse/app/_complement_fork_starter.py
index fc29e6e5b7..9cdb819394 100644
--- a/synapse/app/_complement_fork_starter.py
+++ b/synapse/app/_complement_fork_starter.py
@@ -31,22 +31,54 @@ import sys
 #     synapse.app.generic_worker [args..] -- \
 #   ...
 #     synapse.app.generic_worker [args..]
-from typing import Callable, List
+from typing import Callable, List, Any
 
+from twisted.internet.main import installReactor
 
-def _worker_entrypoint(func: Callable[[], None], args: List[str]) -> None:
+
+class ProxiedReactor:
+    """
+    Global state is horrible. Use this proxy reactor so we can 'reinstall'
+    the reactor by changing the target of the proxy.
+    """
+
+    def __init__(self):
+        self.___reactor_target = None
+
+    def ___install(self, new_reactor):
+        self.___reactor_target = new_reactor
+
+    def __getattr__(self, attr_name: str) -> Any:
+        if attr_name == "___install":
+            return self.___install
+        return getattr(self.___reactor_target, attr_name)
+
+
+def _worker_entrypoint(func: Callable[[], None], proxy_reactor: ProxiedReactor, args: List[str]) -> None:
     sys.argv = args
+
+    from twisted.internet.epollreactor import EPollReactor
+    proxy_reactor.___install(EPollReactor())
     func()
 
 
 def main() -> None:
     # Split up the arguments into each workers' arguments
+    # Strip out any newlines.
+    # HACK
+    db_config_path = sys.argv[1]
+    args = [arg.replace("\n", "") for arg in sys.argv[2:]]
     args_by_worker: List[List[str]] = [
         list(args)
-        for cond, args in itertools.groupby(sys.argv[1:], lambda ele: ele != "--")
-        if cond
+        for cond, args in itertools.groupby(args, lambda ele: ele != "--")
+        if cond and args
     ]
-    print(args_by_worker)
+
+    # Prevent Twisted from installing a shared reactor that all the workers will
+    # pick up.
+    proxy_reactor = ProxiedReactor()
+    installReactor(proxy_reactor)
+
     # Import the entrypoints for all the workers
     worker_functions = []
     for worker_args in args_by_worker:
@@ -61,10 +93,24 @@ def main() -> None:
     # which *can* use fork() on Unix platforms.
     # Now we fork our process!
 
+    # TODO Can we do this better?
+    # We need to prepare the database first as otherwise all the workers will
+    # try to create a schema version table and some will crash out.
+    # HACK
+    from synapse._scripts import update_synapse_database
+    update_proc = multiprocessing.Process(
+        target=_worker_entrypoint, args=(update_synapse_database.main, proxy_reactor, ["update_synapse_database", "--database-config", db_config_path, "--run-background-updates"])
+    )
+    print("===== PREPARING DATABASE =====", file=sys.stderr)
+    update_proc.start()
+    print("JNG UPROC", file=sys.stderr)
+    update_proc.join()
+    print("===== PREPARED DATABASE =====", file=sys.stderr)
+
     processes = []
     for (func, worker_args) in zip(worker_functions, args_by_worker):
         process = multiprocessing.Process(
-            target=_worker_entrypoint, args=(func, worker_args)
+            target=_worker_entrypoint, args=(func, proxy_reactor, worker_args)
         )
         process.start()
         processes.append(process)