Diffstat (limited to 'synapse/app/_base.py')
-rw-r--r--  synapse/app/_base.py | 149
1 file changed, 93 insertions(+), 56 deletions(-)
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 895b38ae76..9840a9d55b 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 New Vector Ltd
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,11 +16,12 @@
 import gc
 import logging
 import os
+import platform
 import signal
 import socket
 import sys
 import traceback
-from typing import Iterable
+from typing import Awaitable, Callable, Iterable
 
 from typing_extensions import NoReturn
 
@@ -143,6 +145,45 @@ def quit_with_error(error_string: str) -> NoReturn:
     sys.exit(1)
 
 
+def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
+    """Register a callback with the reactor, to be called once it is running
+
+    This can be used to initialise parts of the system which require an asynchronous
+    setup.
+
+    Any exception raised by the callback will be printed and logged, and the process
+    will exit.
+    """
+
+    async def wrapper():
+        try:
+            await cb(*args, **kwargs)
+        except Exception:
+            # previously, we used Failure().printTraceback() here, in the hope that
+            # would give better tracebacks than traceback.print_exc(). However, that
+            # doesn't handle chained exceptions (with a __cause__ or __context__) well,
+            # and I *think* the need for Failure() is reduced now that we mostly use
+            # async/await.
+
+            # Write the exception to both the logs *and* the unredirected stderr,
+            # because people tend to get confused if it only goes to one or the other.
+            #
+            # One problem with this is that if people are using a logging config that
+            # logs to the console (as is common eg under docker), they will get two
+            # copies of the exception. We could maybe try to detect that, but it's
+            # probably a cost we can bear.
+            logger.fatal("Error during startup", exc_info=True)
+            print("Error during startup:", file=sys.__stderr__)
+            traceback.print_exc(file=sys.__stderr__)
+
+            # it's no use calling sys.exit here, since that just raises a SystemExit
+            # exception which is then caught by the reactor, and everything carries
+            # on as normal.
+            os._exit(1)
+
+    reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
+
+
 def listen_metrics(bind_addresses, port):
     """
     Start Prometheus metrics server.
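
The new `register_start` helper centralises the "run this once the reactor is up, and die loudly on failure" pattern described in its docstring. A minimal sketch of the calling convention (the `initialise_database` coroutine and its argument are hypothetical, not part of this diff):

    from synapse.app._base import register_start

    async def initialise_database(db_url: str) -> None:
        # hypothetical async setup step; any exception raised here is
        # logged, printed to the unredirected stderr, and terminates
        # the process via os._exit(1)
        ...

    register_start(initialise_database, "postgresql://localhost/synapse")

    # whoever owns the process then starts the reactor; the wrapped
    # coroutine runs on the first reactor tick:
    #     from twisted.internet import reactor
    #     reactor.run()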
@@ -227,7 +268,7 @@ def refresh_certificate(hs):
         logger.info("Context factories updated.")
 
 
-def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
+async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
     """
     Start a Synapse server or worker.
 
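
Since `start` is now a coroutine, synchronous setup code can no longer call it directly; the natural wiring is to schedule it through the new helper. A sketch, assuming an entry point roughly like synapse's own (the `hs` and `config.listeners` names are taken on trust from the surrounding codebase, not shown in this diff):

    from synapse.app._base import register_start, start

    # during process setup, before reactor.run(); the coroutine is
    # awaited once the reactor is running, and any startup failure
    # takes the process down with it
    register_start(start, hs, config.listeners)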
@@ -241,71 +282,67 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
         hs: homeserver instance
         listeners: Listener configuration ('listeners' in homeserver.yaml)
     """
-    try:
-        # Set up the SIGHUP machinery.
-        if hasattr(signal, "SIGHUP"):
+    # Set up the SIGHUP machinery.
+    if hasattr(signal, "SIGHUP"):
+        reactor = hs.get_reactor()
 
-            @wrap_as_background_process("sighup")
-            def handle_sighup(*args, **kwargs):
-                # Tell systemd our state, if we're using it. This will silently fail if
-                # we're not using systemd.
-                sdnotify(b"RELOADING=1")
+        @wrap_as_background_process("sighup")
+        def handle_sighup(*args, **kwargs):
+            # Tell systemd our state, if we're using it. This will silently fail if
+            # we're not using systemd.
+            sdnotify(b"RELOADING=1")
 
-                for i, args, kwargs in _sighup_callbacks:
-                    i(*args, **kwargs)
+            for i, args, kwargs in _sighup_callbacks:
+                i(*args, **kwargs)
 
-                sdnotify(b"READY=1")
+            sdnotify(b"READY=1")
 
-            # We defer running the sighup handlers until next reactor tick. This
-            # is so that we're in a sane state, e.g. flushing the logs may fail
-            # if the sighup happens in the middle of writing a log entry.
-            def run_sighup(*args, **kwargs):
-                hs.get_clock().call_later(0, handle_sighup, *args, **kwargs)
+        # We defer running the sighup handlers until next reactor tick. This
+        # is so that we're in a sane state, e.g. flushing the logs may fail
+        # if the sighup happens in the middle of writing a log entry.
+        def run_sighup(*args, **kwargs):
+            # `callFromThread` should be "signal safe" as well as thread
+            # safe.
+            reactor.callFromThread(handle_sighup, *args, **kwargs)
 
-            signal.signal(signal.SIGHUP, run_sighup)
+        signal.signal(signal.SIGHUP, run_sighup)
 
-            register_sighup(refresh_certificate, hs)
+        register_sighup(refresh_certificate, hs)
 
-        # Load the certificate from disk.
-        refresh_certificate(hs)
+    # Load the certificate from disk.
+    refresh_certificate(hs)
 
-        # Start the tracer
-        synapse.logging.opentracing.init_tracer(  # type: ignore[attr-defined] # noqa
-            hs
-        )
+    # Start the tracer
+    synapse.logging.opentracing.init_tracer(  # type: ignore[attr-defined] # noqa
+        hs
+    )
 
-        # It is now safe to start your Synapse.
-        hs.start_listening(listeners)
-        hs.get_datastore().db_pool.start_profiling()
-        hs.get_pusherpool().start()
+    # It is now safe to start your Synapse.
+    hs.start_listening(listeners)
+    hs.get_datastore().db_pool.start_profiling()
+    hs.get_pusherpool().start()
 
-        # Log when we start the shut down process.
-        hs.get_reactor().addSystemEventTrigger(
-            "before", "shutdown", logger.info, "Shutting down..."
-        )
+    # Log when we start the shutdown process.
+    hs.get_reactor().addSystemEventTrigger(
+        "before", "shutdown", logger.info, "Shutting down..."
+    )
 
-        setup_sentry(hs)
-        setup_sdnotify(hs)
-
-        # If background tasks are running on the main process, start collecting the
-        # phone home stats.
-        if hs.config.run_background_tasks:
-            start_phone_stats_home(hs)
-
-        # We now freeze all allocated objects in the hopes that (almost)
-        # everything currently allocated are things that will be used for the
-        # rest of time. Doing so means less work each GC (hopefully).
-        #
-        # This only works on Python 3.7
-        if sys.version_info >= (3, 7):
-            gc.collect()
-            gc.freeze()
-    except Exception:
-        traceback.print_exc(file=sys.stderr)
-        reactor = hs.get_reactor()
-        if reactor.running:
-            reactor.stop()
-        sys.exit(1)
+    setup_sentry(hs)
+    setup_sdnotify(hs)
+
+    # If background tasks are running on the main process, start collecting the
+    # phone home stats.
+    if hs.config.run_background_tasks:
+        start_phone_stats_home(hs)
+
+    # We now freeze all allocated objects in the hope that (almost)
+    # everything currently allocated will stay in use for the rest of the
+    # process's lifetime. Doing so means less work on each GC run (hopefully).
+    #
+    # This only works on CPython 3.7 and above.
+    if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
+        gc.collect()
+        gc.freeze()
 
 
 def setup_sentry(hs):
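
A note on the `callFromThread` change in the SIGHUP handler above: Python runs signal handlers between bytecode instructions on the main thread, so doing real work inside one is fragile. `reactor.callFromThread` merely enqueues the callable and wakes the reactor, which is presumably why the comment calls it "signal safe". A standalone sketch of the pattern (the `_reload_config` body is a hypothetical placeholder):

    import signal

    from twisted.internet import reactor

    def _reload_config() -> None:
        # runs on the reactor thread on the next iteration, where it is
        # safe to do real work
        print("reloading configuration")

    def _on_sighup(signum, frame) -> None:
        # keep the handler minimal: just hand off to the reactor
        reactor.callFromThread(_reload_config)

    if hasattr(signal, "SIGHUP"):  # SIGHUP does not exist on Windows
        signal.signal(signal.SIGHUP, _on_sighup)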
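
The `gc.freeze` call at the end of `start` can likewise be demonstrated in isolation: freezing moves every object the collector is currently tracking into a permanent generation that later collections skip, so long-lived startup allocations stop being rescanned. A small CPython-only sketch:

    import gc
    import platform
    import sys

    if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
        gc.collect()  # collect garbage first, so dead objects are not frozen
        gc.freeze()   # move all surviving tracked objects to the permanent generation
        print(gc.get_freeze_count(), "objects frozen")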