From 2b82ec425fccb0ef626242779f7ccd4d77a0685c Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Fri, 22 Oct 2021 18:15:41 +0100 Subject: Add type hints for most `HomeServer` parameters (#11095) --- synapse/app/_base.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/app/_base.py') diff --git a/synapse/app/_base.py b/synapse/app/_base.py index bb4d53d778..2ca2e051e4 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -294,7 +294,7 @@ def listen_ssl( return r -def refresh_certificate(hs): +def refresh_certificate(hs: "HomeServer"): """ Refresh the TLS certificates that Synapse is using by re-reading them from disk and updating the TLS context factories to use them. @@ -419,11 +419,11 @@ async def start(hs: "HomeServer"): atexit.register(gc.freeze) -def setup_sentry(hs): +def setup_sentry(hs: "HomeServer"): """Enable sentry integration, if enabled in configuration Args: - hs (synapse.server.HomeServer) + hs """ if not hs.config.metrics.sentry_enabled: @@ -449,7 +449,7 @@ def setup_sentry(hs): scope.set_tag("worker_name", name) -def setup_sdnotify(hs): +def setup_sdnotify(hs: "HomeServer"): """Adds process state hooks to tell systemd what we are up to.""" # Tell systemd our state, if we're using it. This will silently fail if -- cgit 1.5.1 From 7004f43da143f5d1d35c742add1238c51e62ca19 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 26 Oct 2021 13:45:38 +0100 Subject: Move DNS lookups into separate thread pool (#11177) This is to stop large bursts of lookups starving out other users of the thread pools. Fixes #11049. --- changelog.d/11177.bugfix | 1 + synapse/app/_base.py | 13 ++++- synapse/util/gai_resolver.py | 136 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 149 insertions(+), 1 deletion(-) create mode 100644 changelog.d/11177.bugfix create mode 100644 synapse/util/gai_resolver.py (limited to 'synapse/app/_base.py') diff --git a/changelog.d/11177.bugfix b/changelog.d/11177.bugfix new file mode 100644 index 0000000000..ca5bc0df28 --- /dev/null +++ b/changelog.d/11177.bugfix @@ -0,0 +1 @@ +Fix a performance regression introduced in v1.44.0 which could cause client requests to time out when making large numbers of outbound requests. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 2ca2e051e4..03627cdcba 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -31,6 +31,7 @@ import twisted from twisted.internet import defer, error, reactor from twisted.logger import LoggingFile, LogLevel from twisted.protocols.tls import TLSMemoryBIOFactory +from twisted.python.threadpool import ThreadPool import synapse from synapse.api.constants import MAX_PDU_SIZE @@ -48,6 +49,7 @@ from synapse.metrics.background_process_metrics import wrap_as_background_proces from synapse.metrics.jemalloc import setup_jemalloc_stats from synapse.util.caches.lrucache import setup_expire_lru_cache_entries from synapse.util.daemonize import daemonize_process +from synapse.util.gai_resolver import GAIResolver from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -338,9 +340,18 @@ async def start(hs: "HomeServer"): Args: hs: homeserver instance """ + reactor = hs.get_reactor() + + # We want to use a separate thread pool for the resolver so that large + # numbers of DNS requests don't starve out other users of the threadpool. 
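+    # The GAIResolver installed below (a copy of Twisted's private resolver,
+    # added as synapse/util/gai_resolver.py in this patch) runs its blocking
+    # getaddrinfo() calls on whichever thread pool it is handed, so a
+    # dedicated pool keeps DNS lookups off the reactor's default pool.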
+ resolver_threadpool = ThreadPool(name="gai_resolver") + resolver_threadpool.start() + reactor.installNameResolver( + GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool) + ) + # Set up the SIGHUP machinery. if hasattr(signal, "SIGHUP"): - reactor = hs.get_reactor() @wrap_as_background_process("sighup") def handle_sighup(*args, **kwargs): diff --git a/synapse/util/gai_resolver.py b/synapse/util/gai_resolver.py new file mode 100644 index 0000000000..a447ce4e55 --- /dev/null +++ b/synapse/util/gai_resolver.py @@ -0,0 +1,136 @@ +# This is a direct lift from +# https://github.com/twisted/twisted/blob/release-21.2.0-10091/src/twisted/internet/_resolver.py. +# We copy it here as we need to instantiate `GAIResolver` manually, but it is a +# private class. + + +from socket import ( + AF_INET, + AF_INET6, + AF_UNSPEC, + SOCK_DGRAM, + SOCK_STREAM, + gaierror, + getaddrinfo, +) + +from zope.interface import implementer + +from twisted.internet.address import IPv4Address, IPv6Address +from twisted.internet.interfaces import IHostnameResolver, IHostResolution +from twisted.internet.threads import deferToThreadPool + + +@implementer(IHostResolution) +class HostResolution: + """ + The in-progress resolution of a given hostname. + """ + + def __init__(self, name): + """ + Create a L{HostResolution} with the given name. + """ + self.name = name + + def cancel(self): + # IHostResolution.cancel + raise NotImplementedError() + + +_any = frozenset([IPv4Address, IPv6Address]) + +_typesToAF = { + frozenset([IPv4Address]): AF_INET, + frozenset([IPv6Address]): AF_INET6, + _any: AF_UNSPEC, +} + +_afToType = { + AF_INET: IPv4Address, + AF_INET6: IPv6Address, +} + +_transportToSocket = { + "TCP": SOCK_STREAM, + "UDP": SOCK_DGRAM, +} + +_socktypeToType = { + SOCK_STREAM: "TCP", + SOCK_DGRAM: "UDP", +} + + +@implementer(IHostnameResolver) +class GAIResolver: + """ + L{IHostnameResolver} implementation that resolves hostnames by calling + L{getaddrinfo} in a thread. + """ + + def __init__(self, reactor, getThreadPool=None, getaddrinfo=getaddrinfo): + """ + Create a L{GAIResolver}. + @param reactor: the reactor to schedule result-delivery on + @type reactor: L{IReactorThreads} + @param getThreadPool: a function to retrieve the thread pool to use for + scheduling name resolutions. If not supplied, the use the given + C{reactor}'s thread pool. + @type getThreadPool: 0-argument callable returning a + L{twisted.python.threadpool.ThreadPool} + @param getaddrinfo: a reference to the L{getaddrinfo} to use - mainly + parameterized for testing. 
+ @type getaddrinfo: callable with the same signature as L{getaddrinfo} + """ + self._reactor = reactor + self._getThreadPool = ( + reactor.getThreadPool if getThreadPool is None else getThreadPool + ) + self._getaddrinfo = getaddrinfo + + def resolveHostName( + self, + resolutionReceiver, + hostName, + portNumber=0, + addressTypes=None, + transportSemantics="TCP", + ): + """ + See L{IHostnameResolver.resolveHostName} + @param resolutionReceiver: see interface + @param hostName: see interface + @param portNumber: see interface + @param addressTypes: see interface + @param transportSemantics: see interface + @return: see interface + """ + pool = self._getThreadPool() + addressFamily = _typesToAF[ + _any if addressTypes is None else frozenset(addressTypes) + ] + socketType = _transportToSocket[transportSemantics] + + def get(): + try: + return self._getaddrinfo( + hostName, portNumber, addressFamily, socketType + ) + except gaierror: + return [] + + d = deferToThreadPool(self._reactor, pool, get) + resolution = HostResolution(hostName) + resolutionReceiver.resolutionBegan(resolution) + + @d.addCallback + def deliverResults(result): + for family, socktype, _proto, _cannoname, sockaddr in result: + addrType = _afToType[family] + resolutionReceiver.addressResolved( + addrType(_socktypeToType.get(socktype, "TCP"), *sockaddr) + ) + resolutionReceiver.resolutionComplete() + + return resolution -- cgit 1.5.1 From f3a4be870092e28531957702931c5d74b9d1f18f Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Wed, 27 Oct 2021 13:04:56 +0100 Subject: Shut down the DNS threadpool (#11190) The DNS threadpool must be explicitly stopped, otherwise Synapse will hang indefinitely when asked to shut down. --- changelog.d/11190.bugfix | 1 + synapse/app/_base.py | 1 + 2 files changed, 2 insertions(+) create mode 100644 changelog.d/11190.bugfix (limited to 'synapse/app/_base.py') diff --git a/changelog.d/11190.bugfix b/changelog.d/11190.bugfix new file mode 100644 index 0000000000..0d913805ac --- /dev/null +++ b/changelog.d/11190.bugfix @@ -0,0 +1 @@ +Fix a performance regression introduced in 1.44.0 which could cause client requests to time out when making large numbers of outbound requests. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 03627cdcba..f4c3f867a8 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -346,6 +346,7 @@ async def start(hs: "HomeServer"): # numbers of DNS requests don't starve out other users of the threadpool. resolver_threadpool = ThreadPool(name="gai_resolver") resolver_threadpool.start() + reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop) reactor.installNameResolver( GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool) ) -- cgit 1.5.1 From 82d2168a15741ed4546c12c06d797627469fb684 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 1 Nov 2021 11:21:36 +0000 Subject: Add metrics to the threadpools (#11178) --- changelog.d/11178.feature | 1 + synapse/app/_base.py | 5 +++++ synapse/metrics/__init__.py | 37 +++++++++++++++++++++++++++++++++++++ synapse/storage/database.py | 7 ++++++- 4 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 changelog.d/11178.feature (limited to 'synapse/app/_base.py') diff --git a/changelog.d/11178.feature b/changelog.d/11178.feature new file mode 100644 index 0000000000..10b1cdffdc --- /dev/null +++ b/changelog.d/11178.feature @@ -0,0 +1 @@ +Add metrics for thread pool usage. 
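
Taken together, the two DNS patches above boil down to one pattern: build a dedicated ThreadPool, hand it to the copied GAIResolver, and make sure the pool is stopped at shutdown. The following is a minimal, self-contained sketch of that pattern, not code from any of these commits; the PrintingReceiver class and the hostname are purely illustrative, and a running Twisted reactor is assumed.

    from twisted.internet import reactor
    from twisted.internet.interfaces import IResolutionReceiver
    from twisted.python.threadpool import ThreadPool
    from zope.interface import implementer

    from synapse.util.gai_resolver import GAIResolver


    @implementer(IResolutionReceiver)
    class PrintingReceiver:
        """Illustrative receiver that just prints what the resolver reports."""

        def resolutionBegan(self, resolution):
            self.addresses = []

        def addressResolved(self, address):
            # Each address is an IPv4Address/IPv6Address built from one
            # getaddrinfo() result row.
            self.addresses.append(address)

        def resolutionComplete(self):
            print("resolved:", self.addresses)
            reactor.stop()


    resolver_threadpool = ThreadPool(name="gai_resolver")
    resolver_threadpool.start()
    # Without this trigger the pool's worker threads keep the process alive at
    # shutdown; that is the hang fixed by #11190 above.
    reactor.addSystemEventTrigger("during", "shutdown", resolver_threadpool.stop)

    resolver = GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool)
    # Synapse installs this globally via reactor.installNameResolver(); here we
    # simply drive it directly.
    resolver.resolveHostName(PrintingReceiver(), "matrix.org")

    reactor.run()

Only the getaddrinfo() call itself runs on the dedicated pool; the receiver callbacks are delivered on the reactor thread. The pool still has to be stopped explicitly, otherwise the process hangs when asked to shut down, which is exactly what the #11190 patch addresses.
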
diff --git a/synapse/app/_base.py b/synapse/app/_base.py index f4c3f867a8..f2c1028b5d 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -45,6 +45,7 @@ from synapse.events.spamcheck import load_legacy_spam_checkers from synapse.events.third_party_rules import load_legacy_third_party_event_rules from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.logging.context import PreserveLoggingContext +from synapse.metrics import register_threadpool from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.jemalloc import setup_jemalloc_stats from synapse.util.caches.lrucache import setup_expire_lru_cache_entries @@ -351,6 +352,10 @@ async def start(hs: "HomeServer"): GAIResolver(reactor, getThreadPool=lambda: resolver_threadpool) ) + # Register the threadpools with our metrics. + register_threadpool("default", reactor.getThreadPool()) + register_threadpool("gai_resolver", resolver_threadpool) + # Set up the SIGHUP machinery. if hasattr(signal, "SIGHUP"): diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index e902109af3..91ee5c8193 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -32,6 +32,7 @@ from prometheus_client.core import ( ) from twisted.internet import reactor +from twisted.python.threadpool import ThreadPool import synapse from synapse.metrics._exposition import ( @@ -526,6 +527,42 @@ threepid_send_requests = Histogram( labelnames=("type", "reason"), ) +threadpool_total_threads = Gauge( + "synapse_threadpool_total_threads", + "Total number of threads currently in the threadpool", + ["name"], +) + +threadpool_total_working_threads = Gauge( + "synapse_threadpool_working_threads", + "Number of threads currently working in the threadpool", + ["name"], +) + +threadpool_total_min_threads = Gauge( + "synapse_threadpool_min_threads", + "Minimum number of threads configured in the threadpool", + ["name"], +) + +threadpool_total_max_threads = Gauge( + "synapse_threadpool_max_threads", + "Maximum number of threads configured in the threadpool", + ["name"], +) + + +def register_threadpool(name: str, threadpool: ThreadPool) -> None: + """Add metrics for the threadpool.""" + + threadpool_total_min_threads.labels(name).set(threadpool.min) + threadpool_total_max_threads.labels(name).set(threadpool.max) + + threadpool_total_threads.labels(name).set_function(lambda: len(threadpool.threads)) + threadpool_total_working_threads.labels(name).set_function( + lambda: len(threadpool.working) + ) + class ReactorLastSeenMetric: def collect(self): diff --git a/synapse/storage/database.py b/synapse/storage/database.py index fa4e89d35c..5c71e27518 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -48,6 +48,7 @@ from synapse.logging.context import ( current_context, make_deferred_yieldable, ) +from synapse.metrics import register_threadpool from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine @@ -104,13 +105,17 @@ def make_pool( LoggingDatabaseConnection(conn, engine, "on_new_connection") ) - return adbapi.ConnectionPool( + connection_pool = adbapi.ConnectionPool( db_config.config["name"], cp_reactor=reactor, cp_openfun=_on_new_connection, **db_args, ) + register_threadpool(f"database-{db_config.name}", connection_pool.threadpool) + + return connection_pool + def make_conn( 
db_config: DatabaseConnectionConfig, -- cgit 1.5.1
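
The gauges added in synapse/metrics/__init__.py rely on prometheus_client's set_function(), which samples a callback at scrape time instead of requiring the code to push updates. Below is a minimal standalone sketch of that pattern; the metric and pool names are illustrative rather than Synapse's, and it registers against prometheus_client's default registry.

    from prometheus_client import Gauge, generate_latest
    from twisted.python.threadpool import ThreadPool

    threadpool_total_threads = Gauge(
        "example_threadpool_total_threads",
        "Total number of threads currently in the threadpool",
        ["name"],
    )

    pool = ThreadPool(minthreads=1, maxthreads=5, name="example")
    pool.start()

    # set_function() stores a callback that is evaluated each time the registry
    # is scraped, so the gauge always reflects the live length of pool.threads
    # without any bookkeeping inside the pool itself.
    threadpool_total_threads.labels(pool.name).set_function(lambda: len(pool.threads))

    # Scraping the default registry now yields a labelled sample for the pool,
    # e.g. an example_threadpool_total_threads{name="example"} line.
    print(generate_latest().decode())

    pool.stop()

Synapse wraps this pattern in register_threadpool(), which is why the database change above only needs to hand over a name and the connection pool's threadpool.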