summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/__init__.py11
-rw-r--r--synapse/app/_base.py140
-rw-r--r--synapse/app/admin_cmd.py42
-rw-r--r--synapse/app/generic_worker.py34
-rw-r--r--synapse/app/homeserver.py93
-rw-r--r--synapse/app/phone_stats_home.py23
6 files changed, 192 insertions, 151 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index f9940491e8..ee51480a9e 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 import logging
 import sys
+from typing import Container
 
 from synapse import python_dependencies  # noqa: E402
 
@@ -27,7 +28,9 @@ except python_dependencies.DependencyException as e:
     sys.exit(1)
 
 
-def check_bind_error(e, address, bind_addresses):
+def check_bind_error(
+    e: Exception, address: str, bind_addresses: Container[str]
+) -> None:
     """
     This method checks an exception occurred while binding on 0.0.0.0.
     If :: is specified in the bind addresses a warning is shown.
@@ -38,9 +41,9 @@ def check_bind_error(e, address, bind_addresses):
     When binding on 0.0.0.0 after :: this can safely be ignored.
 
     Args:
-        e (Exception): Exception that was caught.
-        address (str): Address on which binding was attempted.
-        bind_addresses (list): Addresses on which the service listens.
+        e: Exception that was caught.
+        address: Address on which binding was attempted.
+        bind_addresses: Addresses on which the service listens.
     """
     if address == "0.0.0.0" and "::" in bind_addresses:
         logger.warning(
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index f2c1028b5d..573bb487b2 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -22,13 +22,27 @@ import socket
 import sys
 import traceback
 import warnings
-from typing import TYPE_CHECKING, Awaitable, Callable, Iterable
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Awaitable,
+    Callable,
+    Collection,
+    Dict,
+    Iterable,
+    List,
+    NoReturn,
+    Tuple,
+    cast,
+)
 
 from cryptography.utils import CryptographyDeprecationWarning
-from typing_extensions import NoReturn
 
 import twisted
-from twisted.internet import defer, error, reactor
+from twisted.internet import defer, error, reactor as _reactor
+from twisted.internet.interfaces import IOpenSSLContextFactory, IReactorSSL, IReactorTCP
+from twisted.internet.protocol import ServerFactory
+from twisted.internet.tcp import Port
 from twisted.logger import LoggingFile, LogLevel
 from twisted.protocols.tls import TLSMemoryBIOFactory
 from twisted.python.threadpool import ThreadPool
@@ -48,6 +62,7 @@ from synapse.logging.context import PreserveLoggingContext
 from synapse.metrics import register_threadpool
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.metrics.jemalloc import setup_jemalloc_stats
+from synapse.types import ISynapseReactor
 from synapse.util.caches.lrucache import setup_expire_lru_cache_entries
 from synapse.util.daemonize import daemonize_process
 from synapse.util.gai_resolver import GAIResolver
@@ -57,33 +72,44 @@ from synapse.util.versionstring import get_version_string
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
+# Twisted injects the global reactor to make it easier to import, this confuses
+# mypy which thinks it is a module. Tell it that it a more proper type.
+reactor = cast(ISynapseReactor, _reactor)
+
+
 logger = logging.getLogger(__name__)
 
 # list of tuples of function, args list, kwargs dict
-_sighup_callbacks = []
+_sighup_callbacks: List[
+    Tuple[Callable[..., None], Tuple[Any, ...], Dict[str, Any]]
+] = []
 
 
-def register_sighup(func, *args, **kwargs):
+def register_sighup(func: Callable[..., None], *args: Any, **kwargs: Any) -> None:
     """
     Register a function to be called when a SIGHUP occurs.
 
     Args:
-        func (function): Function to be called when sent a SIGHUP signal.
+        func: Function to be called when sent a SIGHUP signal.
         *args, **kwargs: args and kwargs to be passed to the target function.
     """
     _sighup_callbacks.append((func, args, kwargs))
 
 
