diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index d4c6c4c8e2..08199a5e8d 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -22,13 +22,14 @@ import traceback
import psutil
from daemonize import Daemonize
-from twisted.internet import error, reactor
+from twisted.internet import defer, error, reactor
from twisted.protocols.tls import TLSMemoryBIOFactory
import synapse
from synapse.app import check_bind_error
from synapse.crypto import context_factory
from synapse.util import PreserveLoggingContext
+from synapse.util.async_helpers import Linearizer
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -99,6 +100,8 @@ def start_reactor(
logger (logging.Logger): logger instance to pass to Daemonize
"""
+ install_dns_limiter(reactor)
+
def run():
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
@@ -312,3 +315,81 @@ def setup_sentry(hs):
name = hs.config.worker_name if hs.config.worker_name else "master"
scope.set_tag("worker_app", app)
scope.set_tag("worker_name", name)
+
+
+def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
+    """Swap the reactor's name resolver for one that caps concurrent lookups.
+
+    Works around https://twistedmatrix.com/trac/ticket/9620, where attempting
+    too many DNS queries at once can exhaust file descriptors and send us
+    into an infinite loop.
+
+    Args:
+        reactor: the reactor whose current `nameResolver` gets wrapped and
+            then replaced via `installNameResolver`.
+        max_dns_requests_in_flight (int): the in-flight limit handed to the
+            wrapping resolver.
+    """
+    original_resolver = reactor.nameResolver
+    reactor.installNameResolver(
+        _LimitedHostnameResolver(original_resolver, max_dns_requests_in_flight)
+    )
+
+
+class _LimitedHostnameResolver(object):
+    """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups.
+
+    Each lookup first queues on a Linearizer and is only handed to the wrapped
+    resolver once the Linearizer lets it through.
+    """
+
+    def __init__(self, resolver, max_dns_requests_in_flight):
+        """
+        Args:
+            resolver: the resolver that actually performs the lookups.
+            max_dns_requests_in_flight (int): passed to the Linearizer as its
+                `max_count`.
+        """
+        self._resolver = resolver
+        self._limiter = Linearizer(
+            name="dns_client_limiter", max_count=max_dns_requests_in_flight,
+        )
+
+    def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
+                        addressTypes=None, transportSemantics='TCP'):
+        """Start a (possibly queued) lookup and return `resolutionReceiver`.
+
+        All arguments are forwarded unchanged to the wrapped resolver's
+        `resolveHostName`; results are reported through `resolutionReceiver`.
+        """
+        # We need this function to return `resolutionReceiver` so we do all the
+        # actual logic involving deferreds in a separate function.
+
+        # Although we are normally called from within twisted, we cannot rely
+        # on the caller having no log context. Drop any active context while
+        # starting _resolve, otherwise (a) it may be lost when _resolve
+        # returns an incomplete deferred, and (b) the receiver callbacks would
+        # be invoked with a log context they are not expecting.
+        with PreserveLoggingContext():
+            self._resolve(
+                resolutionReceiver, hostName, portNumber,
+                addressTypes, transportSemantics,
+            )
+
+        return resolutionReceiver
+
+    @defer.inlineCallbacks
+    def _resolve(self, resolutionReceiver, hostName, portNumber=0,
+                 addressTypes=None, transportSemantics='TCP'):
+        """Wait for the limiter, run the lookup, and hold the limiter slot
+        until the wrapped resolver reports that resolution is complete.
+        """
+
+        with (yield self._limiter.queue(())):
+            # resolveHostName doesn't return a Deferred, so we need to hook into
+            # the receiver interface to get told when resolution has finished.
+
+            deferred = defer.Deferred()
+            receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)
+
+            self._resolver.resolveHostName(
+                receiver, hostName, portNumber,
+                addressTypes, transportSemantics,
+            )
+
+            # Stay inside the `with` block until resolutionComplete fires.
+            yield deferred
+
+
+class _DeferredResolutionReceiver(object):
+    """Receiver wrapper that fires a deferred once resolution has finished.
+
+    Every event is passed straight through to the wrapped IResolutionReceiver;
+    on completion the given deferred is additionally called back with an
+    empty tuple.
+    """
+
+    def __init__(self, receiver, deferred):
+        self._wrapped = receiver
+        self._on_complete = deferred
+
+    def resolutionBegan(self, resolutionInProgress):
+        # Plain pass-through to the wrapped receiver.
+        self._wrapped.resolutionBegan(resolutionInProgress)
+
+    def addressResolved(self, address):
+        # Plain pass-through to the wrapped receiver.
+        self._wrapped.addressResolved(address)
+
+    def resolutionComplete(self):
+        # Fire the deferred first, then notify the wrapped receiver.
+        fire = self._on_complete.callback
+        fire(())
+        self._wrapped.resolutionComplete()
|