diff options
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/__init__.py | 11 | ||||
-rw-r--r-- | synapse/app/_base.py | 140 | ||||
-rw-r--r-- | synapse/app/admin_cmd.py | 42 | ||||
-rw-r--r-- | synapse/app/generic_worker.py | 34 | ||||
-rw-r--r-- | synapse/app/homeserver.py | 93 | ||||
-rw-r--r-- | synapse/app/phone_stats_home.py | 23 |
6 files changed, 192 insertions, 151 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index f9940491e8..ee51480a9e 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -13,6 +13,7 @@ # limitations under the License. import logging import sys +from typing import Container from synapse import python_dependencies # noqa: E402 @@ -27,7 +28,9 @@ except python_dependencies.DependencyException as e: sys.exit(1) -def check_bind_error(e, address, bind_addresses): +def check_bind_error( + e: Exception, address: str, bind_addresses: Container[str] +) -> None: """ This method checks an exception occurred while binding on 0.0.0.0. If :: is specified in the bind addresses a warning is shown. @@ -38,9 +41,9 @@ def check_bind_error(e, address, bind_addresses): When binding on 0.0.0.0 after :: this can safely be ignored. Args: - e (Exception): Exception that was caught. - address (str): Address on which binding was attempted. - bind_addresses (list): Addresses on which the service listens. + e: Exception that was caught. + address: Address on which binding was attempted. + bind_addresses: Addresses on which the service listens. """ if address == "0.0.0.0" and "::" in bind_addresses: logger.warning( diff --git a/synapse/app/_base.py b/synapse/app/_base.py index f2c1028b5d..573bb487b2 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -22,13 +22,27 @@ import socket import sys import traceback import warnings -from typing import TYPE_CHECKING, Awaitable, Callable, Iterable +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Collection, + Dict, + Iterable, + List, + NoReturn, + Tuple, + cast, +) from cryptography.utils import CryptographyDeprecationWarning -from typing_extensions import NoReturn import twisted -from twisted.internet import defer, error, reactor +from twisted.internet import defer, error, reactor as _reactor +from twisted.internet.interfaces import IOpenSSLContextFactory, IReactorSSL, IReactorTCP +from twisted.internet.protocol import ServerFactory +from twisted.internet.tcp import Port from twisted.logger import LoggingFile, LogLevel from twisted.protocols.tls import TLSMemoryBIOFactory from twisted.python.threadpool import ThreadPool @@ -48,6 +62,7 @@ from synapse.logging.context import PreserveLoggingContext from synapse.metrics import register_threadpool from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.jemalloc import setup_jemalloc_stats +from synapse.types import ISynapseReactor from synapse.util.caches.lrucache import setup_expire_lru_cache_entries from synapse.util.daemonize import daemonize_process from synapse.util.gai_resolver import GAIResolver @@ -57,33 +72,44 @@ from synapse.util.versionstring import get_version_string if TYPE_CHECKING: from synapse.server import HomeServer +# Twisted injects the global reactor to make it easier to import, this confuses +# mypy which thinks it is a module. Tell it that it a more proper type. +reactor = cast(ISynapseReactor, _reactor) + + logger = logging.getLogger(__name__) # list of tuples of function, args list, kwargs dict -_sighup_callbacks = [] +_sighup_callbacks: List[ + Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]] +] = [] -def register_sighup(func, *args, **kwargs): +def register_sighup(func: Callable[..., None], *args: Any, **kwargs: Any) -> None: """ Register a function to be called when a SIGHUP occurs. Args: - func (function): Function to be called when sent a SIGHUP signal. + func: Function to be called when sent a SIGHUP signal. *args, **kwargs: args and kwargs to be passed to the target function. """ _sighup_callbacks.append((func, args, kwargs)) -def start_worker_reactor(appname, config, run_command=reactor.run): +def start_worker_reactor( + appname: str, + config: HomeServerConfig, + run_command: Callable[[], None] = reactor.run, +) -> None: """Run the reactor in the main process Daemonizes if necessary, and then configures some resources, before starting the reactor. Pulls configuration from the 'worker' settings in 'config'. Args: - appname (str): application name which will be sent to syslog - config (synapse.config.Config): config object - run_command (Callable[]): callable that actually runs the reactor + appname: application name which will be sent to syslog + config: config object + run_command: callable that actually runs the reactor """ logger = logging.getLogger(config.worker.worker_app) @@ -101,32 +127,32 @@ def start_worker_reactor(appname, config, run_command=reactor.run): def start_reactor( - appname, - soft_file_limit, - gc_thresholds, - pid_file, - daemonize, - print_pidfile, - logger, - run_command=reactor.run, -): + appname: str, + soft_file_limit: int, + gc_thresholds: Tuple[int, int, int], + pid_file: str, + daemonize: bool, + print_pidfile: bool, + logger: logging.Logger, + run_command: Callable[[], None] = reactor.run, +) -> None: """Run the reactor in the main process Daemonizes if necessary, and then configures some resources, before starting the reactor Args: - appname (str): application name which will be sent to syslog - soft_file_limit (int): + appname: application name which will be sent to syslog + soft_file_limit: gc_thresholds: - pid_file (str): name of pid file to write to if daemonize is True - daemonize (bool): true to run the reactor in a background process - print_pidfile (bool): whether to print the pid file, if daemonize is True - logger (logging.Logger): logger instance to pass to Daemonize - run_command (Callable[]): callable that actually runs the reactor + pid_file: name of pid file to write to if daemonize is True + daemonize: true to run the reactor in a background process + print_pidfile: whether to print the pid file, if daemonize is True + logger: logger instance to pass to Daemonize + run_command: callable that actually runs the reactor """ - def run(): + def run() -> None: logger.info("Running") setup_jemalloc_stats() change_resource_limit(soft_file_limit) @@ -185,7 +211,7 @@ def redirect_stdio_to_logs() -> None: print("Redirected stdout/stderr to logs") -def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None: +def register_start(cb: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> None: """Register a callback with the reactor, to be called once it is running This can be used to initialise parts of the system which require an asynchronous @@ -195,7 +221,7 @@ def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None: will exit. """ - async def wrapper(): + async def wrapper() -> None: try: await cb(*args, **kwargs) except Exception: @@ -224,7 +250,7 @@ def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None: reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper())) -def listen_metrics(bind_addresses, port): +def listen_metrics(bind_addresses: Iterable[str], port: int) -> None: """ Start Prometheus metrics server. """ @@ -236,11 +262,11 @@ def listen_metrics(bind_addresses, port): def listen_manhole( - bind_addresses: Iterable[str], + bind_addresses: Collection[str], port: int, manhole_settings: ManholeConfig, manhole_globals: dict, -): +) -> None: # twisted.conch.manhole 21.1.0 uses "int_from_bytes", which produces a confusing # warning. It's fixed by https://github.com/twisted/twisted/pull/1522), so # suppress the warning for now. @@ -259,12 +285,18 @@ def listen_manhole( ) -def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): +def listen_tcp( + bind_addresses: Collection[str], + port: int, + factory: ServerFactory, + reactor: IReactorTCP = reactor, + backlog: int = 50, +) -> List[Port]: """ Create a TCP socket for a port and several addresses Returns: - list[twisted.internet.tcp.Port]: listening for TCP connections + list of twisted.internet.tcp.Port listening for TCP connections """ r = [] for address in bind_addresses: @@ -273,12 +305,19 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): except error.CannotListenError as e: check_bind_error(e, address, bind_addresses) - return r + # IReactorTCP returns an object implementing IListeningPort from listenTCP, + # but we know it will be a Port instance. + return r # type: ignore[return-value] def listen_ssl( - bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50 -): + bind_addresses: Collection[str], + port: int, + factory: ServerFactory, + context_factory: IOpenSSLContextFactory, + reactor: IReactorSSL = reactor, + backlog: int = 50, +) -> List[Port]: """ Create an TLS-over-TCP socket for a port and several addresses @@ -294,10 +333,13 @@ def listen_ssl( except error.CannotListenError as e: check_bind_error(e, address, bind_addresses) - return r + # IReactorSSL incorrectly declares that an int is returned from listenSSL, + # it actually returns an object implementing IListeningPort, but we know it + # will be a Port instance. + return r # type: ignore[return-value] -def refresh_certificate(hs: "HomeServer"): +def refresh_certificate(hs: "HomeServer") -> None: """ Refresh the TLS certificates that Synapse is using by re-reading them from disk and updating the TLS context factories to use them. @@ -329,7 +371,7 @@ def refresh_certificate(hs: "HomeServer"): logger.info("Context factories updated.") -async def start(hs: "HomeServer"): +async def start(hs: "HomeServer") -> None: """ Start a Synapse server or worker. @@ -360,7 +402,7 @@ async def start(hs: "HomeServer"): if hasattr(signal, "SIGHUP"): @wrap_as_background_process("sighup") - def handle_sighup(*args, **kwargs): + def handle_sighup(*args: Any, **kwargs: Any) -> None: # Tell systemd our state, if we're using it. This will silently fail if # we're not using systemd. sdnotify(b"RELOADING=1") @@ -373,7 +415,7 @@ async def start(hs: "HomeServer"): # We defer running the sighup handlers until next reactor tick. This # is so that we're in a sane state, e.g. flushing the logs may fail # if the sighup happens in the middle of writing a log entry. - def run_sighup(*args, **kwargs): + def run_sighup(*args: Any, **kwargs: Any) -> None: # `callFromThread` should be "signal safe" as well as thread # safe. reactor.callFromThread(handle_sighup, *args, **kwargs) @@ -436,12 +478,8 @@ async def start(hs: "HomeServer"): atexit.register(gc.freeze) -def setup_sentry(hs: "HomeServer"): - """Enable sentry integration, if enabled in configuration - - Args: - hs - """ +def setup_sentry(hs: "HomeServer") -> None: + """Enable sentry integration, if enabled in configuration""" if not hs.config.metrics.sentry_enabled: return @@ -466,7 +504,7 @@ def setup_sentry(hs: "HomeServer"): scope.set_tag("worker_name", name) -def setup_sdnotify(hs: "HomeServer"): +def setup_sdnotify(hs: "HomeServer") -> None: """Adds process state hooks to tell systemd what we are up to.""" # Tell systemd our state, if we're using it. This will silently fail if @@ -481,7 +519,7 @@ def setup_sdnotify(hs: "HomeServer"): sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET") -def sdnotify(state): +def sdnotify(state: bytes) -> None: """ Send a notification to systemd, if the NOTIFY_SOCKET env var is set. @@ -490,7 +528,7 @@ def sdnotify(state): package which many OSes don't include as a matter of principle. Args: - state (bytes): notification to send + state: notification to send """ if not isinstance(state, bytes): raise TypeError("sdnotify should be called with a bytes") diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py index ad20b1d6aa..42238f7f28 100644 --- a/synapse/app/admin_cmd.py +++ b/synapse/app/admin_cmd.py @@ -17,6 +17,7 @@ import logging import os import sys import tempfile +from typing import List, Optional from twisted.internet import defer, task @@ -25,6 +26,7 @@ from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.events import EventBase from synapse.handlers.admin import ExfiltrationWriter from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore @@ -40,6 +42,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.server import HomeServer from synapse.storage.databases.main.room import RoomWorkerStore +from synapse.types import StateMap from synapse.util.logcontext import LoggingContext from synapse.util.versionstring import get_version_string @@ -65,16 +68,11 @@ class AdminCmdSlavedStore( class AdminCmdServer(HomeServer): - DATASTORE_CLASS = AdminCmdSlavedStore + DATASTORE_CLASS = AdminCmdSlavedStore # type: ignore -async def export_data_command(hs: HomeServer, args): - """Export data for a user. - - Args: - hs - args (argparse.Namespace) - """ +async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None: + """Export data for a user.""" user_id = args.user_id directory = args.output_directory @@ -92,12 +90,12 @@ class FileExfiltrationWriter(ExfiltrationWriter): Note: This writes to disk on the main reactor thread. Args: - user_id (str): The user whose data is being exfiltrated. - directory (str|None): The directory to write the data to, if None then - will write to a temporary directory. + user_id: The user whose data is being exfiltrated. + directory: The directory to write the data to, if None then will write + to a temporary directory. """ - def __init__(self, user_id, directory=None): + def __init__(self, user_id: str, directory: Optional[str] = None): self.user_id = user_id if directory: @@ -111,7 +109,7 @@ class FileExfiltrationWriter(ExfiltrationWriter): if list(os.listdir(self.base_directory)): raise Exception("Directory must be empty") - def write_events(self, room_id, events): + def write_events(self, room_id: str, events: List[EventBase]) -> None: room_directory = os.path.join(self.base_directory, "rooms", room_id) os.makedirs(room_directory, exist_ok=True) events_file = os.path.join(room_directory, "events") @@ -120,7 +118,9 @@ class FileExfiltrationWriter(ExfiltrationWriter): for event in events: print(json.dumps(event.get_pdu_json()), file=f) - def write_state(self, room_id, event_id, state): + def write_state( + self, room_id: str, event_id: str, state: StateMap[EventBase] + ) -> None: room_directory = os.path.join(self.base_directory, "rooms", room_id) state_directory = os.path.join(room_directory, "state") os.makedirs(state_directory, exist_ok=True) @@ -131,7 +131,9 @@ class FileExfiltrationWriter(ExfiltrationWriter): for event in state.values(): print(json.dumps(event.get_pdu_json()), file=f) - def write_invite(self, room_id, event, state): + def write_invite( + self, room_id: str, event: EventBase, state: StateMap[EventBase] + ) -> None: self.write_events(room_id, [event]) # We write the invite state somewhere else as they aren't full events @@ -145,7 +147,9 @@ class FileExfiltrationWriter(ExfiltrationWriter): for event in state.values(): print(json.dumps(event), file=f) - def write_knock(self, room_id, event, state): + def write_knock( + self, room_id: str, event: EventBase, state: StateMap[EventBase] + ) -> None: self.write_events(room_id, [event]) # We write the knock state somewhere else as they aren't full events @@ -159,11 +163,11 @@ class FileExfiltrationWriter(ExfiltrationWriter): for event in state.values(): print(json.dumps(event), file=f) - def finished(self): + def finished(self) -> str: return self.base_directory -def start(config_options): +def start(config_options: List[str]) -> None: parser = argparse.ArgumentParser(description="Synapse Admin Command") HomeServerConfig.add_arguments_to_parser(parser) @@ -231,7 +235,7 @@ def start(config_options): # We also make sure that `_base.start` gets run before we actually run the # command. - async def run(): + async def run() -> None: with LoggingContext("command"): await _base.start(ss) await args.func(ss, args) diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 218826741e..46f0feff70 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -14,11 +14,10 @@ # limitations under the License. import logging import sys -from typing import Dict, Optional +from typing import Dict, List, Optional, Tuple from twisted.internet import address -from twisted.web.resource import IResource -from twisted.web.server import Request +from twisted.web.resource import Resource import synapse import synapse.events @@ -44,7 +43,7 @@ from synapse.config.server import ListenerConfig from synapse.federation.transport.server import TransportLayerServer from synapse.http.server import JsonResource, OptionsResource from synapse.http.servlet import RestServlet, parse_json_object_from_request -from synapse.http.site import SynapseSite +from synapse.http.site import SynapseRequest, SynapseSite from synapse.logging.context import LoggingContext from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource @@ -119,6 +118,7 @@ from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore +from synapse.types import JsonDict from synapse.util.httpresourcetree import create_resource_tree from synapse.util.versionstring import get_version_string @@ -143,7 +143,9 @@ class KeyUploadServlet(RestServlet): self.http_client = hs.get_simple_http_client() self.main_uri = hs.config.worker.worker_main_http_uri - async def on_POST(self, request: Request, device_id: Optional[str]): + async def on_POST( + self, request: SynapseRequest, device_id: Optional[str] + ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request, allow_guest=True) user_id = requester.user.to_string() body = parse_json_object_from_request(request) @@ -187,9 +189,8 @@ class KeyUploadServlet(RestServlet): # If the header exists, add to the comma-separated list of the first # instance of the header. Otherwise, generate a new header. if x_forwarded_for: - x_forwarded_for = [ - x_forwarded_for[0] + b", " + previous_host - ] + x_forwarded_for[1:] + x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host] + x_forwarded_for.extend(x_forwarded_for[1:]) else: x_forwarded_for = [previous_host] headers[b"X-Forwarded-For"] = x_forwarded_for @@ -253,13 +254,16 @@ class GenericWorkerSlavedStore( SessionStore, BaseSlavedStore, ): - pass + # Properties that multiple storage classes define. Tell mypy what the + # expected type is. + server_name: str + config: HomeServerConfig class GenericWorkerServer(HomeServer): - DATASTORE_CLASS = GenericWorkerSlavedStore + DATASTORE_CLASS = GenericWorkerSlavedStore # type: ignore - def _listen_http(self, listener_config: ListenerConfig): + def _listen_http(self, listener_config: ListenerConfig) -> None: port = listener_config.port bind_addresses = listener_config.bind_addresses @@ -267,10 +271,10 @@ class GenericWorkerServer(HomeServer): site_tag = listener_config.http_options.tag if site_tag is None: - site_tag = port + site_tag = str(port) # We always include a health resource. - resources: Dict[str, IResource] = {"/health": HealthResource()} + resources: Dict[str, Resource] = {"/health": HealthResource()} for res in listener_config.http_options.resources: for name in res.names: @@ -386,7 +390,7 @@ class GenericWorkerServer(HomeServer): logger.info("Synapse worker now listening on port %d", port) - def start_listening(self): + def start_listening(self) -> None: for listener in self.config.worker.worker_listeners: if listener.type == "http": self._listen_http(listener) @@ -411,7 +415,7 @@ class GenericWorkerServer(HomeServer): self.get_tcp_replication().start_replication(self) -def start(config_options): +def start(config_options: List[str]) -> None: try: config = HomeServerConfig.load_config("Synapse worker", config_options) except ConfigError as e: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 336c279a44..4efadde57e 100644 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -16,10 +16,10 @@ import logging import os import sys -from typing import Iterator +from typing import Dict, Iterable, Iterator, List -from twisted.internet import reactor -from twisted.web.resource import EncodingResourceWrapper, IResource +from twisted.internet.tcp import Port +from twisted.web.resource import EncodingResourceWrapper, Resource from twisted.web.server import GzipEncoderFactory from twisted.web.static import File @@ -76,23 +76,27 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.homeserver") -def gz_wrap(r): +def gz_wrap(r: Resource) -> Resource: return EncodingResourceWrapper(r, [GzipEncoderFactory()]) class SynapseHomeServer(HomeServer): - DATASTORE_CLASS = DataStore + DATASTORE_CLASS = DataStore # type: ignore - def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig): + def _listener_http( + self, config: HomeServerConfig, listener_config: ListenerConfig + ) -> Iterable[Port]: port = listener_config.port bind_addresses = listener_config.bind_addresses tls = listener_config.tls + # Must exist since this is an HTTP listener. + assert listener_config.http_options is not None site_tag = listener_config.http_options.tag if site_tag is None: site_tag = str(port) # We always include a health resource. - resources = {"/health": HealthResource()} + resources: Dict[str, Resource] = {"/health": HealthResource()} for res in listener_config.http_options.resources: for name in res.names: @@ -111,7 +115,7 @@ class SynapseHomeServer(HomeServer): ("listeners", site_tag, "additional_resources", "<%s>" % (path,)), ) handler = handler_cls(config, module_api) - if IResource.providedBy(handler): + if isinstance(handler, Resource): resource = handler elif hasattr(handler, "handle_request"): resource = AdditionalResource(self, handler.handle_request) @@ -128,7 +132,7 @@ class SynapseHomeServer(HomeServer): # try to find something useful to redirect '/' to if WEB_CLIENT_PREFIX in resources: - root_resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX) + root_resource: Resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX) elif STATIC_PREFIX in resources: root_resource = RootOptionsRedirectResource(STATIC_PREFIX) else: @@ -145,6 +149,8 @@ class SynapseHomeServer(HomeServer): ) if tls: + # refresh_certificate should have been called before this. + assert self.tls_server_context_factory is not None ports = listen_ssl( bind_addresses, port, @@ -165,20 +171,21 @@ class SynapseHomeServer(HomeServer): return ports - def _configure_named_resource(self, name, compress=False): + def _configure_named_resource( + self, name: str, compress: bool = False + ) -> Dict[str, Resource]: """Build a resource map for a named resource Args: - name (str): named resource: one of "client", "federation", etc - compress (bool): whether to enable gzip compression for this - resource + name: named resource: one of "client", "federation", etc + compress: whether to enable gzip compression for this resource Returns: - dict[str, Resource]: map from path to HTTP resource + map from path to HTTP resource """ - resources = {} + resources: Dict[str, Resource] = {} if name == "client": - client_resource = ClientRestResource(self) + client_resource: Resource = ClientRestResource(self) if compress: client_resource = gz_wrap(client_resource) @@ -186,6 +193,7 @@ class SynapseHomeServer(HomeServer): { "/_matrix/client/api/v1": client_resource, "/_matrix/client/r0": client_resource, + "/_matrix/client/v3": client_resource, "/_matrix/client/unstable": client_resource, "/_matrix/client/v2_alpha": client_resource, "/_matrix/client/versions": client_resource, @@ -207,7 +215,7 @@ class SynapseHomeServer(HomeServer): if name == "consent": from synapse.rest.consent.consent_resource import ConsentResource - consent_resource = ConsentResource(self) + consent_resource: Resource = ConsentResource(self) if compress: consent_resource = gz_wrap(consent_resource) resources.update({"/_matrix/consent": consent_resource}) @@ -277,7 +285,7 @@ class SynapseHomeServer(HomeServer): return resources - def start_listening(self): + def start_listening(self) -> None: if self.config.redis.redis_enabled: # If redis is enabled we connect via the replication command handler # in the same way as the workers (since we're effectively a client @@ -303,7 +311,9 @@ class SynapseHomeServer(HomeServer): ReplicationStreamProtocolFactory(self), ) for s in services: - reactor.addSystemEventTrigger("before", "shutdown", s.stopListening) + self.get_reactor().addSystemEventTrigger( + "before", "shutdown", s.stopListening + ) elif listener.type == "metrics": if not self.config.metrics.enable_metrics: logger.warning( @@ -318,14 +328,13 @@ class SynapseHomeServer(HomeServer): logger.warning("Unrecognized listener type: %s", listener.type) -def setup(config_options): +def setup(config_options: List[str]) -> SynapseHomeServer: """ Args: - config_options_options: The options passed to Synapse. Usually - `sys.argv[1:]`. + config_options_options: The options passed to Synapse. Usually `sys.argv[1:]`. Returns: - HomeServer + A homeserver instance. """ try: config = HomeServerConfig.load_or_generate_config( @@ -364,7 +373,7 @@ def setup(config_options): except Exception as e: handle_startup_exception(e) - async def start(): + async def start() -> None: # Load the OIDC provider metadatas, if OIDC is enabled. if hs.config.oidc.oidc_enabled: oidc = hs.get_oidc_handler() @@ -404,39 +413,15 @@ def format_config_error(e: ConfigError) -> Iterator[str]: yield ":\n %s" % (e.msg,) - e = e.__cause__ + parent_e = e.__cause__ indent = 1 - while e: + while parent_e: indent += 1 - yield ":\n%s%s" % (" " * indent, str(e)) - e = e.__cause__ - - -def run(hs: HomeServer): - PROFILE_SYNAPSE = False - if PROFILE_SYNAPSE: - - def profile(func): - from cProfile import Profile - from threading import current_thread - - def profiled(*args, **kargs): - profile = Profile() - profile.enable() - func(*args, **kargs) - profile.disable() - ident = current_thread().ident - profile.dump_stats( - "/tmp/%s.%s.%i.pstat" % (hs.hostname, func.__name__, ident) - ) - - return profiled - - from twisted.python.threadpool import ThreadPool + yield ":\n%s%s" % (" " * indent, str(parent_e)) + parent_e = parent_e.__cause__ - ThreadPool._worker = profile(ThreadPool._worker) - reactor.run = profile(reactor.run) +def run(hs: HomeServer) -> None: _base.start_reactor( "synapse-homeserver", soft_file_limit=hs.config.server.soft_file_limit, @@ -448,7 +433,7 @@ def run(hs: HomeServer): ) -def main(): +def main() -> None: with LoggingContext("main"): # check base requirements check_requirements() diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py index 126450e17a..899dba5c3d 100644 --- a/synapse/app/phone_stats_home.py +++ b/synapse/app/phone_stats_home.py @@ -15,11 +15,12 @@ import logging import math import resource import sys -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, List, Sized, Tuple from prometheus_client import Gauge from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.types import JsonDict if TYPE_CHECKING: from synapse.server import HomeServer @@ -28,7 +29,7 @@ logger = logging.getLogger("synapse.app.homeserver") # Contains the list of processes we will be monitoring # currently either 0 or 1 -_stats_process = [] +_stats_process: List[Tuple[int, "resource.struct_rusage"]] = [] # Gauges to expose monthly active user control metrics current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") @@ -45,9 +46,15 @@ registered_reserved_users_mau_gauge = Gauge( @wrap_as_background_process("phone_stats_home") -async def phone_stats_home(hs: "HomeServer", stats, stats_process=_stats_process): +async def phone_stats_home( + hs: "HomeServer", + stats: JsonDict, + stats_process: List[Tuple[int, "resource.struct_rusage"]] = _stats_process, +) -> None: logger.info("Gathering stats for reporting") now = int(hs.get_clock().time()) + # Ensure the homeserver has started. + assert hs.start_time is not None uptime = int(now - hs.start_time) if uptime < 0: uptime = 0 @@ -146,15 +153,15 @@ async def phone_stats_home(hs: "HomeServer", stats, stats_process=_stats_process logger.warning("Error reporting stats: %s", e) -def start_phone_stats_home(hs: "HomeServer"): +def start_phone_stats_home(hs: "HomeServer") -> None: """ Start the background tasks which report phone home stats. """ clock = hs.get_clock() - stats = {} + stats: JsonDict = {} - def performance_stats_init(): + def performance_stats_init() -> None: _stats_process.clear() _stats_process.append( (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF)) @@ -170,10 +177,10 @@ def start_phone_stats_home(hs: "HomeServer"): hs.get_datastore().reap_monthly_active_users() @wrap_as_background_process("generate_monthly_active_users") - async def generate_monthly_active_users(): + async def generate_monthly_active_users() -> None: current_mau_count = 0 current_mau_count_by_service = {} - reserved_users = () + reserved_users: Sized = () store = hs.get_datastore() if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only: current_mau_count = await store.get_monthly_active_count() |