-def start_worker_reactor(appname, config, run_command=reactor.run):
+def start_worker_reactor(
+    appname: str,
+    config: HomeServerConfig,
+    run_command: Callable[[], None] = reactor.run,
+) -> None:
     """Run the reactor in the main process
 
     Daemonizes if necessary, and then configures some resources, before starting
     the reactor. Pulls configuration from the 'worker' settings in 'config'.
 
     Args:
-        appname (str): application name which will be sent to syslog
-        config (synapse.config.Config): config object
-        run_command (Callable[]): callable that actually runs the reactor
+        appname: application name which will be sent to syslog
+        config: config object
+        run_command: callable that actually runs the reactor
     """
 
     logger = logging.getLogger(config.worker.worker_app)
@@ -101,32 +127,32 @@ def start_worker_reactor(appname, config, run_command=reactor.run):
 
 
 def start_reactor(
-    appname,
-    soft_file_limit,
-    gc_thresholds,
-    pid_file,
-    daemonize,
-    print_pidfile,
-    logger,
-    run_command=reactor.run,
-):
+    appname: str,
+    soft_file_limit: int,
+    gc_thresholds: Tuple[int, int, int],
+    pid_file: str,
+    daemonize: bool,
+    print_pidfile: bool,
+    logger: logging.Logger,
+    run_command: Callable[[], None] = reactor.run,
+) -> None:
     """Run the reactor in the main process
 
     Daemonizes if necessary, and then configures some resources, before starting
     the reactor
 
     Args:
-        appname (str): application name which will be sent to syslog
-        soft_file_limit (int):
+        appname: application name which will be sent to syslog
+        soft_file_limit:
         gc_thresholds:
-        pid_file (str): name of pid file to write to if daemonize is True
-        daemonize (bool): true to run the reactor in a background process
-        print_pidfile (bool): whether to print the pid file, if daemonize is True
-        logger (logging.Logger): logger instance to pass to Daemonize
-        run_command (Callable[]): callable that actually runs the reactor
+        pid_file: name of pid file to write to if daemonize is True
+        daemonize: true to run the reactor in a background process
+        print_pidfile: whether to print the pid file, if daemonize is True
+        logger: logger instance to pass to Daemonize
+        run_command: callable that actually runs the reactor
     """
 
-    def run():
+    def run() -> None:
         logger.info("Running")
         setup_jemalloc_stats()
         change_resource_limit(soft_file_limit)
@@ -185,7 +211,7 @@ def redirect_stdio_to_logs() -> None:
     print("Redirected stdout/stderr to logs")
 
 
-def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
+def register_start(cb: Callable[..., Awaitable], *args: Any, **kwargs: Any) -> None:
     """Register a callback with the reactor, to be called once it is running
 
     This can be used to initialise parts of the system which require an asynchronous
@@ -195,7 +221,7 @@ def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
     will exit.
     """
 
-    async def wrapper():
+    async def wrapper() -> None:
         try:
             await cb(*args, **kwargs)
         except Exception:
@@ -224,7 +250,7 @@ def register_start(cb: Callable[..., Awaitable], *args, **kwargs) -> None:
     reactor.callWhenRunning(lambda: defer.ensureDeferred(wrapper()))
 
 
-def listen_metrics(bind_addresses, port):
+def listen_metrics(bind_addresses: Iterable[str], port: int) -> None:
     """
     Start Prometheus metrics server.
     """
@@ -236,11 +262,11 @@ def listen_metrics(bind_addresses, port):
 
 
 def listen_manhole(
-    bind_addresses: Iterable[str],
+    bind_addresses: Collection[str],
     port: int,
     manhole_settings: ManholeConfig,
     manhole_globals: dict,
-):
+) -> None:
     # twisted.conch.manhole 21.1.0 uses "int_from_bytes", which produces a confusing
     # warning. It's fixed by https://github.com/twisted/twisted/pull/1522), so
     # suppress the warning for now.
