diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index d4c6c4c8e2..08199a5e8d 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -22,13 +22,14 @@ import traceback
import psutil
from daemonize import Daemonize
-from twisted.internet import error, reactor
+from twisted.internet import defer, error, reactor
from twisted.protocols.tls import TLSMemoryBIOFactory
import synapse
from synapse.app import check_bind_error
from synapse.crypto import context_factory
from synapse.util import PreserveLoggingContext
+from synapse.util.async_helpers import Linearizer
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
@@ -99,6 +100,8 @@ def start_reactor(
logger (logging.Logger): logger instance to pass to Daemonize
"""
+ install_dns_limiter(reactor)
+
def run():
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
@@ -312,3 +315,81 @@ def setup_sentry(hs):
name = hs.config.worker_name if hs.config.worker_name else "master"
scope.set_tag("worker_app", app)
scope.set_tag("worker_name", name)
+
+
+def install_dns_limiter(reactor, max_dns_requests_in_flight=100):
+ """Replaces the resolver with one that limits the number of in flight DNS
+ requests.
+
+ This is to workaround https://twistedmatrix.com/trac/ticket/9620, where we
+ can run out of file descriptors and infinite loop if we attempt to do too
+ many DNS queries at once
+ """
+ new_resolver = _LimitedHostnameResolver(
+ reactor.nameResolver, max_dns_requests_in_flight,
+ )
+
+ reactor.installNameResolver(new_resolver)
+
+
+class _LimitedHostnameResolver(object):
+ """Wraps a IHostnameResolver, limiting the number of in-flight DNS lookups.
+ """
+
+ def __init__(self, resolver, max_dns_requests_in_flight):
+ self._resolver = resolver
+ self._limiter = Linearizer(
+ name="dns_client_limiter", max_count=max_dns_requests_in_flight,
+ )
+
+ def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
+ addressTypes=None, transportSemantics='TCP'):
+ # Note this is happening deep within the reactor, so we don't need to
+ # worry about log contexts.
+
+ # We need this function to return `resolutionReceiver` so we do all the
+ # actual logic involving deferreds in a separate function.
+ self._resolve(
+ resolutionReceiver, hostName, portNumber,
+ addressTypes, transportSemantics,
+ )
+
+ return resolutionReceiver
+
+ @defer.inlineCallbacks
+ def _resolve(self, resolutionReceiver, hostName, portNumber=0,
+ addressTypes=None, transportSemantics='TCP'):
+
+ with (yield self._limiter.queue(())):
+ # resolveHostName doesn't return a Deferred, so we need to hook into
+ # the receiver interface to get told when resolution has finished.
+
+ deferred = defer.Deferred()
+ receiver = _DeferredResolutionReceiver(resolutionReceiver, deferred)
+
+ self._resolver.resolveHostName(
+ receiver, hostName, portNumber,
+ addressTypes, transportSemantics,
+ )
+
+ yield deferred
+
+
+class _DeferredResolutionReceiver(object):
+ """Wraps a IResolutionReceiver and simply resolves the given deferred when
+ resolution is complete
+ """
+
+ def __init__(self, receiver, deferred):
+ self._receiver = receiver
+ self._deferred = deferred
+
+ def resolutionBegan(self, resolutionInProgress):
+ self._receiver.resolutionBegan(resolutionInProgress)
+
+ def addressResolved(self, address):
+ self._receiver.addressResolved(address)
+
+ def resolutionComplete(self):
+ self._deferred.callback(())
+ self._receiver.resolutionComplete()
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index beaea64a61..864f1eac48 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -45,6 +45,7 @@ from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1.login import LoginRestServlet
+from synapse.rest.client.v1.push_rule import PushRuleRestServlet
from synapse.rest.client.v1.room import (
JoinedRoomMemberListRestServlet,
PublicRoomListRestServlet,
@@ -52,9 +53,11 @@ from synapse.rest.client.v1.room import (
RoomMemberListRestServlet,
RoomStateRestServlet,
)
+from synapse.rest.client.v1.voip import VoipRestServlet
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
+from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
@@ -109,12 +112,12 @@ class ClientReaderServer(HomeServer):
ThreepidRestServlet(self).register(resource)
KeyQueryServlet(self).register(resource)
KeyChangesServlet(self).register(resource)
+ VoipRestServlet(self).register(resource)
+ PushRuleRestServlet(self).register(resource)
+ VersionsRestServlet().register(resource)
resources.update({
- "/_matrix/client/r0": resource,
- "/_matrix/client/unstable": resource,
- "/_matrix/client/v2_alpha": resource,
- "/_matrix/client/api/v1": resource,
+ "/_matrix/client": resource,
})
root_resource = create_resource_tree(resources, NoResource())
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 9711a7147c..1d43f2b075 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams import ReceiptsStream
+from synapse.replication.tcp.streams._base import ReceiptsStream
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.types import ReadReceipt
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 869c028d1f..1045d28949 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -62,6 +62,7 @@ from synapse.python_dependencies import check_requirements
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
+from synapse.rest.admin import AdminRestResource
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.well_known import WellKnownResource
@@ -180,6 +181,7 @@ class SynapseHomeServer(HomeServer):
"/_matrix/client/v2_alpha": client_resource,
"/_matrix/client/versions": client_resource,
"/.well-known/matrix/client": WellKnownResource(self),
+ "/_synapse/admin": AdminRestResource(self),
})
if self.get_config().saml2_enabled:
@@ -518,6 +520,7 @@ def run(hs):
uptime = 0
stats["homeserver"] = hs.config.server_name
+ stats["server_context"] = hs.config.server_context
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
@@ -558,7 +561,6 @@ def run(hs):
stats["database_engine"] = hs.get_datastore().database_engine_name
stats["database_server_version"] = hs.get_datastore().get_server_version()
-
logger.info("Reporting stats to matrix.org: %s" % (stats,))
try:
yield hs.get_simple_http_client().put_json(
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 9163b56d86..5388def28a 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -48,6 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import EventsStreamEventRow
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
@@ -369,7 +370,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
# We shouldn't get multiple rows per token for events stream, so
# we don't need to optimise this for multiple rows.
for row in rows:
- event = yield self.store.get_event(row.event_id)
+ if row.type != EventsStreamEventRow.TypeId:
+ continue
+ event = yield self.store.get_event(row.data.event_id)
extra_users = ()
if event.type == EventTypes.Member:
extra_users = (event.state_key,)
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index d1ab9512cd..355f5aa71d 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.replication.tcp.streams.events import (
+ EventsStream,
+ EventsStreamCurrentStateRow,
+)
from synapse.rest.client.v2_alpha import user_directory
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -73,19 +77,18 @@ class UserDirectorySlaveStore(
prefilled_cache=curr_state_delta_prefill,
)
- self._current_state_delta_pos = events_max
-
def stream_positions(self):
result = super(UserDirectorySlaveStore, self).stream_positions()
- result["current_state_deltas"] = self._current_state_delta_pos
return result
def process_replication_rows(self, stream_name, token, rows):
- if stream_name == "current_state_deltas":
- self._current_state_delta_pos = token
+ if stream_name == EventsStream.NAME:
+ self._stream_id_gen.advance(token)
for row in rows:
+ if row.type != EventsStreamCurrentStateRow.TypeId:
+ continue
self._curr_state_delta_stream_cache.entity_has_changed(
- row.room_id, token
+ row.data.room_id, token
)
return super(UserDirectorySlaveStore, self).process_replication_rows(
stream_name, token, rows
@@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
- if stream_name == "current_state_deltas":
+ if stream_name == EventsStream.NAME:
run_in_background(self._notify_directory)
@defer.inlineCallbacks
|