diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 1329af2e2b..8879136881 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -26,7 +26,9 @@ from typing import Awaitable, Callable, Iterable
from cryptography.utils import CryptographyDeprecationWarning
from typing_extensions import NoReturn
+import twisted
from twisted.internet import defer, error, reactor
+from twisted.logger import LoggingFile, LogLevel
from twisted.protocols.tls import TLSMemoryBIOFactory
import synapse
@@ -35,10 +37,10 @@ from synapse.app import check_bind_error
from synapse.app.phone_stats_home import start_phone_stats_home
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
+from synapse.events.spamcheck import load_legacy_spam_checkers
from synapse.logging.context import PreserveLoggingContext
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.metrics.jemalloc import setup_jemalloc_stats
-from synapse.util.async_helpers import Linearizer
from synapse.util.daemonize import daemonize_process
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -112,8 +114,6 @@ def start_reactor(
run_command (Callable[]): callable that actually runs the reactor
"""
- install_dns_limiter(reactor)
-
def run():
logger.info("Running")
setup_jemalloc_stats()
@@ -141,7 +141,7 @@ def start_reactor(
def quit_with_error(error_string: str) -> NoReturn:
message_lines = error_string.split("\n")
- line_length = max(len(line) for line in message_lines if len(line) < 80) + 2
+ line_length = min(max(len(line) for line in message_lines), 80) + 2
sys.stderr.write("*" * line_length + "\n")
for line in message_lines:
sys.stderr.write(" %s\n" % (line.rstrip(),))
@@ -149,6 +149,30 @@ def quit_with_error(error_string: str) -> NoReturn:
sys.exit(1)
+def handle_startup_exception(e: Exception) -> NoReturn:
+ # Exceptions that occur between setting up the logging and forking or starting
+ # the reactor are written to the logs, followed by a summary to stderr.
+ logger.exception("Exception during startup")
+ quit_with_error(
+ f"Error during initialisation:\n {e}\nThere may be more information in the logs."
+ )
+
+
+def redirect_stdio_to_logs() -> None:
+ streams = [("stdout", LogLevel.info), ("stderr", LogLevel.error)]
+
+ for (stream, level) in streams:
+ oldStream = getattr(sys, stream)
+ loggingFile = LoggingFile(
+ logger=twisted.logger.Logger(namespace=stream),
+ level=level,
+ encoding=getattr(oldStream, "encoding", None),
+ )
+ setattr(sys, stream, loggingFile)
+
+ print("Redirected stdout/stderr to logs")
+
+
def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
"""Register a callback with the reactor, to be called once it is running
@@ -292,8 +316,7 @@ async def start(hs: "synapse.server.HomeServer"):
"""
Start a Synapse server or worker.
- Should be called once the reactor is running and (if we're using ACME) the
- TLS certificates are in place.
+ Should be called once the reactor is running.
Will start the main HTTP listeners and do some other startup tasks, and then
notify systemd.
@@ -334,6 +357,14 @@ async def start(hs: "synapse.server.HomeServer"):
# Start the tracer
synapse.logging.opentracing.init_tracer(hs) # type: ignore[attr-defined] # noqa
+ # Instantiate the modules so they can register their web resources to the module API
+ # before we start the listeners.
+ module_api = hs.get_module_api()
+ for module, config in hs.config.modules.loaded_modules:
+ module(config=config, api=module_api)
+
+ load_legacy_spam_checkers(hs)
+
# It is now safe to start your Synapse.
hs.start_listening()
hs.get_datastore().db_pool.start_profiling()
@@ -398,107 +429,6 @@ def setup_sdnotify(hs):
)
-def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
- """Replaces the resolver with one that limits the number of in flight DNS
- requests.
-
- This is to workaround https://twistedmatrix.com/trac/ticket/9620, where we
- can run out of file descriptors and infinite loop if we attempt to do too
- many DNS queries at once
-
- XXX: I'm confused by this. reactor.nameResolver does not use twisted.names unless
- you explicitly install twisted.names as the resolver; rather it uses a GAIResolver
- backed by the reactor's default threadpool (which is limited to 10 threads). So
- (a) I don't understand why twisted ticket 9620 is relevant, and (b) I don't
- understand why we would run out of FDs if we did too many lookups at once.
- -- richvdh 2020/08/29
- """
- new_resolver = _LimitedHostnameResolver(
- reactor.nameResolver, max_dns_requests_in_flight
- )
-
- reactor.installNameResolver(new_resolver)
-
-
-class _LimitedHostnameResolver:
- """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups."""
-
- def __init__(self, resolver, max_dns_requests_in_flight):
- self._resolver = resolver
- self._limiter = Linearizer(
- name="dns_client_limiter", max_count=max_dns_requests_in_flight
- )
-
- def resolveHostName(
- self,
- resolutionReceiver,
- hostName,
- portNumber=0,
- addressTypes=None,
- transportSemantics="TCP",
- ):
- # We need this function to return `resolutionReceiver` so we do all the
- # actual logic involving deferreds in a separate function.
-
- # even though this is happening within the depths of twisted, we need to drop
- # our logcontext before starting _resolve, otherwise: (a) _resolve will drop
- # the logcontext if it returns an incomplete deferred; (b) _resolve will
- # call the resolutionReceiver *with* a logcontext, which it won't be expecting.
- with PreserveLoggingContext():
- self._resolve(
- resolutionReceiver,
- hostName,
- portNumber,
- addressTypes,
- transportSemantics,
- )
-
- return resolutionReceiver
-
- @defer.inlineCallbacks
- def _resolve(
- self,
- resolutionReceiver,
- hostName,
- portNumber=0,
- addressTypes=None,
- transportSemantics="TCP",
- ):
-
- with (yield self._limiter.queue(())):
- # resolveHostName doesn't return a Deferred, so we need to hook into
- # the receiver interface to get told when resolution has finished.
-
- deferred = defer.Deferred()
- receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)
-
- self._resolver.resolveHostName(
- receiver, hostName, portNumber, addressTypes, transportSemantics
- )
-
- yield deferred
-
-
-class _DeferredResolutionReceiver:
- """Wraps a IResolutionReceiver and simply resolves the given deferred when
- resolution is complete
- """
-
- def __init__(self, receiver, deferred):
- self._receiver = receiver
- self._deferred = deferred
-
- def resolutionBegan(self, resolutionInProgress):
- self._receiver.resolutionBegan(resolutionInProgress)
-
- def addressResolved(self, address):
- self._receiver.addressResolved(address)
-
- def resolutionComplete(self):
- self._deferred.callback(())
- self._receiver.resolutionComplete()
-
-
sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 68ae19c977..2878d2c140 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -36,7 +36,6 @@ from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
@@ -54,7 +53,6 @@ class AdminCmdSlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedFilteringStore,
- SlavedPresenceStore,
SlavedGroupServerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 57c2fc2e88..af8a1833f3 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -32,7 +32,12 @@ from synapse.api.urls import (
SERVER_KEY_V2_PREFIX,
)
from synapse.app import _base
-from synapse.app._base import max_request_body_size, register_start
+from synapse.app._base import (
+ handle_startup_exception,
+ max_request_body_size,
+ redirect_stdio_to_logs,
+ register_start,
+)
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
@@ -354,6 +359,10 @@ class GenericWorkerServer(HomeServer):
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
+ # Attach additional resources registered by modules.
+ resources.update(self._module_web_resources)
+ self._module_web_resources_consumed = True
+
root_resource = create_resource_tree(resources, OptionsResource())
_base.listen_tcp(
@@ -465,14 +474,21 @@ def start(config_options):
setup_logging(hs, config, use_worker_options=True)
- hs.setup()
+ try:
+ hs.setup()
- # Ensure the replication streamer is always started in case we write to any
- # streams. Will no-op if no streams can be written to by this worker.
- hs.get_replication_streamer()
+ # Ensure the replication streamer is always started in case we write to any
+ # streams. Will no-op if no streams can be written to by this worker.
+ hs.get_replication_streamer()
+ except Exception as e:
+ handle_startup_exception(e)
register_start(_base.start, hs)
+ # redirect stdio to the logs, if configured.
+ if not hs.config.no_redirect_stdio:
+ redirect_stdio_to_logs()
+
_base.start_worker_reactor("synapse-generic-worker", config)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b2501ee4d7..7af56ac136 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -37,10 +37,11 @@ from synapse.api.urls import (
)
from synapse.app import _base
from synapse.app._base import (
+ handle_startup_exception,
listen_ssl,
listen_tcp,
max_request_body_size,
- quit_with_error,
+ redirect_stdio_to_logs,
register_start,
)
from synapse.config._base import ConfigError
@@ -69,8 +70,6 @@ from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
from synapse.storage import DataStore
-from synapse.storage.engines import IncorrectDatabaseSetup
-from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.module_loader import load_module
from synapse.util.versionstring import get_version_string
@@ -124,6 +123,10 @@ class SynapseHomeServer(HomeServer):
)
resources[path] = resource
+ # Attach additional resources registered by modules.
+ resources.update(self._module_web_resources)
+ self._module_web_resources_consumed = True
+
# try to find something useful to redirect '/' to
if WEB_CLIENT_PREFIX in resources:
root_resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX)
@@ -358,60 +361,10 @@ def setup(config_options):
try:
hs.setup()
- except IncorrectDatabaseSetup as e:
- quit_with_error(str(e))
- except UpgradeDatabaseException as e:
- quit_with_error("Failed to upgrade database: %s" % (e,))
-
- async def do_acme() -> bool:
- """
- Reprovision an ACME certificate, if it's required.
-
- Returns:
- Whether the cert has been updated.
- """
- acme = hs.get_acme_handler()
-
- # Check how long the certificate is active for.
- cert_days_remaining = hs.config.is_disk_cert_valid(allow_self_signed=False)
-
- # We want to reprovision if cert_days_remaining is None (meaning no
- # certificate exists), or the days remaining number it returns
- # is less than our re-registration threshold.
- provision = False
-
- if (
- cert_days_remaining is None
- or cert_days_remaining < hs.config.acme_reprovision_threshold
- ):
- provision = True
-
- if provision:
- await acme.provision_certificate()
-
- return provision
-
- async def reprovision_acme():
- """
- Provision a certificate from ACME, if required, and reload the TLS
- certificate if it's renewed.
- """
- reprovisioned = await do_acme()
- if reprovisioned:
- _base.refresh_certificate(hs)
+ except Exception as e:
+ handle_startup_exception(e)
async def start():
- # Run the ACME provisioning code, if it's enabled.
- if hs.config.acme_enabled:
- acme = hs.get_acme_handler()
- # Start up the webservices which we will respond to ACME
- # challenges with, and then provision.
- await acme.start_listening()
- await do_acme()
-
- # Check if it needs to be reprovisioned every day.
- hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
-
# Load the OIDC provider metadatas, if OIDC is enabled.
if hs.config.oidc_enabled:
oidc = hs.get_oidc_handler()
@@ -500,6 +453,11 @@ def main():
# check base requirements
check_requirements()
hs = setup(sys.argv[1:])
+
+ # redirect stdio to the logs, if configured.
+ if not hs.config.no_redirect_stdio:
+ redirect_stdio_to_logs()
+
run(hs)
|