@@ -259,12 +285,18 @@ def listen_manhole(
     )
 
 
-def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
+def listen_tcp(
+    bind_addresses: Collection[str],
+    port: int,
+    factory: ServerFactory,
+    reactor: IReactorTCP = reactor,
+    backlog: int = 50,
+) -> List[Port]:
     """
     Create a TCP socket for a port and several addresses
 
     Returns:
-        list[twisted.internet.tcp.Port]: listening for TCP connections
+        list of twisted.internet.tcp.Port listening for TCP connections
     """
     r = []
     for address in bind_addresses:
@@ -273,12 +305,19 @@ def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
         except error.CannotListenError as e:
             check_bind_error(e, address, bind_addresses)
 
-    return r
+    # IReactorTCP returns an object implementing IListeningPort from listenTCP,
+    # but we know it will be a Port instance.
+    return r  # type: ignore[return-value]
 
 
 def listen_ssl(
-    bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
-):
+    bind_addresses: Collection[str],
+    port: int,
+    factory: ServerFactory,
+    context_factory: IOpenSSLContextFactory,
+    reactor: IReactorSSL = reactor,
+    backlog: int = 50,
+) -> List[Port]:
     """
     Create an TLS-over-TCP socket for a port and several addresses
 
@@ -294,10 +333,13 @@ def listen_ssl(
         except error.CannotListenError as e:
             check_bind_error(e, address, bind_addresses)
 
-    return r
+    # IReactorSSL incorrectly declares that an int is returned from listenSSL,
+    # it actually returns an object implementing IListeningPort, but we know it
+    # will be a Port instance.
+    return r  # type: ignore[return-value]
 
 
-def refresh_certificate(hs: "HomeServer"):
+def refresh_certificate(hs: "HomeServer") -> None:
     """
     Refresh the TLS certificates that Synapse is using by re-reading them from
     disk and updating the TLS context factories to use them.
@@ -329,7 +371,7 @@ def refresh_certificate(hs: "HomeServer"):
         logger.info("Context factories updated.")
 
 
-async def start(hs: "HomeServer"):
+async def start(hs: "HomeServer") -> None:
     """
     Start a Synapse server or worker.
 
@@ -360,7 +402,7 @@ async def start(hs: "HomeServer"):
     if hasattr(signal, "SIGHUP"):
 
         @wrap_as_background_process("sighup")
-        def handle_sighup(*args, **kwargs):
+        def handle_sighup(*args: Any, **kwargs: Any) -> None:
             # Tell systemd our state, if we're using it. This will silently fail if
             # we're not using systemd.
             sdnotify(b"RELOADING=1")
@@ -373,7 +415,7 @@ async def start(hs: "HomeServer"):
         # We defer running the sighup handlers until next reactor tick. This
         # is so that we're in a sane state, e.g. flushing the logs may fail
         # if the sighup happens in the middle of writing a log entry.
-        def run_sighup(*args, **kwargs):
+        def run_sighup(*args: Any, **kwargs: Any) -> None:
             # `callFromThread` should be "signal safe" as well as thread
             # safe.
             reactor.callFromThread(handle_sighup, *args, **kwargs)
@@ -436,12 +478,8 @@ async def start(hs: "HomeServer"):
         atexit.register(gc.freeze)
 
 
-def setup_sentry(hs: "HomeServer"):
-    """Enable sentry integration, if enabled in configuration
-
-    Args:
-        hs
-    """
+def setup_sentry(hs: "HomeServer") -> None:
+    """Enable sentry integration, if enabled in configuration"""
 
     if not hs.config.metrics.sentry_enabled:
         return
@@ -466,7 +504,7 @@ def setup_sentry(hs: "HomeServer"):
         scope.set_tag("worker_name", name)
 
 
-def setup_sdnotify(hs: "HomeServer"):
+def setup_sdnotify(hs: "HomeServer") -> None:
     """Adds process state hooks to tell systemd what we are up to."""
 
     # Tell systemd our state, if we're using it. This will silently fail if
@@ -481,7 +519,7 @@ def setup_sdnotify(hs: "HomeServer"):
 sdnotify_sockaddr = os.getenv("NOTIFY_SOCKET")
 
 
-def sdnotify(state):
+def sdnotify(state: bytes) -> None:
     """
     Send a notification to systemd, if the NOTIFY_SOCKET env var is set.
 
