summary refs log tree commit diff
path: root/synapse/app/_base.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/_base.py')
-rw-r--r--synapse/app/_base.py144
1 files changed, 37 insertions, 107 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py

index 1329af2e2b..8879136881 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py
@@ -26,7 +26,9 @@ from typing import Awaitable, Callable, Iterable from cryptography.utils import CryptographyDeprecationWarning from typing_extensions import NoReturn +import twisted from twisted.internet import defer, error, reactor +from twisted.logger import LoggingFile, LogLevel from twisted.protocols.tls import TLSMemoryBIOFactory import synapse @@ -35,10 +37,10 @@ from synapse.app import check_bind_error from synapse.app.phone_stats_home import start_phone_stats_home from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory +from synapse.events.spamcheck import load_legacy_spam_checkers from synapse.logging.context import PreserveLoggingContext from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.jemalloc import setup_jemalloc_stats -from synapse.util.async_helpers import Linearizer from synapse.util.daemonize import daemonize_process from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -112,8 +114,6 @@ def start_reactor( run_command (Callable[]): callable that actually runs the reactor """ - install_dns_limiter(reactor) - def run(): logger.info("Running") setup_jemalloc_stats() @@ -141,7 +141,7 @@ def start_reactor( def quit_with_error(error_string: str) -> NoReturn: message_lines = error_string.split("\n") - line_length = max(len(line) for line in message_lines if len(line) < 80) + 2 + line_length = min(max(len(line) for line in message_lines), 80) + 2 sys.stderr.write("*" * line_length + "\n") for line in message_lines: sys.stderr.write(" %s\n" % (line.rstrip(),)) @@ -149,6 +149,30 @@ def quit_with_error(error_string: str) -> NoReturn: sys.exit(1) +def handle_startup_exception(e: Exception) -> NoReturn: + # Exceptions that occur between setting up the logging and forking or starting + # the reactor are written to the logs, followed by a summary to stderr. + logger.exception("Exception during startup") + quit_with_error( + f"Error during initialisation:\n {e}\nThere may be more information in the logs." + ) + + +def redirect_stdio_to_logs() -> None: + streams = [("stdout", LogLevel.info), ("stderr", LogLevel.error)] + + for (stream, level) in streams: + oldStream = getattr(sys, stream) + loggingFile = LoggingFile( + logger=twisted.logger.Logger(namespace=stream), + level=level, + encoding=getattr(oldStream, "encoding", None), + ) + setattr(sys, stream, loggingFile) + + print("Redirected stdout/stderr to logs") + + def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None: """Register a callback with the reactor, to be called once it is running @@ -292,8 +316,7 @@ async def start(hs: "synapse.server.HomeServer"): """ Start a Synapse server or worker. - Should be called once the reactor is running and (if we're using ACME) the - TLS certificates are in place. + Should be called once the reactor is running. Will start the main HTTP listeners and do some other startup tasks, and then notify systemd. @@ -334,6 +357,14 @@ async def start(hs: "synapse.server.HomeServer"): # Start the tracer synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa + # Instantiate the modules so they can register their web resources to the module API + # before we start the listeners. + module_api = hs.get_module_api() + for module, config in hs.config.modules.loaded_modules: + module(config=config, api=module_api) + + load_legacy_spam_checkers(hs) + # It is now safe to start your Synapse. hs.start_listening() hs.get_datastore().db_pool.start_profiling() @@ -398,107 +429,6 @@ def setup_sdnotify(hs): ) -def install_dns_limiter(reactor, max_dns_requests_in_flight=100): - """Replaces the resolver with one that limits the number of in flight DNS - requests. - - This is to workaround https://twistedmatrix.com/trac/ticket/9620, where we - can run out of file descriptors and infinite loop if we attempt to do too - many DNS queries at once - - XXX: I'm confused by this. reactor.nameResolver does not use twisted.names unless - you explicitly install twisted.names as the resolver; rather it uses a GAIResolver - backed by the reactor's default threadpool (which is limited to 10 threads). So - (a) I don't understand why twisted ticket 9620 is relevant, and (b) I don't - understand why we would run out of FDs if we did too many lookups at once. - -- richvdh 2020/08/29 - """ - new_resolver = _LimitedHostnameResolver( - reactor.nameResolver, max_dns_requests_in_flight - ) - - reactor.installNameResolver(new_resolver) - - -class _LimitedHostnameResolver: - """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups.""" - - def __init__(self, resolver, max_dns_requests_in_flight): - self._resolver = resolver - self._limiter = Linearizer( - name="dns_client_limiter", max_count=max_dns_requests_in_flight - ) - - def resolveHostName( - self, - resolutionReceiver, - hostName, - portNumber=0, - addressTypes=None, - transportSemantics="TCP", - ): - # We need this function to return `resolutionReceiver` so we do all the - # actual logic involving deferreds in a separate function. - - # even though this is happening within the depths of twisted, we need to drop - # our logcontext before starting _resolve, otherwise: (a) _resolve will drop - # the logcontext if it returns an incomplete deferred; (b) _resolve will - # call the resolutionReceiver *with* a logcontext, which it won't be expecting. - with PreserveLoggingContext(): - self._resolve( - resolutionReceiver, - hostName, - portNumber, - addressTypes, - transportSemantics, - ) - - return resolutionReceiver - - @defer.inlineCallbacks - def _resolve( - self, - resolutionReceiver, - hostName, - portNumber=0, - addressTypes=None, - transportSemantics="TCP", - ): - - with (yield self._limiter.queue(())): - # resolveHostName doesn't return a Deferred, so we need to hook into - # the receiver interface to get told when resolution has finished. - - deferred = defer.Deferred() - receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred) - - self._resolver.resolveHostName( - receiver, hostName, portNumber, addressTypes, transportSemantics - ) - - yield deferred - - -class _DeferredResolutionReceiver: - """Wraps a IResolutionReceiver and simply resolves the given deferred when - resolution is complete - """ - - def __init__(self, receiver, deferred): - self._receiver = receiver - self._deferred = deferred - - def resolutionBegan(self, resolutionInProgress): - self._receiver.resolutionBegan(resolutionInProgress) - - def addressResolved(self, address): - self._receiver.addressResolved(address) - - def resolutionComplete(self): - self._deferred.callback(()) - self._receiver.resolutionComplete() - - sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")