diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 895b38ae76..9840a9d55b 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 New Vector Ltd
+# Copyright 2019-2021 The Matrix.org Foundation C.I.C
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,11 +16,12 @@
import gc
import logging
import os
+import platform
import signal
import socket
import sys
import traceback
-from typing import Iterable
+from typing import Awaitable, Callable, Iterable
from typing_extensions import NoReturn
@@ -143,6 +145,45 @@ def quit_with_error(error_string: str) -> NoReturn:
sys.exit(1)
+def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
+ """Register a callback with the reactor, to be called once it is running
+
+ This can be used to initialise parts of the system which require an asynchronous
+ setup.
+
+ Any exception raised by the callback will be printed and logged, and the process
+ will exit.
+ """
+
+ async def wrapper():
+ try:
+ await cb(*args, **kwargs)
+ except Exception:
+ # previously, we used Failure().printTraceback() here, in the hope that
+ # would give better tracebacks than traceback.print_exc(). However, that
+ # doesn't handle chained exceptions (with a __cause__ or __context__) well,
+ # and I *think* the need for Failure() is reduced now that we mostly use
+ # async/await.
+
+ # Write the exception to both the logs *and* the unredirected stderr,
+ # because people tend to get confused if it only goes to one or the other.
+ #
+ # One problem with this is that if people are using a logging config that
+ # logs to the console (as is common eg under docker), they will get two
+ # copies of the exception. We could maybe try to detect that, but it's
+ # probably a cost we can bear.
+ logger.fatal("Error during startup", exc_info=True)
+ print("Error during startup:", file=sys.__stderr__)
+ traceback.print_exc(file=sys.__stderr__)
+
+ # it's no use calling sys.exit here, since that just raises a SystemExit
+ # exception which is then caught by the reactor, and everything carries
+ # on as normal.
+ os._exit(1)
+
+ reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
+
+
def listen_metrics(bind_addresses, port):
"""
Start Prometheus metrics server.
@@ -227,7 +268,7 @@ def refresh_certificate(hs):
logger.info("Context factories updated.")
-def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
+async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
"""
Start a Synapse server or worker.
@@ -241,71 +282,67 @@ def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
hs: homeserver instance
listeners: Listener configuration ('listeners' in homeserver.yaml)
"""
- try:
- # Set up the SIGHUP machinery.
- if hasattr(signal, "SIGHUP"):
+ # Set up the SIGHUP machinery.
+ if hasattr(signal, "SIGHUP"):
+ reactor = hs.get_reactor()
- @wrap_as_background_process("sighup")
- def handle_sighup(*args, **kwargs):
- # Tell systemd our state, if we're using it. This will silently fail if
- # we're not using systemd.
- sdnotify(b"RELOADING=1")
+ @wrap_as_background_process("sighup")
+ def handle_sighup(*args, **kwargs):
+ # Tell systemd our state, if we're using it. This will silently fail if
+ # we're not using systemd.
+ sdnotify(b"RELOADING=1")
- for i, args, kwargs in _sighup_callbacks:
- i(*args, **kwargs)
+ for i, args, kwargs in _sighup_callbacks:
+ i(*args, **kwargs)
- sdnotify(b"READY=1")
+ sdnotify(b"READY=1")
- # We defer running the sighup handlers until next reactor tick. This
- # is so that we're in a sane state, e.g. flushing the logs may fail
- # if the sighup happens in the middle of writing a log entry.
- def run_sighup(*args, **kwargs):
- hs.get_clock().call_later(0, handle_sighup, *args, **kwargs)
+ # We defer running the sighup handlers until next reactor tick. This
+ # is so that we're in a sane state, e.g. flushing the logs may fail
+ # if the sighup happens in the middle of writing a log entry.
+ def run_sighup(*args, **kwargs):
+ # `callFromThread` should be "signal safe" as well as thread
+ # safe.
+ reactor.callFromThread(handle_sighup, *args, **kwargs)
- signal.signal(signal.SIGHUP, run_sighup)
+ signal.signal(signal.SIGHUP, run_sighup)
- register_sighup(refresh_certificate, hs)
+ register_sighup(refresh_certificate, hs)
- # Load the certificate from disk.
- refresh_certificate(hs)
+ # Load the certificate from disk.
+ refresh_certificate(hs)
- # Start the tracer
- synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
- hs
- )
+ # Start the tracer
+ synapse.logging.opentracing.init_tracer( # type: ignore[attr-defined] # noqa
+ hs
+ )
- # It is now safe to start your Synapse.
- hs.start_listening(listeners)
- hs.get_datastore().db_pool.start_profiling()
- hs.get_pusherpool().start()
+ # It is now safe to start your Synapse.
+ hs.start_listening(listeners)
+ hs.get_datastore().db_pool.start_profiling()
+ hs.get_pusherpool().start()
- # Log when we start the shut down process.
- hs.get_reactor().addSystemEventTrigger(
- "before", "shutdown", logger.info, "Shutting down..."
- )
+ # Log when we start the shut down process.
+ hs.get_reactor().addSystemEventTrigger(
+ "before", "shutdown", logger.info, "Shutting down..."
+ )
- setup_sentry(hs)
- setup_sdnotify(hs)
-
- # If background tasks are running on the main process, start collecting the
- # phone home stats.
- if hs.config.run_background_tasks:
- start_phone_stats_home(hs)
-
- # We now freeze all allocated objects in the hopes that (almost)
- # everything currently allocated are things that will be used for the
- # rest of time. Doing so means less work each GC (hopefully).
- #
- # This only works on Python 3.7
- if sys.version_info >= (3, 7):
- gc.collect()
- gc.freeze()
- except Exception:
- traceback.print_exc(file=sys.stderr)
- reactor = hs.get_reactor()
- if reactor.running:
- reactor.stop()
- sys.exit(1)
+ setup_sentry(hs)
+ setup_sdnotify(hs)
+
+ # If background tasks are running on the main process, start collecting the
+ # phone home stats.
+ if hs.config.run_background_tasks:
+ start_phone_stats_home(hs)
+
+ # We now freeze all allocated objects in the hopes that (almost)
+ # everything currently allocated are things that will be used for the
+ # rest of time. Doing so means less work each GC (hopefully).
+ #
+ # This only works on Python 3.7
+ if platform.python_implementation() == "CPython" and sys.version_info >= (3, 7):
+ gc.collect()
+ gc.freeze()
def setup_sentry(hs):
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 1b511890aa..516f2464b4 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -21,7 +21,8 @@ from typing import Dict, Iterable, Optional, Set
from typing_extensions import ContextManager
-from twisted.internet import address, reactor
+from twisted.internet import address
+from twisted.web.resource import IResource
import synapse
import synapse.events
@@ -34,6 +35,7 @@ from synapse.api.urls import (
SERVER_KEY_V2_PREFIX,
)
from synapse.app import _base
+from synapse.app._base import register_start
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
@@ -89,45 +91,47 @@ from synapse.replication.tcp.streams import (
ToDeviceStream,
)
from synapse.rest.admin import register_servlets_for_media_repo
-from synapse.rest.client.v1 import events
+from synapse.rest.client.v1 import events, login, room
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
-from synapse.rest.client.v1.login import LoginRestServlet
from synapse.rest.client.v1.profile import (
ProfileAvatarURLRestServlet,
ProfileDisplaynameRestServlet,
ProfileRestServlet,
)
from synapse.rest.client.v1.push_rule import PushRuleRestServlet
-from synapse.rest.client.v1.room import (
- JoinedRoomMemberListRestServlet,
- JoinRoomAliasServlet,
- PublicRoomListRestServlet,
- RoomEventContextServlet,
- RoomInitialSyncRestServlet,
- RoomMemberListRestServlet,
- RoomMembershipRestServlet,
- RoomMessageListRestServlet,
- RoomSendEventRestServlet,
- RoomStateEventRestServlet,
- RoomStateRestServlet,
- RoomTypingRestServlet,
-)
from synapse.rest.client.v1.voip import VoipRestServlet
-from synapse.rest.client.v2_alpha import groups, sync, user_directory
+from synapse.rest.client.v2_alpha import (
+ account_data,
+ groups,
+ read_marker,
+ receipts,
+ room_keys,
+ sync,
+ tags,
+ user_directory,
+)
from synapse.rest.client.v2_alpha._base import client_patterns
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.account_data import (
AccountDataServlet,
RoomAccountDataServlet,
)
-from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
+from synapse.rest.client.v2_alpha.devices import DevicesRestServlet
+from synapse.rest.client.v2_alpha.keys import (
+ KeyChangesServlet,
+ KeyQueryServlet,
+ OneTimeKeyServlet,
+)
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
+from synapse.rest.client.v2_alpha.sendtodevice import SendToDeviceRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
+from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.server import HomeServer, cache_in_self
from synapse.storage.databases.main.censor_events import CensorEventsStore
from synapse.storage.databases.main.client_ips import ClientIpWorkerStore
+from synapse.storage.databases.main.e2e_room_keys import EndToEndRoomKeyStore
from synapse.storage.databases.main.media_repository import MediaRepositoryStore
from synapse.storage.databases.main.metrics import ServerMetricsStore
from synapse.storage.databases.main.monthly_active_users import (
@@ -266,7 +270,6 @@ class GenericWorkerPresence(BasePresenceHandler):
super().__init__(hs)
self.hs = hs
self.is_mine_id = hs.is_mine_id
- self.http_client = hs.get_simple_http_client()
self._presence_enabled = hs.config.use_presence
@@ -460,6 +463,7 @@ class GenericWorkerSlavedStore(
UserDirectoryStore,
StatsStore,
UIAuthWorkerStore,
+ EndToEndRoomKeyStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
SlavedReceiptsStore,
@@ -504,7 +508,7 @@ class GenericWorkerServer(HomeServer):
site_tag = port
# We always include a health resource.
- resources = {"/health": HealthResource()}
+ resources = {"/health": HealthResource()} # type: Dict[str, IResource]
for res in listener_config.http_options.resources:
for name in res.names:
@@ -513,36 +517,36 @@ class GenericWorkerServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
- PublicRoomListRestServlet(self).register(resource)
- RoomMemberListRestServlet(self).register(resource)
- JoinedRoomMemberListRestServlet(self).register(resource)
- RoomStateRestServlet(self).register(resource)
- RoomEventContextServlet(self).register(resource)
- RoomMessageListRestServlet(self).register(resource)
RegisterRestServlet(self).register(resource)
- LoginRestServlet(self).register(resource)
+ login.register_servlets(self, resource)
ThreepidRestServlet(self).register(resource)
+ DevicesRestServlet(self).register(resource)
KeyQueryServlet(self).register(resource)
+ OneTimeKeyServlet(self).register(resource)
KeyChangesServlet(self).register(resource)
VoipRestServlet(self).register(resource)
PushRuleRestServlet(self).register(resource)
VersionsRestServlet(self).register(resource)
- RoomSendEventRestServlet(self).register(resource)
- RoomMembershipRestServlet(self).register(resource)
- RoomStateEventRestServlet(self).register(resource)
- JoinRoomAliasServlet(self).register(resource)
+
ProfileAvatarURLRestServlet(self).register(resource)
ProfileDisplaynameRestServlet(self).register(resource)
ProfileRestServlet(self).register(resource)
KeyUploadServlet(self).register(resource)
AccountDataServlet(self).register(resource)
RoomAccountDataServlet(self).register(resource)
- RoomTypingRestServlet(self).register(resource)
sync.register_servlets(self, resource)
events.register_servlets(self, resource)
+ room.register_servlets(self, resource, True)
+ room.register_deprecated_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)
- RoomInitialSyncRestServlet(self).register(resource)
+ room_keys.register_servlets(self, resource)
+ tags.register_servlets(self, resource)
+ account_data.register_servlets(self, resource)
+ receipts.register_servlets(self, resource)
+ read_marker.register_servlets(self, resource)
+
+ SendToDeviceRestServlet(self).register(resource)
user_directory.register_servlets(self, resource)
@@ -554,6 +558,8 @@ class GenericWorkerServer(HomeServer):
groups.register_servlets(self, resource)
resources.update({CLIENT_API_PREFIX: resource})
+
+ resources.update(build_synapse_client_resource_tree(self))
elif name == "federation":
resources.update({FEDERATION_PREFIX: TransportLayerServer(self)})
elif name == "media":
@@ -981,9 +987,7 @@ def start(config_options):
# streams. Will no-op if no streams can be written to by this worker.
hs.get_replication_streamer()
- reactor.addSystemEventTrigger(
- "before", "startup", _base.start, hs, config.worker_listeners
- )
+ register_start(_base.start, hs, config.worker_listeners)
_base.start_worker_reactor("synapse-generic-worker", config)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2b5465417f..244657cb88 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -15,15 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import gc
import logging
import os
import sys
-from typing import Iterable
+from typing import Iterable, Iterator
-from twisted.application import service
-from twisted.internet import defer, reactor
-from twisted.python.failure import Failure
+from twisted.internet import reactor
from twisted.web.resource import EncodingResourceWrapper, IResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
@@ -40,7 +37,7 @@ from synapse.api.urls import (
WEB_CLIENT_PREFIX,
)
from synapse.app import _base
-from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
+from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
from synapse.config._base import ConfigError
from synapse.config.emailconfig import ThreepidBehaviour
from synapse.config.homeserver import HomeServerConfig
@@ -63,6 +60,7 @@ from synapse.rest import ClientRestResource
from synapse.rest.admin import AdminRestResource
from synapse.rest.health import HealthResource
from synapse.rest.key.v2 import KeyApiV2Resource
+from synapse.rest.synapse.client import build_synapse_client_resource_tree
from synapse.rest.well_known import WellKnownResource
from synapse.server import HomeServer
from synapse.storage import DataStore
@@ -71,7 +69,6 @@ from synapse.storage.prepare_database import UpgradeDatabaseException
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.module_loader import load_module
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.homeserver")
@@ -90,7 +87,7 @@ class SynapseHomeServer(HomeServer):
tls = listener_config.tls
site_tag = listener_config.http_options.tag
if site_tag is None:
- site_tag = port
+ site_tag = str(port)
# We always include a health resource.
resources = {"/health": HealthResource()}
@@ -107,7 +104,10 @@ class SynapseHomeServer(HomeServer):
logger.debug("Configuring additional resources: %r", additional_resources)
module_api = self.get_module_api()
for path, resmodule in additional_resources.items():
- handler_cls, config = load_module(resmodule)
+ handler_cls, config = load_module(
+ resmodule,
+ ("listeners", site_tag, "additional_resources", "<%s>" % (path,)),
+ )
handler = handler_cls(config, module_api)
if IResource.providedBy(handler):
resource = handler
@@ -189,19 +189,10 @@ class SynapseHomeServer(HomeServer):
"/_matrix/client/versions": client_resource,
"/.well-known/matrix/client": WellKnownResource(self),
"/_synapse/admin": AdminRestResource(self),
+ **build_synapse_client_resource_tree(self),
}
)
- if self.get_config().oidc_enabled:
- from synapse.rest.oidc import OIDCResource
-
- resources["/_synapse/oidc"] = OIDCResource(self)
-
- if self.get_config().saml2_enabled:
- from synapse.rest.saml2 import SAML2Resource
-
- resources["/_matrix/saml2"] = SAML2Resource(self)
-
if self.get_config().threepid_behaviour_email == ThreepidBehaviour.LOCAL:
from synapse.rest.synapse.client.password_reset import (
PasswordResetSubmitTokenResource,
@@ -342,7 +333,10 @@ def setup(config_options):
"Synapse Homeserver", config_options
)
except ConfigError as e:
- sys.stderr.write("\nERROR: %s\n" % (e,))
+ sys.stderr.write("\n")
+ for f in format_config_error(e):
+ sys.stderr.write(f)
+ sys.stderr.write("\n")
sys.exit(1)
if not config:
@@ -407,61 +401,62 @@ def setup(config_options):
_base.refresh_certificate(hs)
async def start():
- try:
- # Run the ACME provisioning code, if it's enabled.
- if hs.config.acme_enabled:
- acme = hs.get_acme_handler()
- # Start up the webservices which we will respond to ACME
- # challenges with, and then provision.
- await acme.start_listening()
- await do_acme()
-
- # Check if it needs to be reprovisioned every day.
- hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
+ # Run the ACME provisioning code, if it's enabled.
+ if hs.config.acme_enabled:
+ acme = hs.get_acme_handler()
+ # Start up the webservices which we will respond to ACME
+ # challenges with, and then provision.
+ await acme.start_listening()
+ await do_acme()
- # Load the OIDC provider metadatas, if OIDC is enabled.
- if hs.config.oidc_enabled:
- oidc = hs.get_oidc_handler()
- # Loading the provider metadata also ensures the provider config is valid.
- await oidc.load_metadata()
- await oidc.load_jwks()
+ # Check if it needs to be reprovisioned every day.
+ hs.get_clock().looping_call(reprovision_acme, 24 * 60 * 60 * 1000)
- _base.start(hs, config.listeners)
+ # Load the OIDC provider metadatas, if OIDC is enabled.
+ if hs.config.oidc_enabled:
+ oidc = hs.get_oidc_handler()
+ # Loading the provider metadata also ensures the provider config is valid.
+ await oidc.load_metadata()
- hs.get_datastore().db_pool.updates.start_doing_background_updates()
- except Exception:
- # Print the exception and bail out.
- print("Error during startup:", file=sys.stderr)
+ await _base.start(hs, config.listeners)
- # this gives better tracebacks than traceback.print_exc()
- Failure().printTraceback(file=sys.stderr)
+ hs.get_datastore().db_pool.updates.start_doing_background_updates()
- if reactor.running:
- reactor.stop()
- sys.exit(1)
-
- reactor.callWhenRunning(lambda: defer.ensureDeferred(start()))
+ register_start(start)
return hs
-class SynapseService(service.Service):
+def format_config_error(e: ConfigError) -> Iterator[str]:
"""
- A twisted Service class that will start synapse. Used to run synapse
- via twistd and a .tac.
+ Formats a config error neatly
+
+ The idea is to format the immediate error, plus the "causes" of those errors,
+ hopefully in a way that makes sense to the user. For example:
+
+ Error in configuration at 'oidc_config.user_mapping_provider.config.display_name_template':
+ Failed to parse config for module 'JinjaOidcMappingProvider':
+ invalid jinja template:
+ unexpected end of template, expected 'end of print statement'.
+
+ Args:
+ e: the error to be formatted
+
+ Returns: An iterator which yields string fragments to be formatted
"""
+ yield "Error in configuration"
- def __init__(self, config):
- self.config = config
+ if e.path:
+ yield " at '%s'" % (".".join(e.path),)
- def startService(self):
- hs = setup(self.config)
- change_resource_limit(hs.config.soft_file_limit)
- if hs.config.gc_thresholds:
- gc.set_threshold(*hs.config.gc_thresholds)
+ yield ":\n %s" % (e.msg,)
- def stopService(self):
- return self._port.stopListening()
+ e = e.__cause__
+ indent = 1
+ while e:
+ indent += 1
+ yield ":\n%s%s" % (" " * indent, str(e))
+ e = e.__cause__
def run(hs):
diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py
index c38cf8231f..8f86cecb76 100644
--- a/synapse/app/phone_stats_home.py
+++ b/synapse/app/phone_stats_home.py
@@ -93,15 +93,20 @@ async def phone_stats_home(hs, stats, stats_process=_stats_process):
stats["daily_active_users"] = await hs.get_datastore().count_daily_users()
stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users()
+ daily_active_e2ee_rooms = await hs.get_datastore().count_daily_active_e2ee_rooms()
+ stats["daily_active_e2ee_rooms"] = daily_active_e2ee_rooms
+ stats["daily_e2ee_messages"] = await hs.get_datastore().count_daily_e2ee_messages()
+ daily_sent_e2ee_messages = await hs.get_datastore().count_daily_sent_e2ee_messages()
+ stats["daily_sent_e2ee_messages"] = daily_sent_e2ee_messages
stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = await hs.get_datastore().count_daily_messages()
+ daily_sent_messages = await hs.get_datastore().count_daily_sent_messages()
+ stats["daily_sent_messages"] = daily_sent_messages
r30_results = await hs.get_datastore().count_r30_users()
for name, count in r30_results.items():
stats["r30_users_" + name] = count
- daily_sent_messages = await hs.get_datastore().count_daily_sent_messages()
- stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = hs.config.caches.global_factor
stats["event_cache_size"] = hs.config.caches.event_cache_size
|