@@ -490,7 +528,7 @@ def sdnotify(state):
     package which many OSes don't include as a matter of principle.
 
     Args:
-        state (bytes): notification to send
+        state: notification to send
     """
     if not isinstance(state, bytes):
         raise TypeError("sdnotify should be called with a bytes")
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index ad20b1d6aa..42238f7f28 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -17,6 +17,7 @@ import logging
 import os
 import sys
 import tempfile
+from typing import List, Optional
 
 from twisted.internet import defer, task
 
@@ -25,6 +26,7 @@ from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.events import EventBase
 from synapse.handlers.admin import ExfiltrationWriter
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
@@ -40,6 +42,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.server import HomeServer
 from synapse.storage.databases.main.room import RoomWorkerStore
+from synapse.types import StateMap
 from synapse.util.logcontext import LoggingContext
 from synapse.util.versionstring import get_version_string
 
@@ -65,16 +68,11 @@ class AdminCmdSlavedStore(
 
 
 class AdminCmdServer(HomeServer):
-    DATASTORE_CLASS = AdminCmdSlavedStore
+    DATASTORE_CLASS = AdminCmdSlavedStore  # type: ignore
 
 
-async def export_data_command(hs: HomeServer, args):
-    """Export data for a user.
-
-    Args:
-        hs
-        args (argparse.Namespace)
-    """
+async def export_data_command(hs: HomeServer, args: argparse.Namespace) -> None:
+    """Export data for a user."""
 
     user_id = args.user_id
     directory = args.output_directory
@@ -92,12 +90,12 @@ class FileExfiltrationWriter(ExfiltrationWriter):
     Note: This writes to disk on the main reactor thread.
 
     Args:
-        user_id (str): The user whose data is being exfiltrated.
-        directory (str|None): The directory to write the data to, if None then
-            will write to a temporary directory.
+        user_id: The user whose data is being exfiltrated.
+        directory: The directory to write the data to, if None then will write
+            to a temporary directory.
     """
 
-    def __init__(self, user_id, directory=None):
+    def __init__(self, user_id: str, directory: Optional[str] = None):
         self.user_id = user_id
 
         if directory:
@@ -111,7 +109,7 @@ class FileExfiltrationWriter(ExfiltrationWriter):
         if list(os.listdir(self.base_directory)):
             raise Exception("Directory must be empty")
 
-    def write_events(self, room_id, events):
+    def write_events(self, room_id: str, events: List[EventBase]) -> None:
         room_directory = os.path.join(self.base_directory, "rooms", room_id)
         os.makedirs(room_directory, exist_ok=True)
         events_file = os.path.join(room_directory, "events")
@@ -120,7 +118,9 @@ class FileExfiltrationWriter(ExfiltrationWriter):
             for event in events:
                 print(json.dumps(event.get_pdu_json()), file=f)
 
-    def write_state(self, room_id, event_id, state):
+    def write_state(
+        self, room_id: str, event_id: str, state: StateMap[EventBase]
+    ) -> None:
         room_directory = os.path.join(self.base_directory, "rooms", room_id)
         state_directory = os.path.join(room_directory, "state")
         os.makedirs(state_directory, exist_ok=True)
@@ -131,7 +131,9 @@ class FileExfiltrationWriter(ExfiltrationWriter):
             for event in state.values():
                 print(json.dumps(event.get_pdu_json()), file=f)
 
