diff options
author | Ben Banfield-Zanin <benbz@matrix.org> | 2021-02-16 13:33:20 +0000 |
---|---|---|
committer | Ben Banfield-Zanin <benbz@matrix.org> | 2021-02-16 13:33:20 +0000 |
commit | dcf1b9c276e22bb6f5200fc029301c4d40e87a1f (patch) | |
tree | 1f5badce24645d99534133a7a989069906088fff /synapse/app | |
parent | Merge remote-tracking branch 'origin/release-v1.24.0' into bbz/info-mainline-... (diff) | |
parent | Fixup CHANGES (diff) | |
download | synapse-bbz/info-mainline-1.27.0.tar.xz |
Merge remote-tracking branch 'origin/release-v1.27.0' into bbz/info-mainline-1.27.0 github/bbz/info-mainline-1.27.0 bbz/info-mainline-1.27.0
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/_base.py | 149 | ||||
-rw-r--r-- | synapse/app/generic_worker.py | 78 | ||||
-rw-r--r-- | synapse/app/homeserver.py | 117 | ||||
-rw-r--r-- | synapse/app/phone_stats_home.py | 9 |
4 files changed, 197 insertions, 156 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 895b38ae76..9840a9d55b 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2017 New Vector Ltd +# Copyright 2019-2021 The Matrix.org Foundation C.I.C # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,11 +16,12 @@ import gc import logging import os +import platform import signal import socket import sys import traceback -from typing import Iterable +from typing import Awaitable, Callable, Iterable from typing_extensions import NoReturn @@ -143,6 +145,45 @@ def quit_with_error(error_string: str) -> NoReturn: sys.exit(1) +def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None: + """Register a callback with the reactor, to be called once it is running + + This can be used to initialise parts of the system which require an asynchronous + setup. + + Any exception raised by the callback will be printed and logged, and the process + will exit. + """ + + async def wrapper(): + try: + await cb(*args, **kwargs) + except Exception: + # previously, we used Failure().printTraceback() here, in the hope that + # would give better tracebacks than traceback.print_exc(). However, that + # doesn't handle chained exceptions (with a __cause__ or __context__) well, + # and I *think* the need for Failure() is reduced now that we mostly use + # async/await. + + # Write the exception to both the logs *and* the unredirected stderr, + # because people tend to get confused if it only goes to one or the other. + # + # One problem with this is that if people are using a logging config that + # logs to the console (as is common eg under docker), they will get two + # copies of the exception. We could maybe try to detect that, but it's + # probably a cost we can bear. + logger.fatal("Error during startup", exc_info=True) + print("Error during startup:", file=sys.__stderr__) + traceback.print_exc(file=sys.__stderr__) + + # it's no use calling sys.exit here, since that just raises a SystemExit + # exception which is then caught by the reactor, and everything carries + # on as normal. + os._exit(1) + + reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper())) + + def listen_metrics(bind_addresses, port): """ Start Prometheus metrics server. @@ -227,7 +268,7 @@ def refresh_certificate(hs): logger.info("Context factories updated.") -def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): +async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): """ Start a Synapse server or worker. @@ -241,71 +282,67 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]): hs: homeserver instance listeners: Listener configuration ('listeners' in homeserver.yaml) """ - try: - # Set up the SIGHUP machinery. - if hasattr(signal, "SIGHUP"): + # Set up the SIGHUP machinery. + if hasattr(signal, "SIGHUP"): + reactor = hs.get_reactor() - @wrap_as_background_process("sighup") - def handle_sighup(*args, **kwargs): - # Tell systemd our state, if we're using it. This will silently fail if - # we're not using systemd. - sdnotify(b"RELOADING=1") + @wrap_as_background_process("sighup") + def handle_sighup(*args, **kwargs): + # Tell systemd our state, if we're using it. This will silently fail if + # we're not using systemd. + sdnotify(b"RELOADING=1") - for i, args, kwargs in _sighup_callbacks: - i(*args, **kwargs) + for i, args, kwargs in _sighup_callbacks: + i(*args, **kwargs) - sdnotify(b"READY=1") + sdnotify(b"READY=1") - # We defer running the sighup handlers until next reactor tick. This - # is so that we're in a sane state, e.g. flushing the logs may fail - # if the sighup happens in the middle of writing a log entry. - def run_sighup(*args, **kwargs): - hs.get_clock().call_later(0, handle_sighup, *args, **kwargs) + # We defer running the sighup handlers until next reactor tick. This + # is so that we're in a sane state, e.g. flushing the logs may fail + # if the sighup happens in the middle of writing a log entry. + def run_sighup(*args, **kwargs): + # `callFromThread` should be "signal safe" as well as thread + # safe. + reactor.callFromThread(handle_sighup, *args, **kwargs) - signal.signal(signal.SIGHUP, run_sighup) + signal.signal(signal.SIGHUP, run_sighup) - register_sighup(refresh_certificate, hs) + register_sighup(refresh_certificate, hs) - # Load the certificate from disk. - refresh_certificate(hs) + # Load the certificate from disk. + refresh_certificate(hs) - # Start the tracer - synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa - hs - ) + # Start the tracer + synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa + hs + ) - # It is now safe to start your Synapse. - hs.start_listening(listeners) - hs.get_datastore().db_pool.start_profiling() - hs.get_pusherpool().start() + # It is now safe to start your Synapse. + hs.start_listening(listeners) + hs.get_datastore().db_pool.start_profiling() + hs.get_pusherpool().start() - # Log when we start the shut down process. - hs.get_reactor().addSystemEventTrigger( - "before", "shutdown", logger.info, "Shutting down..." - ) + # Log when we start the shut down process. + hs.get_reactor().addSystemEventTrigger( + "before", "shutdown", logger.info, "Shutting down..." + ) - setup_sentry(hs) - setup_sdnotify(hs) - - # If background tasks are running on the main process, start collecting the - # phone home stats. - if hs.config.run_background_tasks: - start_phone_stats_home(hs) - - # We now freeze all allocated objects in the hopes that (almost) - # everything currently allocated are things that will be used for the - # rest of time. Doing so means less work each GC (hopefully). - # - # This only works on Python 3.7 - if sys.version_info >= (3, 7): - gc.collect() - gc.freeze() - except Exception: - traceback.print_exc(file=sys.stderr) - reactor = hs.get_reactor() - if reactor.running: - reactor.stop() - sys.exit(1) + setup_sentry(hs) + setup_sdnotify(hs) + + # If background tasks are running on the main process, start collecting the + # phone home stats. + if hs.config.run_background_tasks: + start_phone_stats_home(hs) + + # We now freeze all allocated objects in the hopes that (almost) + # everything currently allocated are things that will be used for the + # rest of time. Doing so means less work each GC (hopefully). + # + # This only works on Python 3.7 + if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7): + gc.collect() + gc.freeze() def setup_sentry(hs): diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 1b511890aa..516f2464b4 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -21,7 +21,8 @@ from typing import Dict, Iterable, Optional, Set from typing_extensions import ContextManager -from twisted.internet import address, reactor +from twisted.internet import address +from twisted.web.resource import IResource import synapse import synapse.events @@ -34,6 +35,7 @@ from synapse.api.urls import ( SERVER_KEY_V2_PREFIX, ) from synapse.app import _base +from synapse.app._base import register_start from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging @@ -89,45 +91,47 @@ from synapse.replication.tcp.streams import ( ToDeviceStream, ) from synapse.rest.admin import register_servlets_for_media_repo -from synapse.rest.client.v1 import events +from synapse.rest.client.v1 import events, login, room from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet -from synapse.rest.client.v1.login import LoginRestServlet from synapse.rest.client.v1.profile import ( ProfileAvatarURLRestServlet, ProfileDisplaynameRestServlet, ProfileRestServlet, ) from synapse.rest.client.v1.push_rule import PushRuleRestServlet -from synapse.rest.client.v1.room import ( - JoinedRoomMemberListRestServlet, - JoinRoomAliasServlet, - PublicRoomListRestServlet, - RoomEventContextServlet, - RoomInitialSyncRestServlet, - RoomMemberListRestServlet, - RoomMembershipRestServlet, - RoomMessageListRestServlet, - RoomSendEventRestServlet, - RoomStateEventRestServlet, - RoomStateRestServlet, - RoomTypingRestServlet, -) from synapse.rest.client.v1.voip import VoipRestServlet -from synapse.rest.client.v2_alpha import groups, sync, user_directory +from synapse.rest.client.v2_alpha import ( + account_data, + groups, + read_marker, + receipts, + room_keys, + sync, + tags, + user_directory, +) from synapse.rest.client.v2_alpha._base import client_patterns from synapse.rest.client.v2_alpha.account import ThreepidRestServlet from synapse.rest.client.v2_alpha.account_data import ( AccountDataServlet, RoomAccountDataServlet, ) -from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet +from synapse.rest.client.v2_alpha.devices import DevicesRestServlet +from synapse.rest.client.v2_alpha.keys import ( + KeyChangesServlet, + KeyQueryServlet, + OneTimeKeyServlet, +) from synapse.rest.client.v2_alpha.register import RegisterRestServlet +from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet from synapse.rest.client.versions import VersionsRestServlet from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyApiV2Resource +from synapse.rest.synapse.client import build_synapse_client_resource_tree from synapse.server import HomeServer, cache_in_self from synapse.storage.databases.main.censor_events import CensorEventsStore from synapse.storage.databases.main.client_ips import ClientIpWorkerStore +from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore from synapse.storage.databases.main.media_repository import MediaRepositoryStore from synapse.storage.databases.main.metrics import ServerMetricsStore from synapse.storage.databases.main.monthly_active_users import ( @@ -266,7 +270,6 @@ class GenericWorkerPresence(BasePresenceHandler): super().__init__(hs) self.hs = hs self.is_mine_id = hs.is_mine_id - self.http_client = hs.get_simple_http_client() self._presence_enabled = hs.config.use_presence @@ -460,6 +463,7 @@ class GenericWorkerSlavedStore( UserDirectoryStore, StatsStore, UIAuthWorkerStore, + EndToEndRoomKeyStore, SlavedDeviceInboxStore, SlavedDeviceStore, SlavedReceiptsStore, @@ -504,7 +508,7 @@ class GenericWorkerServer(HomeServer): site_tag = port # We always include a health resource. - resources = {"/health": HealthResource()} + resources = {"/health": HealthResource()} # type: Dict[str, IResource] for res in listener_config.http_options.resources: for name in res.names: @@ -513,36 +517,36 @@ class GenericWorkerServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) - PublicRoomListRestServlet(self).register(resource) - RoomMemberListRestServlet(self).register(resource) - JoinedRoomMemberListRestServlet(self).register(resource) - RoomStateRestServlet(self).register(resource) - RoomEventContextServlet(self).register(resource) - RoomMessageListRestServlet(self).register(resource) RegisterRestServlet(self).register(resource) - LoginRestServlet(self).register(resource) + login.register_servlets(self, resource) ThreepidRestServlet(self).register(resource) + DevicesRestServlet(self).register(resource) KeyQueryServlet(self).register(resource) + OneTimeKeyServlet(self).register(resource) KeyChangesServlet(self).register(resource) VoipRestServlet(self).register(resource) PushRuleRestServlet(self).register(resource) VersionsRestServlet(self).register(resource) - RoomSendEventRestServlet(self).register(resource) - RoomMembershipRestServlet(self).register(resource) - RoomStateEventRestServlet(self).register(resource) - JoinRoomAliasServlet(self).register(resource) + ProfileAvatarURLRestServlet(self).register(resource) ProfileDisplaynameRestServlet(self).register(resource) ProfileRestServlet(self).register(resource) KeyUploadServlet(self).register(resource) AccountDataServlet(self).register(resource) RoomAccountDataServlet(self).register(resource) - RoomTypingRestServlet(self).register(resource) sync.register_servlets(self, resource) events.register_servlets(self, resource) + room.register_servlets(self, resource, True) + room.register_deprecated_servlets(self, resource) InitialSyncRestServlet(self).register(resource) - RoomInitialSyncRestServlet(self).register(resource) + room_keys.register_servlets(self, resource) + tags.register_servlets(self, resource) + account_data.register_servlets(self, resource) + receipts.register_servlets(self, resource) + read_marker.register_servlets(self, resource) + + SendToDeviceRestServlet(self).register(resource) user_directory.register_servlets(self, resource) @@ -554,6 +558,8 @@ class GenericWorkerServer(HomeServer): groups.register_servlets(self, resource) resources.update({CLIENT_API_PREFIX: resource}) + + resources.update(build_synapse_client_resource_tree(self)) elif name == "federation": resources.update({FEDERATION_PREFIX: TransportLayerServer(self)}) elif name == "media": @@ -981,9 +987,7 @@ def start(config_options): # streams. Will no-op if no streams can be written to by this worker. hs.get_replication_streamer() - reactor.addSystemEventTrigger( - "before", "startup", _base.start, hs, config.worker_listeners - ) + register_start(_base.start, hs, config.worker_listeners) _base.start_worker_reactor("synapse-generic-worker", config) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 2b5465417f..244657cb88 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -15,15 +15,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import gc import logging import os import sys -from typing import Iterable +from typing import Iterable, Iterator -from twisted.application import service -from twisted.internet import defer, reactor -from twisted.python.failure import Failure +from twisted.internet import reactor from twisted.web.resource import EncodingResourceWrapper, IResource from twisted.web.server import GzipEncoderFactory from twisted.web.static import File @@ -40,7 +37,7 @@ from synapse.api.urls import ( WEB_CLIENT_PREFIX, ) from synapse.app import _base -from synapse.app._base import listen_ssl, listen_tcp, quit_with_error +from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start from synapse.config._base import ConfigError from synapse.config.emailconfig import ThreepidBehaviour from synapse.config.homeserver import HomeServerConfig @@ -63,6 +60,7 @@ from synapse.rest import ClientRestResource from synapse.rest.admin import AdminRestResource from synapse.rest.health import HealthResource from synapse.rest.key.v2 import KeyApiV2Resource +from synapse.rest.synapse.client import build_synapse_client_resource_tree from synapse.rest.well_known import WellKnownResource from synapse.server import HomeServer from synapse.storage import DataStore @@ -71,7 +69,6 @@ from synapse.storage.prepare_database import UpgradeDatabaseException from synapse.util.httpresourcetree import create_resource_tree from synapse.util.manhole import manhole from synapse.util.module_loader import load_module -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.homeserver") @@ -90,7 +87,7 @@ class SynapseHomeServer(HomeServer): tls = listener_config.tls site_tag = listener_config.http_options.tag if site_tag is None: - site_tag = port + site_tag = str(port) # We always include a health resource. resources = {"/health": HealthResource()} @@ -107,7 +104,10 @@ class SynapseHomeServer(HomeServer): logger.debug("Configuring additional resources: %r", additional_resources) module_api = self.get_module_api() for path, resmodule in additional_resources.items(): - handler_cls, config = load_module(resmodule) + handler_cls, config = load_module( + resmodule, + ("listeners", site_tag, "additional_resources", "<%s>" % (path,)), + ) handler = handler_cls(config, module_api) if IResource.providedBy(handler): resource = handler @@ -189,19 +189,10 @@ class SynapseHomeServer(HomeServer): "/_matrix/client/versions": client_resource, "/.well-known/matrix/client": WellKnownResource(self), "/_synapse/admin": AdminRestResource(self), + **build_synapse_client_resource_tree(self), } ) - if self.get_config().oidc_enabled: - from synapse.rest.oidc import OIDCResource - - resources["/_synapse/oidc"] = OIDCResource(self) - - if self.get_config().saml2_enabled: - from synapse.rest.saml2 import SAML2Resource - - resources["/_matrix/saml2"] = SAML2Resource(self) - if self.get_config().threepid_behaviour_email == ThreepidBehaviour.LOCAL: from synapse.rest.synapse.client.password_reset import ( PasswordResetSubmitTokenResource, @@ -342,7 +333,10 @@ def setup(config_options): "Synapse Homeserver", config_options ) except ConfigError as e: - sys.stderr.write("\nERROR: %s\n" % (e,)) + sys.stderr.write("\n") + for f in format_config_error(e): + sys.stderr.write(f) + sys.stderr.write("\n") sys.exit(1) if not config: @@ -407,61 +401,62 @@ def setup(config_options): _base.refresh_certificate(hs) async def start(): - try: - # Run the ACME provisioning code, if it's enabled. - if hs.config.acme_enabled: - acme = hs.get_acme_handler() - # Start up the webservices which we will respond to ACME - # challenges with, and then provision. - await acme.start_listening() - await do_acme() - - # Check if it needs to be reprovisioned every day. - hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000) + # Run the ACME provisioning code, if it's enabled. + if hs.config.acme_enabled: + acme = hs.get_acme_handler() + # Start up the webservices which we will respond to ACME + # challenges with, and then provision. + await acme.start_listening() + await do_acme() - # Load the OIDC provider metadatas, if OIDC is enabled. - if hs.config.oidc_enabled: - oidc = hs.get_oidc_handler() - # Loading the provider metadata also ensures the provider config is valid. - await oidc.load_metadata() - await oidc.load_jwks() + # Check if it needs to be reprovisioned every day. + hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000) - _base.start(hs, config.listeners) + # Load the OIDC provider metadatas, if OIDC is enabled. + if hs.config.oidc_enabled: + oidc = hs.get_oidc_handler() + # Loading the provider metadata also ensures the provider config is valid. + await oidc.load_metadata() - hs.get_datastore().db_pool.updates.start_doing_background_updates() - except Exception: - # Print the exception and bail out. - print("Error during startup:", file=sys.stderr) + await _base.start(hs, config.listeners) - # this gives better tracebacks than traceback.print_exc() - Failure().printTraceback(file=sys.stderr) + hs.get_datastore().db_pool.updates.start_doing_background_updates() - if reactor.running: - reactor.stop() - sys.exit(1) - - reactor.callWhenRunning(lambda: defer.ensureDeferred(start())) + register_start(start) return hs -class SynapseService(service.Service): +def format_config_error(e: ConfigError) -> Iterator[str]: """ - A twisted Service class that will start synapse. Used to run synapse - via twistd and a .tac. + Formats a config error neatly + + The idea is to format the immediate error, plus the "causes" of those errors, + hopefully in a way that makes sense to the user. For example: + + Error in configuration at 'oidc_config.user_mapping_provider.config.display_name_template': + Failed to parse config for module 'JinjaOidcMappingProvider': + invalid jinja template: + unexpected end of template, expected 'end of print statement'. + + Args: + e: the error to be formatted + + Returns: An iterator which yields string fragments to be formatted """ + yield "Error in configuration" - def __init__(self, config): - self.config = config + if e.path: + yield " at '%s'" % (".".join(e.path),) - def startService(self): - hs = setup(self.config) - change_resource_limit(hs.config.soft_file_limit) - if hs.config.gc_thresholds: - gc.set_threshold(*hs.config.gc_thresholds) + yield ":\n %s" % (e.msg,) - def stopService(self): - return self._port.stopListening() + e = e.__cause__ + indent = 1 + while e: + indent += 1 + yield ":\n%s%s" % (" " * indent, str(e)) + e = e.__cause__ def run(hs): diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index c38cf8231f..8f86cecb76 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -93,15 +93,20 @@ async def phone_stats_home(hs, stats, stats_process=_stats_process): stats["daily_active_users"] = await hs.get_datastore().count_daily_users() stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users() + daily_active_e2ee_rooms = await hs.get_datastore().count_daily_active_e2ee_rooms() + stats["daily_active_e2ee_rooms"] = daily_active_e2ee_rooms + stats["daily_e2ee_messages"] = await hs.get_datastore().count_daily_e2ee_messages() + daily_sent_e2ee_messages = await hs.get_datastore().count_daily_sent_e2ee_messages() + stats["daily_sent_e2ee_messages"] = daily_sent_e2ee_messages stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms() stats["daily_messages"] = await hs.get_datastore().count_daily_messages() + daily_sent_messages = await hs.get_datastore().count_daily_sent_messages() + stats["daily_sent_messages"] = daily_sent_messages r30_results = await hs.get_datastore().count_r30_users() for name, count in r30_results.items(): stats["r30_users_" + name] = count - daily_sent_messages = await hs.get_datastore().count_daily_sent_messages() - stats["daily_sent_messages"] = daily_sent_messages stats["cache_factor"] = hs.config.caches.global_factor stats["event_cache_size"] = hs.config.caches.event_cache_size |