diff options
author | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2022-05-27 13:20:39 +0100 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <oliverw@matrix.org> | 2022-05-27 13:20:43 +0100 |
commit | 2102b5d4dc4dbca41b4c61a4eccd14405691e52d (patch) | |
tree | 96b9c3d8257d55952381d1db35504633aed23462 | |
parent | Add a primitive forking process 'manager' for Synapse workers (diff) | |
download | synapse-2102b5d4dc4dbca41b4c61a4eccd14405691e52d.tar.xz |
SCARY HACKS
-rw-r--r-- | docker/complement/SynapseWorkers.Dockerfile | 3 | ||||
-rw-r--r-- | docker/complement/conf-workers/synapse_forking.supervisord.conf.j2 | 26 | ||||
-rw-r--r-- | docker/complement/conf/log_config.yaml.j2 (renamed from docker/complement/conf/log_config.yaml) | 6 | ||||
-rw-r--r-- | docker/conf-workers/supervisord.conf.j2 | 24 | ||||
-rwxr-xr-x | docker/configure_workers_and_start.py | 17 | ||||
-rw-r--r-- | synapse/app/_base.py | 4 | ||||
-rw-r--r-- | synapse/app/_complement_fork_starter.py | 58 |
7 files changed, 115 insertions, 23 deletions
diff --git a/docker/complement/SynapseWorkers.Dockerfile b/docker/complement/SynapseWorkers.Dockerfile index 99a09cbc2b..a5a7dd6f67 100644 --- a/docker/complement/SynapseWorkers.Dockerfile +++ b/docker/complement/SynapseWorkers.Dockerfile @@ -26,6 +26,9 @@ COPY conf-workers/workers-shared.yaml /conf/workers/shared.yaml WORKDIR /data COPY conf-workers/postgres.supervisord.conf /etc/supervisor/conf.d/postgres.conf +COPY conf-workers/synapse_forking.supervisord.conf.j2 /conf/ + +COPY conf/log_config.yaml.j2 /conf/ # Copy the entrypoint COPY conf-workers/start-complement-synapse-workers.sh / diff --git a/docker/complement/conf-workers/synapse_forking.supervisord.conf.j2 b/docker/complement/conf-workers/synapse_forking.supervisord.conf.j2 new file mode 100644 index 0000000000..9a21530341 --- /dev/null +++ b/docker/complement/conf-workers/synapse_forking.supervisord.conf.j2 @@ -0,0 +1,26 @@ +[program:synapse_forking] +# TODO prefix-log will be no good. We'll have to hack around ourselves. +command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app._complement_fork_starter /data/homeserver.yaml \ +{%- for worker_config in worker_configs %} + -- \ + {{ worker_config.app }} + --config-path="{{ worker_config.config_path }}" \ + --config-path=/conf/workers/shared.yaml \ + --config-path=/conf/workers/{{ worker_config.name }}.yaml \ +{%- endfor %} + -- \ + synapse.app.homeserver \ + --config-path="{{ main_config_path }}" \ + --config-path=/conf/workers/shared.yaml + +autorestart=unexpected +priority=500 +exitcodes=0 +stdout_logfile=/dev/stdout +stdout_logfile_maxbytes=0 +stderr_logfile=/dev/stderr +stderr_logfile_maxbytes=0 + +# Required because the forking launcher creates subprocesses but doesn't +# handle signals for us. +stopasgroup=true diff --git a/docker/complement/conf/log_config.yaml b/docker/complement/conf/log_config.yaml.j2 index c33fd6cd00..94ce49854a 100644 --- a/docker/complement/conf/log_config.yaml +++ b/docker/complement/conf/log_config.yaml.j2 @@ -2,7 +2,11 @@ version: 1 formatters: precise: - format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s' + {% if worker_name %} + format: '{{ worker_name }} | %(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s' + {% else %} + format: '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s' + {% endif %} filters: context: diff --git a/docker/conf-workers/supervisord.conf.j2 b/docker/conf-workers/supervisord.conf.j2 index ca1f7aef8e..cd28b45348 100644 --- a/docker/conf-workers/supervisord.conf.j2 +++ b/docker/conf-workers/supervisord.conf.j2 @@ -28,17 +28,17 @@ stderr_logfile_maxbytes=0 username=redis autorestart=true -[program:synapse_main] -command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml -priority=10 -# Log startup failures to supervisord's stdout/err -# Regular synapse logs will still go in the configured data directory -stdout_logfile=/dev/stdout -stdout_logfile_maxbytes=0 -stderr_logfile=/dev/stderr -stderr_logfile_maxbytes=0 -autorestart=unexpected -exitcodes=0 +## [program:synapse_main] +## command=/usr/local/bin/prefix-log /usr/local/bin/python -m synapse.app.homeserver --config-path="{{ main_config_path }}" --config-path=/conf/workers/shared.yaml +## priority=10 +## # Log startup failures to supervisord's stdout/err +## # Regular synapse logs will still go in the configured data directory +## stdout_logfile=/dev/stdout +## stdout_logfile_maxbytes=0 +## stderr_logfile=/dev/stderr +## stderr_logfile_maxbytes=0 +## autorestart=unexpected +## exitcodes=0 # Additional process blocks -{{ worker_config }} \ No newline at end of file +{{ worker_config }} diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index f7dac90222..9dbdd8f38b 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -401,8 +401,11 @@ def generate_worker_files( # which exists even if no workers do. healthcheck_urls = ["http://localhost:8080/health"] + worker_configs: List[Dict[str, Any]] = [] + # For each worker type specified by the user, create config values for worker_type in worker_types: + startup_config: Dict[str, Any] = {} worker_type = worker_type.strip() worker_config = WORKERS_CONFIG.get(worker_type) @@ -438,6 +441,8 @@ def generate_worker_files( # Enable the worker in supervisord supervisord_config += SUPERVISORD_PROCESS_CONFIG_BLOCK.format_map(worker_config) + worker_configs.append(worker_config) + # Add nginx location blocks for this worker's endpoints (if any are defined) for pattern in worker_config["endpoint_patterns"]: # Determine whether we need to load-balance this worker @@ -530,7 +535,15 @@ def generate_worker_files( "/conf/supervisord.conf.j2", "/etc/supervisor/supervisord.conf", main_config_path=config_path, - worker_config=supervisord_config, + #worker_config=supervisord_config, + worker_config="", + ) + + convert( + "/conf/synapse_forking.supervisord.conf.j2", + "/etc/supervisor/conf.d/synapse_forking.supervisor.conf", + worker_configs=worker_configs, + main_config_path=config_path, ) # healthcheck config @@ -562,7 +575,7 @@ def generate_worker_log_config( # Render and write the file log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name) convert( - "/conf/log.config", + "/conf/log_config.yaml.j2", log_config_filepath, worker_name=worker_name, **extra_log_template_args, diff --git a/synapse/app/_base.py b/synapse/app/_base.py index a3446ac6e8..e564838180 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -106,7 +106,7 @@ def register_sighup(func: Callable[P, None], *args: P.args, **kwargs: P.kwargs) def start_worker_reactor( appname: str, config: HomeServerConfig, - run_command: Callable[[], None] = reactor.run, + run_command: Callable[[], None] = lambda: reactor.run(), ) -> None: """Run the reactor in the main process @@ -141,7 +141,7 @@ def start_reactor( daemonize: bool, print_pidfile: bool, logger: logging.Logger, - run_command: Callable[[], None] = reactor.run, + run_command: Callable[[], None] = lambda: reactor.run(), ) -> None: """Run the reactor in the main process diff --git a/synapse/app/_complement_fork_starter.py b/synapse/app/_complement_fork_starter.py index fc29e6e5b7..9cdb819394 100644 --- a/synapse/app/_complement_fork_starter.py +++ b/synapse/app/_complement_fork_starter.py @@ -31,22 +31,54 @@ import sys # synapse.app.generic_worker [args..] -- \ # ... # synapse.app.generic_worker [args..] -from typing import Callable, List +from typing import Callable, List, Any +from twisted.internet.main import installReactor -def _worker_entrypoint(func: Callable[[], None], args: List[str]) -> None: + +class ProxiedReactor: + """ + Global state is horrible. Use this proxy reactor so we can 'reinstall' + the reactor by changing the target of the proxy. + """ + + def __init__(self): + self.___reactor_target = None + + def ___install(self, new_reactor): + self.___reactor_target = new_reactor + + def __getattr__(self, attr_name: str) -> Any: + if attr_name == "___install": + return self.___install + return getattr(self.___reactor_target, attr_name) + + +def _worker_entrypoint(func: Callable[[], None], proxy_reactor: ProxiedReactor, args: List[str]) -> None: sys.argv = args + + from twisted.internet.epollreactor import EPollReactor + proxy_reactor.___install(EPollReactor()) func() def main() -> None: # Split up the arguments into each workers' arguments + # Strip out any newlines. + # HACK + db_config_path = sys.argv[1] + args = [arg.replace("\n", "") for arg in sys.argv[2:]] args_by_worker: List[List[str]] = [ list(args) - for cond, args in itertools.groupby(sys.argv[1:], lambda ele: ele != "--") - if cond + for cond, args in itertools.groupby(args, lambda ele: ele != "--") + if cond and args ] - print(args_by_worker) + + # Prevent Twisted from installing a shared reactor that all the workers will + # pick up. + proxy_reactor = ProxiedReactor() + installReactor(proxy_reactor) + # Import the entrypoints for all the workers worker_functions = [] for worker_args in args_by_worker: @@ -61,10 +93,24 @@ def main() -> None: # which *can* use fork() on Unix platforms. # Now we fork our process! + # TODO Can we do this better? + # We need to prepare the database first as otherwise all the workers will + # try to create a schema version table and some will crash out. + # HACK + from synapse._scripts import update_synapse_database + update_proc = multiprocessing.Process( + target=_worker_entrypoint, args=(update_synapse_database.main, proxy_reactor, ["update_synapse_database", "--database-config", db_config_path, "--run-background-updates"]) + ) + print("===== PREPARING DATABASE =====", file=sys.stderr) + update_proc.start() + print("JNG UPROC", file=sys.stderr) + update_proc.join() + print("===== PREPARED DATABASE =====", file=sys.stderr) + processes = [] for (func, worker_args) in zip(worker_functions, args_by_worker): process = multiprocessing.Process( - target=_worker_entrypoint, args=(func, worker_args) + target=_worker_entrypoint, args=(func, proxy_reactor, worker_args) ) process.start() processes.append(process) |