-    def write_invite(self, room_id, event, state):
+    def write_invite(
+        self, room_id: str, event: EventBase, state: StateMap[EventBase]
+    ) -> None:
         self.write_events(room_id, [event])
 
         # We write the invite state somewhere else as they aren't full events
@@ -145,7 +147,9 @@ class FileExfiltrationWriter(ExfiltrationWriter):
             for event in state.values():
                 print(json.dumps(event), file=f)
 
-    def write_knock(self, room_id, event, state):
+    def write_knock(
+        self, room_id: str, event: EventBase, state: StateMap[EventBase]
+    ) -> None:
         self.write_events(room_id, [event])
 
         # We write the knock state somewhere else as they aren't full events
@@ -159,11 +163,11 @@ class FileExfiltrationWriter(ExfiltrationWriter):
             for event in state.values():
                 print(json.dumps(event), file=f)
 
-    def finished(self):
+    def finished(self) -> str:
         return self.base_directory
 
 
-def start(config_options):
+def start(config_options: List[str]) -> None:
     parser = argparse.ArgumentParser(description="Synapse Admin Command")
     HomeServerConfig.add_arguments_to_parser(parser)
 
@@ -231,7 +235,7 @@ def start(config_options):
     # We also make sure that `_base.start` gets run before we actually run the
     # command.
 
-    async def run():
+    async def run() -> None:
         with LoggingContext("command"):
             await _base.start(ss)
             await args.func(ss, args)
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 218826741e..46f0feff70 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -14,11 +14,10 @@
 # limitations under the License.
 import logging
 import sys
-from typing import Dict, Optional
+from typing import Dict, List, Optional, Tuple
 
 from twisted.internet import address
-from twisted.web.resource import IResource
-from twisted.web.server import Request
+from twisted.web.resource import Resource
 
 import synapse
 import synapse.events
@@ -44,7 +43,7 @@ from synapse.config.server import ListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.server import JsonResource, OptionsResource
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
-from synapse.http.site import SynapseSite
+from synapse.http.site import SynapseRequest, SynapseSite
 from synapse.logging.context import LoggingContext
 from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
 from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
@@ -119,6 +118,7 @@ from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.transactions import TransactionWorkerStore
 from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
 from synapse.storage.databases.main.user_directory import UserDirectoryStore
+from synapse.types import JsonDict
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.versionstring import get_version_string
 
@@ -143,7 +143,9 @@ class KeyUploadServlet(RestServlet):
         self.http_client = hs.get_simple_http_client()
         self.main_uri = hs.config.worker.worker_main_http_uri
 
-    async def on_POST(self, request: Request, device_id: Optional[str]):
+    async def on_POST(
+        self, request: SynapseRequest, device_id: Optional[str]
+    ) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
         user_id = requester.user.to_string()
         body = parse_json_object_from_request(request)
@@ -187,9 +189,8 @@ class KeyUploadServlet(RestServlet):
                 # If the header exists, add to the comma-separated list of the first
                 # instance of the header. Otherwise, generate a new header.
                 if x_forwarded_for:
-                    x_forwarded_for = [
-                        x_forwarded_for[0] + b", " + previous_host
-                    ] + x_forwarded_for[1:]
+                    x_forwarded_for = [x_forwarded_for[0] + b", " + previous_host]
+                    x_forwarded_for.extend(x_forwarded_for[1:])
                 else:
                     x_forwarded_for = [previous_host]
             headers[b"X-Forwarded-For"] = x_forwarded_for
@@ -253,13 +254,16 @@ class GenericWorkerSlavedStore(
     SessionStore,
     BaseSlavedStore,
 ):
-    pass
+    # Properties that multiple storage classes define. Tell mypy what the
+    # expected type is.
+    server_name: str
+    config: HomeServerConfig
 
 
 class GenericWorkerServer(HomeServer):
-    DATASTORE_CLASS = GenericWorkerSlavedStore
+    DATASTORE_CLASS = GenericWorkerSlavedStore  # type: ignore
 
