diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2019-06-05 13:38:01 +0100 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2019-06-05 13:38:01 +0100 |
commit | 40596aec0ec6da1e8918255b75eb5329292901ab (patch) | |
tree | a918b5ebfc4b48f8996b247df7ca75cd7cbb5d0f /synapse/app/_base.py | |
parent | Lint (diff) | |
parent | Clean up debug logging (#5347) (diff) | |
download | synapse-40596aec0ec6da1e8918255b75eb5329292901ab.tar.xz |
Merge branch 'develop' into m-heroes-empty-room-name
Diffstat (limited to 'synapse/app/_base.py')
-rw-r--r-- | synapse/app/_base.py | 89 |
1 files changed, 88 insertions, 1 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py index d4c6c4c8e2..8cc990399f 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -22,13 +22,14 @@ import traceback import psutil from daemonize import Daemonize -from twisted.internet import error, reactor +from twisted.internet import defer, error, reactor from twisted.protocols.tls import TLSMemoryBIOFactory import synapse from synapse.app import check_bind_error from synapse.crypto import context_factory from synapse.util import PreserveLoggingContext +from synapse.util.async_helpers import Linearizer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string @@ -99,6 +100,8 @@ def start_reactor( logger (logging.Logger): logger instance to pass to Daemonize """ + install_dns_limiter(reactor) + def run(): # make sure that we run the reactor with the sentinel log context, # otherwise other PreserveLoggingContext instances will get confused @@ -312,3 +315,87 @@ def setup_sentry(hs): name = hs.config.worker_name if hs.config.worker_name else "master" scope.set_tag("worker_app", app) scope.set_tag("worker_name", name) + + +def install_dns_limiter(reactor, max_dns_requests_in_flight=100): + """Replaces the resolver with one that limits the number of in flight DNS + requests. + + This is to workaround https://twistedmatrix.com/trac/ticket/9620, where we + can run out of file descriptors and infinite loop if we attempt to do too + many DNS queries at once + """ + new_resolver = _LimitedHostnameResolver( + reactor.nameResolver, max_dns_requests_in_flight, + ) + + reactor.installNameResolver(new_resolver) + + +class _LimitedHostnameResolver(object): + """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups. + """ + + def __init__(self, resolver, max_dns_requests_in_flight): + self._resolver = resolver + self._limiter = Linearizer( + name="dns_client_limiter", max_count=max_dns_requests_in_flight, + ) + + def resolveHostName(self, resolutionReceiver, hostName, portNumber=0, + addressTypes=None, transportSemantics='TCP'): + # We need this function to return `resolutionReceiver` so we do all the + # actual logic involving deferreds in a separate function. + + # even though this is happening within the depths of twisted, we need to drop + # our logcontext before starting _resolve, otherwise: (a) _resolve will drop + # the logcontext if it returns an incomplete deferred; (b) _resolve will + # call the resolutionReceiver *with* a logcontext, which it won't be expecting. + with PreserveLoggingContext(): + self._resolve( + resolutionReceiver, + hostName, + portNumber, + addressTypes, + transportSemantics, + ) + + return resolutionReceiver + + @defer.inlineCallbacks + def _resolve(self, resolutionReceiver, hostName, portNumber=0, + addressTypes=None, transportSemantics='TCP'): + + with (yield self._limiter.queue(())): + # resolveHostName doesn't return a Deferred, so we need to hook into + # the receiver interface to get told when resolution has finished. + + deferred = defer.Deferred() + receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred) + + self._resolver.resolveHostName( + receiver, hostName, portNumber, + addressTypes, transportSemantics, + ) + + yield deferred + + +class _DeferredResolutionReceiver(object): + """Wraps a IResolutionReceiver and simply resolves the given deferred when + resolution is complete + """ + + def __init__(self, receiver, deferred): + self._receiver = receiver + self._deferred = deferred + + def resolutionBegan(self, resolutionInProgress): + self._receiver.resolutionBegan(resolutionInProgress) + + def addressResolved(self, address): + self._receiver.addressResolved(address) + + def resolutionComplete(self): + self._deferred.callback(()) + self._receiver.resolutionComplete() |