diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2022-09-26 23:07:02 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-09-26 23:07:02 +0100 |
commit | d6b85a2a7dea2737e69d67842c2246975ec64bce (patch) | |
tree | 62d0b0e0b9e3c091a1b4829dee61e71a11890128 /synapse/app/complement_fork_starter.py | |
parent | Improve tests for get_unread_push_actions_for_user_in_range_*. (#13893) (diff) | |
download | synapse-d6b85a2a7dea2737e69d67842c2246975ec64bce.tar.xz |
Complement image: propagate SIGTERM to all workers (#13914)
This should mean that logs from worker processes are flushed before shutdown. When a test completes, Complement stops the docker container, which means that synapse will receive a SIGTERM. Currently, the `complement_fork_starter` exits immediately (without notifying the worker processes), which means that the workers never get a chance to flush their logs before the whole container is vaped. We can fix this by propagating the SIGTERM to the children.
Diffstat (limited to '')
-rw-r--r-- | synapse/app/complement_fork_starter.py | 32 |
1 files changed, 30 insertions, 2 deletions
diff --git a/synapse/app/complement_fork_starter.py b/synapse/app/complement_fork_starter.py index 89eb07df27..b22f315453 100644 --- a/synapse/app/complement_fork_starter.py +++ b/synapse/app/complement_fork_starter.py @@ -51,11 +51,18 @@ import argparse import importlib import itertools import multiprocessing +import os +import signal import sys -from typing import Any, Callable, List +from types import FrameType +from typing import Any, Callable, List, Optional from twisted.internet.main import installReactor +# a list of the original signal handlers, before we installed our custom ones. +# We restore these in our child processes. +_original_signal_handlers: dict[int, Any] = {} + class ProxiedReactor: """ @@ -105,6 +112,11 @@ def _worker_entrypoint( sys.argv = args + # reset the custom signal handlers that we installed, so that the children start + # from a clean slate. + for sig, handler in _original_signal_handlers.items(): + signal.signal(sig, handler) + from twisted.internet.epollreactor import EPollReactor proxy_reactor._install_real_reactor(EPollReactor()) @@ -167,13 +179,29 @@ def main() -> None: update_proc.join() print("===== PREPARED DATABASE =====", file=sys.stderr) + processes: List[multiprocessing.Process] = [] + + # Install signal handlers to propagate signals to all our children, so that they + # shut down cleanly. This also inhibits our own exit, but that's good: we want to + # wait until the children have exited. + def handle_signal(signum: int, frame: Optional[FrameType]) -> None: + print( + f"complement_fork_starter: Caught signal {signum}. Stopping children.", + file=sys.stderr, + ) + for p in processes: + if p.pid: + os.kill(p.pid, signum) + + for sig in (signal.SIGINT, signal.SIGTERM): + _original_signal_handlers[sig] = signal.signal(sig, handle_signal) + # At this point, we've imported all the main entrypoints for all the workers. # Now we basically just fork() out to create the workers we need. # Because we're using fork(), all the workers get a clone of this launcher's # memory space and don't need to repeat the work of loading the code! # Instead of using fork() directly, we use the multiprocessing library, # which uses fork() on Unix platforms. - processes = [] for (func, worker_args) in zip(worker_functions, args_by_worker): process = multiprocessing.Process( target=_worker_entrypoint, args=(func, proxy_reactor, worker_args) |