-    def _listen_http(self, listener_config: ListenerConfig):
+    def _listen_http(self, listener_config: ListenerConfig) -> None:
         port = listener_config.port
         bind_addresses = listener_config.bind_addresses
 
@@ -267,10 +271,10 @@ class GenericWorkerServer(HomeServer):
 
         site_tag = listener_config.http_options.tag
         if site_tag is None:
-            site_tag = port
+            site_tag = str(port)
 
         # We always include a health resource.
-        resources: Dict[str, IResource] = {"/health": HealthResource()}
+        resources: Dict[str, Resource] = {"/health": HealthResource()}
 
         for res in listener_config.http_options.resources:
             for name in res.names:
@@ -386,7 +390,7 @@ class GenericWorkerServer(HomeServer):
 
         logger.info("Synapse worker now listening on port %d", port)
 
-    def start_listening(self):
+    def start_listening(self) -> None:
         for listener in self.config.worker.worker_listeners:
             if listener.type == "http":
                 self._listen_http(listener)
@@ -411,7 +415,7 @@ class GenericWorkerServer(HomeServer):
         self.get_tcp_replication().start_replication(self)
 
 
-def start(config_options):
+def start(config_options: List[str]) -> None:
     try:
         config = HomeServerConfig.load_config("Synapse worker", config_options)
     except ConfigError as e:
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 336c279a44..4efadde57e 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -16,10 +16,10 @@
 import logging
 import os
 import sys
-from typing import Iterator
+from typing import Dict, Iterable, Iterator, List
 
-from twisted.internet import reactor
-from twisted.web.resource import EncodingResourceWrapper, IResource
+from twisted.internet.tcp import Port
+from twisted.web.resource import EncodingResourceWrapper, Resource
 from twisted.web.server import GzipEncoderFactory
 from twisted.web.static import File
 
@@ -76,23 +76,27 @@ from synapse.util.versionstring import get_version_string
 logger = logging.getLogger("synapse.app.homeserver")
 
 
-def gz_wrap(r):
+def gz_wrap(r: Resource) -> Resource:
     return EncodingResourceWrapper(r, [GzipEncoderFactory()])
 
 
 class SynapseHomeServer(HomeServer):
-    DATASTORE_CLASS = DataStore
+    DATASTORE_CLASS = DataStore  # type: ignore
 
-    def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
+    def _listener_http(
+        self, config: HomeServerConfig, listener_config: ListenerConfig
+    ) -> Iterable[Port]:
         port = listener_config.port
         bind_addresses = listener_config.bind_addresses
         tls = listener_config.tls
+        # Must exist since this is an HTTP listener.
+        assert listener_config.http_options is not None
         site_tag = listener_config.http_options.tag
         if site_tag is None:
             site_tag = str(port)
 
         # We always include a health resource.
-        resources = {"/health": HealthResource()}
+        resources: Dict[str, Resource] = {"/health": HealthResource()}
 
         for res in listener_config.http_options.resources:
             for name in res.names:
@@ -111,7 +115,7 @@ class SynapseHomeServer(HomeServer):
                 ("listeners", site_tag, "additional_resources", "<%s>" % (path,)),
             )
             handler = handler_cls(config, module_api)
-            if IResource.providedBy(handler):
+            if isinstance(handler, Resource):
                 resource = handler
             elif hasattr(handler, "handle_request"):
                 resource = AdditionalResource(self, handler.handle_request)
@@ -128,7 +132,7 @@ class SynapseHomeServer(HomeServer):
 
         # try to find something useful to redirect '/' to
         if WEB_CLIENT_PREFIX in resources:
-            root_resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX)
+            root_resource: Resource = RootOptionsRedirectResource(WEB_CLIENT_PREFIX)
         elif STATIC_PREFIX in resources:
             root_resource = RootOptionsRedirectResource(STATIC_PREFIX)
         else:
@@ -145,6 +149,8 @@ class SynapseHomeServer(HomeServer):
         )
 
         if tls:
