diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 37ecdbe3d8..395e202b89 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -19,7 +20,7 @@ import signal
import socket
import sys
import traceback
-from typing import Iterable
+from typing import Awaitable, Callable, Iterable
from typing_extensions import NoReturn
@@ -143,6 +144,45 @@ def quit_with_error(error_string: str) -> NoReturn:
sys.exit(1)
+def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
+ """Register a callback with the reactor, to be called once it is running
+
+ This can be used to initialise parts of the system which require an asynchronous
+ setup.
+
+ Any exception raised by the callback will be printed and logged, and the process
+ will exit.
+ """
+
+ async def wrapper():
+ try:
+ await cb(*args, **kwargs)
+ except Exception:
+ # previously, we used Failure().printTraceback() here, in the hope that
+ # would give better tracebacks than traceback.print_exc(). However, that
+ # doesn't handle chained exceptions (with a __cause__ or __context__) well,
+ # and I *think* the need for Failure() is reduced now that we mostly use
+ # async/await.
+
+ # Write the exception to both the logs *and* the unredirected stderr,
+ # because people tend to get confused if it only goes to one or the other.
+ #
+ # One problem with this is that if people are using a logging config that
+ # logs to the console (as is common eg under docker), they will get two
+ # copies of the exception. We could maybe try to detect that, but it's
+ # probably a cost we can bear.
+ logger.fatal("Error during startup", exc_info=True)
+ print("Error during startup:", file=sys.__stderr__)
+ traceback.print_exc(file=sys.__stderr__)
+
+ # it's no use calling sys.exit here, since that just raises a SystemExit
+ # exception which is then caught by the reactor, and everything carries
+ # on as normal.
+ os._exit(1)
+
+ reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
+
+
def listen_metrics(bind_addresses, port):
"""
Start Prometheus metrics server.
@@ -227,7 +267,7 @@ def refresh_certificate(hs):
logger.info("Context factories updated.")
-def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
+async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
"""
Start a Synapse server or worker.
@@ -241,75 +281,67 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
hs: homeserver instance
listeners: Listener configuration ('listeners' in homeserver.yaml)
"""
- try:
- # Set up the SIGHUP machinery.
- if hasattr(signal, "SIGHUP"):
+ # Set up the SIGHUP machinery.
+ if hasattr(signal, "SIGHUP"):
+ reactor = hs.get_reactor()
- reactor = hs.get_reactor()
+ @wrap_as_background_process("sighup")
+ def handle_sighup(*args, **kwargs):
+ # Tell systemd our state, if we're using it. This will silently fail if
+ # we're not using systemd.
+ sdnotify(b"RELOADING=1")
- @wrap_as_background_process("sighup")
- def handle_sighup(*args, **kwargs):
- # Tell systemd our state, if we're using it. This will silently fail if
- # we're not using systemd.
- sdnotify(b"RELOADING=1")
+ for i, args, kwargs in _sighup_callbacks:
+ i(*args, **kwargs)
- for i, args, kwargs in _sighup_callbacks:
- i(*args, **kwargs)
+ sdnotify(b"READY=1")
- sdnotify(b"READY=1")
+ # We defer running the sighup handlers until next reactor tick. This
+ # is so that we're in a sane state, e.g. flushing the logs may fail
+ # if the sighup happens in the middle of writing a log entry.
+ def run_sighup(*args, **kwargs):
+ # `callFromThread` should be "signal safe" as well as thread
+ # safe.
+ reactor.callFromThread(handle_sighup, *args, **kwargs)
- # We defer running the sighup handlers until next reactor tick. This
- # is so that we're in a sane state, e.g. flushing the logs may fail
- # if the sighup happens in the middle of writing a log entry.
- def run_sighup(*args, **kwargs):
- # `callFromThread` should be "signal safe" as well as thread
- # safe.
- reactor.callFromThread(handle_sighup, *args, **kwargs)
+ signal.signal(signal.SIGHUP, run_sighup)
- signal.signal(signal.SIGHUP, run_sighup)
+ register_sighup(refresh_certificate, hs)
- register_sighup(refresh_certificate, hs)
+ # Load the certificate from disk.
+ refresh_certificate(hs)
- # Load the certificate from disk.
- refresh_certificate(hs)
+ # Start the tracer
+ synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
+ hs
+ )
- # Start the tracer
- synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
- hs
- )
+ # It is now safe to start your Synapse.
+ hs.start_listening(listeners)
+ hs.get_datastore().db_pool.start_profiling()
+ hs.get_pusherpool().start()
+
+ # Log when we start the shut down process.
+ hs.get_reactor().addSystemEventTrigger(
+ "before", "shutdown", logger.info, "Shutting down..."
+ )
- # It is now safe to start your Synapse.
- hs.start_listening(listeners)
- hs.get_datastore().db_pool.start_profiling()
- hs.get_pusherpool().start()
+ setup_sentry(hs)
+ setup_sdnotify(hs)
- # Log when we start the shut down process.
- hs.get_reactor().addSystemEventTrigger(
- "before", "shutdown", logger.info, "Shutting down..."
- )
+ # If background tasks are running on the main process, start collecting the
+ # phone home stats.
+ if hs.config.run_background_tasks:
+ start_phone_stats_home(hs)
- setup_sentry(hs)
- setup_sdnotify(hs)
-
- # If background tasks are running on the main process, start collecting the
- # phone home stats.
- if hs.config.run_background_tasks:
- start_phone_stats_home(hs)
-
- # We now freeze all allocated objects in the hopes that (almost)
- # everything currently allocated are things that will be used for the
- # rest of time. Doing so means less work each GC (hopefully).
- #
- # This only works on Python 3.7
- if sys.version_info >= (3, 7):
- gc.collect()
- gc.freeze()
- except Exception:
- traceback.print_exc(file=sys.stderr)
- reactor = hs.get_reactor()
- if reactor.running:
- reactor.stop()
- sys.exit(1)
+ # We now freeze all allocated objects in the hopes that (almost)
+ # everything currently allocated are things that will be used for the
+ # rest of time. Doing so means less work each GC (hopefully).
+ #
+ # This only works on Python 3.7
+ if sys.version_info >= (3, 7):
+ gc.collect()
+ gc.freeze()
def setup_sentry(hs):
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 4428472707..e60988fa4a 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set
from typing_extensions import ContextManager
-from twisted.internet import address, reactor
+from twisted.internet import address
import synapse
import synapse.events
@@ -34,6 +34,7 @@ from synapse.api.urls import (
SERVER_KEY_V2_PREFIX,
)
from synapse.app import _base
+from synapse.app._base import register_start
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
@@ -99,14 +100,28 @@ from synapse.rest.client.v1.profile import (
)
from synapse.rest.client.v1.push_rule import PushRuleRestServlet
from synapse.rest.client.v1.voip import VoipRestServlet
-from synapse.rest.client.v2_alpha import groups, sync, user_directory
+from synapse.rest.client.v2_alpha import (
+ account_data,
+ groups,
+ read_marker,
+ receipts,
+ room_keys,
+ sync,
+ tags,
+ user_directory,
+)
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.account_data import (
AccountDataServlet,
RoomAccountDataServlet,
)
-from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
+from synapse.rest.client.v2_alpha.devices import DevicesRestServlet
+from synapse.rest.client.v2_alpha.keys import (
+ KeyChangesServlet,
+ KeyQueryServlet,
+ OneTimeKeyServlet,
+)
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
from synapse.rest.client.versions import VersionsRestServlet
@@ -115,6 +130,7 @@ from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
+from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
@@ -446,6 +462,7 @@ class GenericWorkerSlavedStore(
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,
+ EndToEndRoomKeyStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
@@ -502,7 +519,9 @@ class GenericWorkerServer(HomeServer):
RegisterRestServlet(self).register(resource)
LoginRestServlet(self).register(resource)
ThreepidRestServlet(self).register(resource)
+ DevicesRestServlet(self).register(resource)
KeyQueryServlet(self).register(resource)
+ OneTimeKeyServlet(self).register(resource)
KeyChangesServlet(self).register(resource)
VoipRestServlet(self).register(resource)
PushRuleRestServlet(self).register(resource)
@@ -520,6 +539,11 @@ class GenericWorkerServer(HomeServer):
room.register_servlets(self, resource, True)
room.register_deprecated_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)
+ room_keys.register_servlets(self, resource)
+ tags.register_servlets(self, resource)
+ account_data.register_servlets(self, resource)
+ receipts.register_servlets(self, resource)
+ read_marker.register_servlets(self, resource)
SendToDeviceRestServlet(self).register(resource)
@@ -960,9 +984,7 @@ def start(config_options):
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()
- reactor.addSystemEventTrigger(
- "before", "startup", _base.start, hs, config.worker_listeners
- )
+ register_start(_base.start, hs, config.worker_listeners)
_base.start_worker_reactor("synapse-generic-worker", config)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b1d9817a6a..57a2f5237c 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -15,15 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import gc
import logging
import os
import sys
from typing import Iterable, Iterator
-from twisted.application import service
-from twisted.internet import defer, reactor
-from twisted.python.failure import Failure
+from twisted.internet import reactor
from twisted.web.resource import EncodingResourceWrapper, IResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
@@ -40,7 +37,7 @@ from synapse.api.urls import (
WEB_CLIENT_PREFIX,
)
from synapse.app import _base
-from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
+from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
from synapse.config._base import ConfigError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.config.homeserver import HomeServerConfig
@@ -73,7 +70,6 @@ from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.module_loader import load_module
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.homeserver")
@@ -417,40 +413,28 @@ def setup(config_options):
_base.refresh_certificate(hs)
async def start():
- try:
- # Run the ACME provisioning code, if it's enabled.
- if hs.config.acme_enabled:
- acme = hs.get_acme_handler()
- # Start up the webservices which we will respond to ACME
- # challenges with, and then provision.
- await acme.start_listening()
- await do_acme()
+ # Run the ACME provisioning code, if it's enabled.
+ if hs.config.acme_enabled:
+ acme = hs.get_acme_handler()
+ # Start up the webservices which we will respond to ACME
+ # challenges with, and then provision.
+ await acme.start_listening()
+ await do_acme()
- # Check if it needs to be reprovisioned every day.
- hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
+ # Check if it needs to be reprovisioned every day.
+ hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
- # Load the OIDC provider metadatas, if OIDC is enabled.
- if hs.config.oidc_enabled:
- oidc = hs.get_oidc_handler()
- # Loading the provider metadata also ensures the provider config is valid.
- await oidc.load_metadata()
- await oidc.load_jwks()
+ # Load the OIDC provider metadatas, if OIDC is enabled.
+ if hs.config.oidc_enabled:
+ oidc = hs.get_oidc_handler()
+ # Loading the provider metadata also ensures the provider config is valid.
+ await oidc.load_metadata()
- _base.start(hs, config.listeners)
+ await _base.start(hs, config.listeners)
- hs.get_datastore().db_pool.updates.start_doing_background_updates()
- except Exception:
- # Print the exception and bail out.
- print("Error during startup:", file=sys.stderr)
+ hs.get_datastore().db_pool.updates.start_doing_background_updates()
- # this gives better tracebacks than traceback.print_exc()
- Failure().printTraceback(file=sys.stderr)
-
- if reactor.running:
- reactor.stop()
- sys.exit(1)
-
- reactor.callWhenRunning(lambda: defer.ensureDeferred(start()))
+ register_start(start)
return hs
@@ -487,25 +471,6 @@ def format_config_error(e: ConfigError) -> Iterator[str]:
e = e.__cause__
-class SynapseService(service.Service):
- """
- A twisted Service class that will start synapse. Used to run synapse
- via twistd and a .tac.
- """
-
- def __init__(self, config):
- self.config = config
-
- def startService(self):
- hs = setup(self.config)
- change_resource_limit(hs.config.soft_file_limit)
- if hs.config.gc_thresholds:
- gc.set_threshold(*hs.config.gc_thresholds)
-
- def stopService(self):
- return self._port.stopListening()
-
-
def run(hs):
PROFILE_SYNAPSE = False
if PROFILE_SYNAPSE:
|