diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 1329af2e2b..8879136881 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -26,7 +26,9 @@ from typing import Awaitable, Callable, Iterable
from cryptography.utils import CryptographyDeprecationWarning
from typing_extensions import NoReturn
+import twisted
from twisted.internet import defer, error, reactor
+from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
import synapse
@@ -35,10 +37,10 @@ from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
+from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
-from synapse.util.async_helpers import Linearizer
from synapse.util.daemonize import daemonize_process
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -112,8 +114,6 @@ def start_reactor(
run_command (Callable[]): callable that actually runs the reactor
"""
- install_dns_limiter(reactor)
-
def run():
logger.info("Running")
setup_jemalloc_stats()
@@ -141,7 +141,7 @@ def start_reactor(
def quit_with_error(error_string: str) -> NoReturn:
message_lines = error_string.split("\n")
- line_length = max(len(line) for line in message_lines if len(line) < 80) + 2
+ line_length = min(max(len(line) for line in message_lines), 80) + 2
sys.stderr.write("*" * line_length + "\n")
for line in message_lines:
sys.stderr.write(" %s\n" % (line.rstrip(),))
@@ -149,6 +149,30 @@ def quit_with_error(error_string: str) -> NoReturn:
sys.exit(1)
+def handle_startup_exception(e: Exception) -> NoReturn:
+ # Exceptions that occur between setting up the logging and forking or starting
+ # the reactor are written to the logs, followed by a summary to stderr.
+ logger.exception("Exception during startup")
+ quit_with_error(
+ f"Error during initialisation:\n {e}\nThere may be more information in the logs."
+ )
+
+
+def redirect_stdio_to_logs() -> None:
+ streams = [("stdout", LogLevel.info), ("stderr", LogLevel.error)]
+
+ for (stream, level) in streams:
+ oldStream = getattr(sys, stream)
+ loggingFile = LoggingFile(
+ logger=twisted.logger.Logger(namespace=stream),
+ level=level,
+ encoding=getattr(oldStream, "encoding", None),
+ )
+ setattr(sys, stream, loggingFile)
+
+ print("Redirected stdout/stderr to logs")
+
+
def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
"""Register a callback with the reactor, to be called once it is running
@@ -292,8 +316,7 @@ async def start(hs: "synapse.server.HomeServer"):
"""
Start a Synapse server or worker.
- Should be called once the reactor is running and (if we're using ACME) the
- TLS certificates are in place.
+ Should be called once the reactor is running.
Will start the main HTTP listeners and do some other startup tasks, and then
notify systemd.
@@ -334,6 +357,14 @@ async def start(hs: "synapse.server.HomeServer"):
# Start the tracer
synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa
+ # Instantiate the modules so they can register their web resources to the module API
+ # before we start the listeners.
+ module_api = hs.get_module_api()
+ for module, config in hs.config.modules.loaded_modules:
+ module(config=config, api=module_api)
+
+ load_legacy_spam_checkers(hs)
+
# It is now safe to start your Synapse.
hs.start_listening()
hs.get_datastore().db_pool.start_profiling()
@@ -398,107 +429,6 @@ def setup_sdnotify(hs):
)
-def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
- """Replaces the resolver with one that limits the number of in flight DNS
- requests.
-
- This is to workaround https://twistedmatrix.com/trac/ticket/9620, where we
- can run out of file descriptors and infinite loop if we attempt to do too
- many DNS queries at once
-
- XXX: I'm confused by this. reactor.nameResolver does not use twisted.names unless
- you explicitly install twisted.names as the resolver; rather it uses a GAIResolver
- backed by the reactor's default threadpool (which is limited to 10 threads). So
- (a) I don't understand why twisted ticket 9620 is relevant, and (b) I don't
- understand why we would run out of FDs if we did too many lookups at once.
- -- richvdh 2020/08/29
- """
- new_resolver = _LimitedHostnameResolver(
- reactor.nameResolver, max_dns_requests_in_flight
- )
-
- reactor.installNameResolver(new_resolver)
-
-
-class _LimitedHostnameResolver:
- """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups."""
-
- def __init__(self, resolver, max_dns_requests_in_flight):
- self._resolver = resolver
- self._limiter = Linearizer(
- name="dns_client_limiter", max_count=max_dns_requests_in_flight
- )
-
- def resolveHostName(
- self,
- resolutionReceiver,
- hostName,
- portNumber=0,
- addressTypes=None,
- transportSemantics="TCP",
- ):
- # We need this function to return `resolutionReceiver` so we do all the
- # actual logic involving deferreds in a separate function.
-
- # even though this is happening within the depths of twisted, we need to drop
- # our logcontext before starting _resolve, otherwise: (a) _resolve will drop
- # the logcontext if it returns an incomplete deferred; (b) _resolve will
- # call the resolutionReceiver *with* a logcontext, which it won't be expecting.
- with PreserveLoggingContext():
- self._resolve(
- resolutionReceiver,
- hostName,
- portNumber,
- addressTypes,
- transportSemantics,
- )
-
- return resolutionReceiver
-
- @defer.inlineCallbacks
- def _resolve(
- self,
- resolutionReceiver,
- hostName,
- portNumber=0,
- addressTypes=None,
- transportSemantics="TCP",
- ):
-
- with (yield self._limiter.queue(())):
- # resolveHostName doesn't return a Deferred, so we need to hook into
- # the receiver interface to get told when resolution has finished.
-
- deferred = defer.Deferred()
- receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)
-
- self._resolver.resolveHostName(
- receiver, hostName, portNumber, addressTypes, transportSemantics
- )
-
- yield deferred
-
-
-class _DeferredResolutionReceiver:
- """Wraps a IResolutionReceiver and simply resolves the given deferred when
- resolution is complete
- """
-
- def __init__(self, receiver, deferred):
- self._receiver = receiver
- self._deferred = deferred
-
- def resolutionBegan(self, resolutionInProgress):
- self._receiver.resolutionBegan(resolutionInProgress)
-
- def addressResolved(self, address):
- self._receiver.addressResolved(address)
-
- def resolutionComplete(self):
- self._deferred.callback(())
- self._receiver.resolutionComplete()
-
-
sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")
|