summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/_base.py150
-rw-r--r--synapse/app/generic_worker.py34
-rw-r--r--synapse/app/homeserver.py73
3 files changed, 138 insertions, 119 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 37ecdbe3d8..395e202b89 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2017 New Vector Ltd
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -19,7 +20,7 @@ import signal
 import socket
 import sys
 import traceback
-from typing import Iterable
+from typing import Awaitable, Callable, Iterable
 
 from typing_extensions import NoReturn
 
@@ -143,6 +144,45 @@ def quit_with_error(error_string: str) -> NoReturn:
     sys.exit(1)
 
 
+def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
+    """Register a callback with the reactor, to be called once it is running
+
+    This can be used to initialise parts of the system which require an asynchronous
+    setup.
+
+    Any exception raised by the callback will be printed and logged, and the process
+    will exit.
+    """
+
+    async def wrapper():
+        try:
+            await cb(*args, **kwargs)
+        except Exception:
+            # previously, we used Failure().printTraceback() here, in the hope that
+            # would give better tracebacks than traceback.print_exc(). However, that
+            # doesn't handle chained exceptions (with a __cause__ or __context__) well,
+            # and I *think* the need for Failure() is reduced now that we mostly use
+            # async/await.
+
+            # Write the exception to both the logs *and* the unredirected stderr,
+            # because people tend to get confused if it only goes to one or the other.
+            #
+            # One problem with this is that if people are using a logging config that
+            # logs to the console (as is common eg under docker), they will get two
+            # copies of the exception. We could maybe try to detect that, but it's
+            # probably a cost we can bear.
+            logger.fatal("Error during startup", exc_info=True)
+            print("Error during startup:", file=sys.__stderr__)
+            traceback.print_exc(file=sys.__stderr__)
+
+            # it's no use calling sys.exit here, since that just raises a SystemExit
+            # exception which is then caught by the reactor, and everything carries
+            # on as normal.
+            os._exit(1)
+
+    reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
+
+
 def listen_metrics(bind_addresses, port):
     """
     Start Prometheus metrics server.
@@ -227,7 +267,7 @@ def refresh_certificate(hs):
         logger.info("Context factories updated.")
 
 
-def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
+async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
     """
     Start a Synapse server or worker.
 