+            # refresh_certificate should have been called before this.
+            assert self.tls_server_context_factory is not None
             ports = listen_ssl(
                 bind_addresses,
                 port,
@@ -165,20 +171,21 @@ class SynapseHomeServer(HomeServer):
 
         return ports
 
-    def _configure_named_resource(self, name, compress=False):
+    def _configure_named_resource(
+        self, name: str, compress: bool = False
+    ) -> Dict[str, Resource]:
         """Build a resource map for a named resource
 
         Args:
-            name (str): named resource: one of "client", "federation", etc
-            compress (bool): whether to enable gzip compression for this
-                resource
+            name: named resource: one of "client", "federation", etc
+            compress: whether to enable gzip compression for this resource
 
         Returns:
-            dict[str, Resource]: map from path to HTTP resource
+            map from path to HTTP resource
         """
-        resources = {}
+        resources: Dict[str, Resource] = {}
         if name == "client":
-            client_resource = ClientRestResource(self)
+            client_resource: Resource = ClientRestResource(self)
             if compress:
                 client_resource = gz_wrap(client_resource)
 
@@ -186,6 +193,7 @@ class SynapseHomeServer(HomeServer):
                 {
                     "/_matrix/client/api/v1": client_resource,
                     "/_matrix/client/r0": client_resource,
+                    "/_matrix/client/v3": client_resource,
                     "/_matrix/client/unstable": client_resource,
                     "/_matrix/client/v2_alpha": client_resource,
                     "/_matrix/client/versions": client_resource,
@@ -207,7 +215,7 @@ class SynapseHomeServer(HomeServer):
         if name == "consent":
             from synapse.rest.consent.consent_resource import ConsentResource
 
-            consent_resource = ConsentResource(self)
+            consent_resource: Resource = ConsentResource(self)
             if compress:
                 consent_resource = gz_wrap(consent_resource)
             resources.update({"/_matrix/consent": consent_resource})
@@ -277,7 +285,7 @@ class SynapseHomeServer(HomeServer):
 
         return resources
 
-    def start_listening(self):
+    def start_listening(self) -> None:
         if self.config.redis.redis_enabled:
             # If redis is enabled we connect via the replication command handler
             # in the same way as the workers (since we're effectively a client
@@ -303,7 +311,9 @@ class SynapseHomeServer(HomeServer):
                     ReplicationStreamProtocolFactory(self),
                 )
                 for s in services:
-                    reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
+                    self.get_reactor().addSystemEventTrigger(
+                        "before", "shutdown", s.stopListening
+                    )
             elif listener.type == "metrics":
                 if not self.config.metrics.enable_metrics:
                     logger.warning(
@@ -318,14 +328,13 @@ class SynapseHomeServer(HomeServer):
                 logger.warning("Unrecognized listener type: %s", listener.type)
 
 
-def setup(config_options):
+def setup(config_options: List[str]) -> SynapseHomeServer:
     """
     Args:
-        config_options_options: The options passed to Synapse. Usually
-            `sys.argv[1:]`.
+        config_options_options: The options passed to Synapse. Usually `sys.argv[1:]`.
 
     Returns:
-        HomeServer
+        A homeserver instance.
     """
     try:
         config = HomeServerConfig.load_or_generate_config(
@@ -364,7 +373,7 @@ def setup(config_options):
     except Exception as e:
         handle_startup_exception(e)
 
-    async def start():
+    async def start() -> None:
         # Load the OIDC provider metadatas, if OIDC is enabled.
         if hs.config.oidc.oidc_enabled:
             oidc = hs.get_oidc_handler()
@@ -404,39 +413,15 @@ def format_config_error(e: ConfigError) -> Iterator[str]:
 
     yield ":\n  %s" % (e.msg,)
 
-    e = e.__cause__
+    parent_e = e.__cause__
     indent = 1
-    while e:
+    while parent_e:
         indent += 1
-        yield ":\n%s%s" % ("  " * indent, str(e))
-        e = e.__cause__
-
-
-def run(hs: HomeServer):
-    PROFILE_SYNAPSE = False
-    if PROFILE_SYNAPSE:
-
-        def profile(func):
-            from cProfile import Profile
-            from threading import current_thread
-
-            def profiled(*args, **kargs):
-                profile = Profile()
-                profile.enable()
-                func(*args, **kargs)
-                profile.disable()
-                ident = current_thread().ident
-                profile.dump_stats(
-                    "/tmp/%s.%s.%i.pstat" % (hs.hostname, func.__name__, ident)
-                )
-
-            return profiled
-
-        from twisted.python.threadpool import ThreadPool
+        yield ":\n%s%s" % ("  " * indent, str(parent_e))
+        parent_e = parent_e.__cause__
 
-        ThreadPool._worker = profile(ThreadPool._worker)
-        reactor.run = profile(reactor.run)
 
+def run(hs: HomeServer) -> None:
     _base.start_reactor(
         "synapse-homeserver",
         soft_file_limit=hs.config.server.soft_file_limit,
@@ -448,7 +433,7 @@ def run(hs: HomeServer):
     )
 
 
-def main():
+def main() -> None:
     with LoggingContext("main"):
         # check base requirements
         check_requirements()
diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py
index 126450e17a..899dba5c3d 100644
--- a/synapse/app/phone_stats_home.py
+++ b/synapse/app/phone_stats_home.py
@@ -15,11 +15,12 @@ import logging
 import math
 import resource
 import sys
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, List, Sized, Tuple
 
 from prometheus_client import Gauge
 
 from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -28,7 +29,7 @@ logger = logging.getLogger("synapse.app.homeserver")
 
 # Contains the list of processes we will be monitoring
 # currently either 0 or 1
-_stats_process = []
+_stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
 
 # Gauges to expose monthly active user control metrics
 current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
@@ -45,9 +46,15 @@ registered_reserved_users_mau_gauge = Gauge(
 
 
 @wrap_as_background_process("phone_stats_home")
-async def phone_stats_home(hs: "HomeServer", stats, stats_process=_stats_process):
+async def phone_stats_home(
+    hs: "HomeServer",
+    stats: JsonDict,
+    stats_process: List[Tuple[int, "resource.struct_rusage"]] = _stats_process,
+) -> None:
     logger.info("Gathering stats for reporting")
     now = int(hs.get_clock().time())
+    # Ensure the homeserver has started.
+    assert hs.start_time is not None
     uptime = int(now - hs.start_time)
     if uptime < 0:
         uptime = 0
@@ -146,15 +153,15 @@ async def phone_stats_home(hs: "HomeServer", stats, stats_process=_stats_process
         logger.warning("Error reporting stats: %s", e)
 
 
-def start_phone_stats_home(hs: "HomeServer"):
+def start_phone_stats_home(hs: "HomeServer") -> None:
     """
     Start the background tasks which report phone home stats.
     """
     clock = hs.get_clock()
 
-    stats = {}
+    stats: JsonDict = {}
 
-    def performance_stats_init():
+    def performance_stats_init() -> None:
         _stats_process.clear()
         _stats_process.append(
             (int(hs.get_clock().time()), resource.getrusage(resource.RUSAGE_SELF))
@@ -170,10 +177,10 @@ def start_phone_stats_home(hs: "HomeServer"):
     hs.get_datastore().reap_monthly_active_users()
 
     @wrap_as_background_process("generate_monthly_active_users")
-    async def generate_monthly_active_users():
+    async def generate_monthly_active_users() -> None:
         current_mau_count = 0
         current_mau_count_by_service = {}
-        reserved_users = ()
+        reserved_users: Sized = ()
         store = hs.get_datastore()
         if hs.config.server.limit_usage_by_mau or hs.config.server.mau_stats_only:
             current_mau_count = await store.get_monthly_active_count()