@@ -241,75 +281,67 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
         hs: homeserver instance
         listeners: Listener configuration ('listeners' in homeserver.yaml)
     """
-    try:
-        # Set up the SIGHUP machinery.
-        if hasattr(signal, "SIGHUP"):
+    # Set up the SIGHUP machinery.
+    if hasattr(signal, "SIGHUP"):
+        reactor = hs.get_reactor()
 
-            reactor = hs.get_reactor()
+        @wrap_as_background_process("sighup")
+        def handle_sighup(*args, **kwargs):
+            # Tell systemd our state, if we're using it. This will silently fail if
+            # we're not using systemd.
+            sdnotify(b"RELOADING=1")
 
-            @wrap_as_background_process("sighup")
-            def handle_sighup(*args, **kwargs):
-                # Tell systemd our state, if we're using it. This will silently fail if
-                # we're not using systemd.
-                sdnotify(b"RELOADING=1")
+            for i, args, kwargs in _sighup_callbacks:
+                i(*args, **kwargs)
 
-                for i, args, kwargs in _sighup_callbacks:
-                    i(*args, **kwargs)
+            sdnotify(b"READY=1")
 
-                sdnotify(b"READY=1")
+        # We defer running the sighup handlers until next reactor tick. This
+        # is so that we're in a sane state, e.g. flushing the logs may fail
+        # if the sighup happens in the middle of writing a log entry.
+        def run_sighup(*args, **kwargs):
+            # `callFromThread` should be "signal safe" as well as thread
+            # safe.
+            reactor.callFromThread(handle_sighup, *args, **kwargs)
 
-            # We defer running the sighup handlers until next reactor tick. This
-            # is so that we're in a sane state, e.g. flushing the logs may fail
-            # if the sighup happens in the middle of writing a log entry.
-            def run_sighup(*args, **kwargs):
-                # `callFromThread` should be "signal safe" as well as thread
-                # safe.
-                reactor.callFromThread(handle_sighup, *args, **kwargs)
+        signal.signal(signal.SIGHUP, run_sighup)
 
-            signal.signal(signal.SIGHUP, run_sighup)
+        register_sighup(refresh_certificate, hs)
 
-            register_sighup(refresh_certificate, hs)
+    # Load the certificate from disk.
+    refresh_certificate(hs)
 
-        # Load the certificate from disk.
-        refresh_certificate(hs)
+    # Start the tracer
+    synapse.logging.opentracing.init_tracer(  # type: ignore[attr-defined] # noqa
+        hs
+    )
 
-        # Start the tracer
-        synapse.logging.opentracing.init_tracer(  # type: ignore[attr-defined] # noqa
-            hs
-        )
+    # It is now safe to start your Synapse.
+    hs.start_listening(listeners)
+    hs.get_datastore().db_pool.start_profiling()
+    hs.get_pusherpool().start()
+
+    # Log when we start the shut down process.
+    hs.get_reactor().addSystemEventTrigger(
+        "before", "shutdown", logger.info, "Shutting down..."
+    )
 
-        # It is now safe to start your Synapse.
-        hs.start_listening(listeners)
-        hs.get_datastore().db_pool.start_profiling()
-        hs.get_pusherpool().start()
+    setup_sentry(hs)
+    setup_sdnotify(hs)
 
-        # Log when we start the shut down process.
-        hs.get_reactor().addSystemEventTrigger(
-            "before", "shutdown", logger.info, "Shutting down..."
-        )
+    # If background tasks are running on the main process, start collecting the
+    # phone home stats.
+    if hs.config.run_background_tasks:
+        start_phone_stats_home(hs)
 
-        setup_sentry(hs)
-        setup_sdnotify(hs)
-
-        # If background tasks are running on the main process, start collecting the
-        # phone home stats.
-        if hs.config.run_background_tasks:
-            start_phone_stats_home(hs)
-
-        # We now freeze all allocated objects in the hopes that (almost)
-        # everything currently allocated are things that will be used for the
-        # rest of time. Doing so means less work each GC (hopefully).
-        #
-        # This only works on Python 3.7
-        if sys.version_info >= (3, 7):
-            gc.collect()
-            gc.freeze()
-    except Exception:
-        traceback.print_exc(file=sys.stderr)
-        reactor = hs.get_reactor()
-        if reactor.running:
-            reactor.stop()
-        sys.exit(1)
+    # We now freeze all allocated objects in the hopes that (almost)
+    # everything currently allocated are things that will be used for the
+    # rest of time. Doing so means less work each GC (hopefully).
+    #
+    # This only works on Python 3.7
+    if sys.version_info >= (3, 7):
+        gc.collect()
+        gc.freeze()
 
 
 def setup_sentry(hs):
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 4428472707..e60988fa4a 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set
 
 from typing_extensions import ContextManager
 
-from twisted.internet import address, reactor
+from twisted.internet import address
 
 import synapse
 import synapse.events
@@ -34,6 +34,7 @@ from synapse.api.urls import (
     SERVER_KEY_V2_PREFIX,
 )
 from synapse.app import _base
+from synapse.app._base import register_start
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
@@ -99,14 +100,28 @@ from synapse.rest.client.v1.profile import (
 )
 from synapse.rest.client.v1.push_rule import PushRuleRestServlet
 from synapse.rest.client.v1.voip import VoipRestServlet
-from synapse.rest.client.v2_alpha import groups, sync, user_directory
+from synapse.rest.client.v2_alpha import (
+    account_data,
+    groups,
+    read_marker,
+    receipts,
+    room_keys,
+    sync,
+    tags,
+    user_directory,
+)
 from synapse.rest.client.v2_alpha._base import client_patterns
 from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
 from synapse.rest.client.v2_alpha.account_data import (
     AccountDataServlet,
     RoomAccountDataServlet,
 )
-from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
+from synapse.rest.client.v2_alpha.devices import DevicesRestServlet
+from synapse.rest.client.v2_alpha.keys import (
+    KeyChangesServlet,
+    KeyQueryServlet,
+    OneTimeKeyServlet,
+)
 from synapse.rest.client.v2_alpha.register import RegisterRestServlet
 from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
 from synapse.rest.client.versions import VersionsRestServlet
@@ -115,6 +130,7 @@ from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.server import HomeServer, cache_in_self
 from synapse.storage.databases.main.censor_events import CensorEventsStore
 from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
+from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
 from synapse.storage.databases.main.media_repository import MediaRepositoryStore
 from synapse.storage.databases.main.metrics import ServerMetricsStore
 from synapse.storage.databases.main.monthly_active_users import (
@@ -446,6 +462,7 @@ class GenericWorkerSlavedStore(
     UserDirectoryStore,
     StatsStore,
     UIAuthWorkerStore,
+    EndToEndRoomKeyStore,
     SlavedDeviceInboxStore,
     SlavedDeviceStore,
     SlavedReceiptsStore,
@@ -502,7 +519,9 @@ class GenericWorkerServer(HomeServer):
                     RegisterRestServlet(self).register(resource)
                     LoginRestServlet(self).register(resource)
                     ThreepidRestServlet(self).register(resource)
+                    DevicesRestServlet(self).register(resource)
                     KeyQueryServlet(self).register(resource)
+                    OneTimeKeyServlet(self).register(resource)
                     KeyChangesServlet(self).register(resource)
                     VoipRestServlet(self).register(resource)
                     PushRuleRestServlet(self).register(resource)
@@ -520,6 +539,11 @@ class GenericWorkerServer(HomeServer):
                     room.register_servlets(self, resource, True)
                     room.register_deprecated_servlets(self, resource)
                     InitialSyncRestServlet(self).register(resource)
+                    room_keys.register_servlets(self, resource)
+                    tags.register_servlets(self, resource)
+                    account_data.register_servlets(self, resource)
+                    receipts.register_servlets(self, resource)
+                    read_marker.register_servlets(self, resource)
 
                     SendToDeviceRestServlet(self).register(resource)
 
@@ -960,9 +984,7 @@ def start(config_options):
     # streams. Will no-op if no streams can be written to by this worker.
     hs.get_replication_streamer()
 
-    reactor.addSystemEventTrigger(
-        "before", "startup", _base.start, hs, config.worker_listeners
-    )
+    register_start(_base.start, hs, config.worker_listeners)
 
     _base.start_worker_reactor("synapse-generic-worker", config)
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b1d9817a6a..57a2f5237c 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -15,15 +15,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import gc
 import logging
 import os
 import sys
 from typing import Iterable, Iterator
 
-from twisted.application import service
-from twisted.internet import defer, reactor
-from twisted.python.failure import Failure
+from twisted.internet import reactor
 from twisted.web.resource import EncodingResourceWrapper, IResource
 from twisted.web.server import GzipEncoderFactory
 from twisted.web.static import File
@@ -40,7 +37,7 @@ from synapse.api.urls import (
     WEB_CLIENT_PREFIX,
 )
 from synapse.app import _base
-from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
+from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
 from synapse.config._base import ConfigError
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.config.homeserver import HomeServerConfig
@@ -73,7 +70,6 @@ from synapse.storage.prepare_database import UpgradeDatabaseException
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.manhole import manhole
 from synapse.util.module_loader import load_module
-from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
 
 logger = logging.getLogger("synapse.app.homeserver")
@@ -417,40 +413,28 @@ def setup(config_options):
             _base.refresh_certificate(hs)
 
     async def start():
-        try:
-            # Run the ACME provisioning code, if it's enabled.
-            if hs.config.acme_enabled:
-                acme = hs.get_acme_handler()
-                # Start up the webservices which we will respond to ACME
-                # challenges with, and then provision.
-                await acme.start_listening()
-                await do_acme()
+        # Run the ACME provisioning code, if it's enabled.
+        if hs.config.acme_enabled:
+            acme = hs.get_acme_handler()
+            # Start up the webservices which we will respond to ACME
+            # challenges with, and then provision.
+            await acme.start_listening()
+            await do_acme()
 
-                # Check if it needs to be reprovisioned every day.
-                hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
+            # Check if it needs to be reprovisioned every day.
+            hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
 
-            # Load the OIDC provider metadatas, if OIDC is enabled.
-            if hs.config.oidc_enabled:
-                oidc = hs.get_oidc_handler()
-                # Loading the provider metadata also ensures the provider config is valid.
-                await oidc.load_metadata()
-                await oidc.load_jwks()
+        # Load the OIDC provider metadatas, if OIDC is enabled.
+        if hs.config.oidc_enabled:
+            oidc = hs.get_oidc_handler()
+            # Loading the provider metadata also ensures the provider config is valid.
+            await oidc.load_metadata()
 
-            _base.start(hs, config.listeners)
+        await _base.start(hs, config.listeners)
 
-            hs.get_datastore().db_pool.updates.start_doing_background_updates()
-        except Exception:
-            # Print the exception and bail out.
-            print("Error during startup:", file=sys.stderr)
+        hs.get_datastore().db_pool.updates.start_doing_background_updates()
 
-            # this gives better tracebacks than traceback.print_exc()
-            Failure().printTraceback(file=sys.stderr)
-
-            if reactor.running:
-                reactor.stop()
-            sys.exit(1)
-
-    reactor.callWhenRunning(lambda: defer.ensureDeferred(start()))
+    register_start(start)
 
     return hs
 
@@ -487,25 +471,6 @@ def format_config_error(e: ConfigError) -> Iterator[str]:
         e = e.__cause__
 
 
-class SynapseService(service.Service):
-    """
-    A twisted Service class that will start synapse. Used to run synapse
-    via twistd and a .tac.
-    """
-
-    def __init__(self, config):
-        self.config = config
-
-    def startService(self):
-        hs = setup(self.config)
-        change_resource_limit(hs.config.soft_file_limit)
-        if hs.config.gc_thresholds:
-            gc.set_threshold(*hs.config.gc_thresholds)
-
-    def stopService(self):
-        return self._port.stopListening()
-
-
 def run(hs):
     PROFILE_SYNAPSE = False
     if PROFILE_SYNAPSE: