diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 9c2b627590..3b6b9368b8 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -14,9 +14,11 @@
# limitations under the License.
import sys
+
+from synapse import python_dependencies # noqa: E402
+
sys.dont_write_bytecode = True
-from synapse import python_dependencies # noqa: E402
try:
python_dependencies.check_requirements()
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index a6925ab139..7c866e246a 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -17,15 +17,18 @@ import gc
import logging
import sys
+from daemonize import Daemonize
+
+from twisted.internet import error, reactor
+
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+
try:
import affinity
except Exception:
affinity = None
-from daemonize import Daemonize
-from synapse.util import PreserveLoggingContext
-from synapse.util.rlimit import change_resource_limit
-from twisted.internet import error, reactor
logger = logging.getLogger(__name__)
@@ -137,7 +140,7 @@ def listen_metrics(bind_addresses, port):
logger.info("Metrics now reporting on %s:%d", host, port)
-def listen_tcp(bind_addresses, port, factory, backlog=50):
+def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
"""
Create a TCP socket for a port and several addresses
"""
@@ -153,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50):
check_bind_error(e, address, bind_addresses)
-def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
+def listen_ssl(
+ bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
+):
"""
Create an SSL socket for a port and several addresses
"""
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index dd114dee07..86b5067400 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -16,6 +16,9 @@
import logging
import sys
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
from synapse.app import _base
@@ -23,6 +26,7 @@ from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
@@ -35,8 +39,6 @@ from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor, defer
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.appservice")
@@ -49,10 +51,7 @@ class AppserviceSlaveStore(
class AppserviceServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = AppserviceSlaveStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -62,7 +61,7 @@ class AppserviceServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
root_resource = create_resource_tree(resources, NoResource())
@@ -97,7 +96,7 @@ class AppserviceServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -115,8 +114,9 @@ class ASReplicationHandler(ReplicationClientHandler):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+ yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 85dada7f9f..ce2b113dbb 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -16,6 +16,9 @@
import logging
import sys
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
from synapse.app import _base
@@ -28,6 +31,7 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.directory import DirectoryStore
@@ -35,29 +39,34 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.rest.client.v1.room import (
+ JoinedRoomMemberListRestServlet,
+ PublicRoomListRestServlet,
+ RoomEventContextServlet,
+ RoomMemberListRestServlet,
+ RoomStateRestServlet,
+)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.client_reader")
class ClientReaderSlavedStore(
+ SlavedAccountDataStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
- TransactionStore,
+ SlavedTransactionStore,
SlavedClientIpStore,
BaseSlavedStore,
):
@@ -65,10 +74,7 @@ class ClientReaderSlavedStore(
class ClientReaderServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = ClientReaderSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -81,7 +87,13 @@ class ClientReaderServer(HomeServer):
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
+
PublicRoomListRestServlet(self).register(resource)
+ RoomMemberListRestServlet(self).register(resource)
+ JoinedRoomMemberListRestServlet(self).register(resource)
+ RoomStateRestServlet(self).register(resource)
+ RoomEventContextServlet(self).register(resource)
+
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
@@ -122,7 +134,7 @@ class ClientReaderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -153,11 +165,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
+ tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = ClientReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
+ tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 5ca77c0f1a..f98e456ea0 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -16,6 +16,9 @@
import logging
import sys
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
from synapse.app import _base
@@ -40,27 +43,36 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.profile import (
+ ProfileAvatarURLRestServlet,
+ ProfileDisplaynameRestServlet,
+ ProfileRestServlet,
+)
from synapse.rest.client.v1.room import (
- RoomSendEventRestServlet, RoomMembershipRestServlet, RoomStateEventRestServlet,
JoinRoomAliasServlet,
+ RoomMembershipRestServlet,
+ RoomSendEventRestServlet,
+ RoomStateEventRestServlet,
)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
+from synapse.storage.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.event_creator")
class EventCreatorSlavedStore(
+ # FIXME(#3714): We need to add UserDirectoryStore as we write directly
+ # rather than going via the correct worker.
+ UserDirectoryStore,
DirectoryStore,
- TransactionStore,
+ SlavedTransactionStore,
SlavedProfileStore,
SlavedAccountDataStore,
SlavedPusherStore,
@@ -78,10 +90,7 @@ class EventCreatorSlavedStore(
class EventCreatorServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = EventCreatorSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -98,6 +107,9 @@ class EventCreatorServer(HomeServer):
RoomMembershipRestServlet(self).register(resource)
RoomStateEventRestServlet(self).register(resource)
JoinRoomAliasServlet(self).register(resource)
+ ProfileAvatarURLRestServlet(self).register(resource)
+ ProfileDisplaynameRestServlet(self).register(resource)
+ ProfileRestServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
@@ -138,7 +150,7 @@ class EventCreatorServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -171,11 +183,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
+ tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = EventCreatorServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
+ tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 2a1995d0cd..60f5973505 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -16,6 +16,9 @@
import logging
import sys
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
from synapse.api.urls import FEDERATION_PREFIX
@@ -29,11 +32,17 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -41,28 +50,29 @@ from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.federation_reader")
class FederationReaderSlavedStore(
+ SlavedAccountDataStore,
+ SlavedProfileStore,
+ SlavedApplicationServiceStore,
+ SlavedPusherStore,
+ SlavedPushRuleStore,
+ SlavedReceiptsStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
DirectoryStore,
- TransactionStore,
+ SlavedTransactionStore,
BaseSlavedStore,
):
pass
class FederationReaderServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = FederationReaderSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -111,7 +121,7 @@ class FederationReaderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -142,11 +152,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
+ tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FederationReaderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
+ tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 81ad574043..60dd09aac3 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -16,6 +16,9 @@
import logging
import sys
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
from synapse.app import _base
@@ -33,23 +36,21 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.federation_sender")
class FederationSenderSlaveStore(
- SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+ SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
@@ -77,10 +78,7 @@ class FederationSenderSlaveStore(
class FederationSenderServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = FederationSenderSlaveStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -125,7 +123,7 @@ class FederationSenderServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -143,8 +141,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(FederationSenderReplicationHandler, self).on_rdata(
+ yield super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -185,11 +184,13 @@ def start(config_options):
config.send_federation = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
+ tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = FederationSenderServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
+ tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 5a164a7a95..8c0b9c67b0 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -16,6 +16,9 @@
import logging
import sys
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
from synapse.api.errors import SynapseError
@@ -25,9 +28,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
from synapse.http.server import JsonResource
-from synapse.http.servlet import (
- RestServlet, parse_json_object_from_request,
-)
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
@@ -37,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -44,12 +46,39 @@ from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.frontend_proxy")
+class PresenceStatusStubServlet(ClientV1RestServlet):
+ PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
+
+ def __init__(self, hs):
+ super(PresenceStatusStubServlet, self).__init__(hs)
+ self.http_client = hs.get_simple_http_client()
+ self.auth = hs.get_auth()
+ self.main_uri = hs.config.worker_main_http_uri
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, user_id):
+ # Pass through the auth headers, if any, in case the access token
+ # is there.
+ auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+ headers = {
+ "Authorization": auth_headers,
+ }
+ result = yield self.http_client.get_json(
+ self.main_uri + request.uri,
+ headers=headers,
+ )
+ defer.returnValue((200, result))
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, user_id):
+ yield self.auth.get_user_by_req(request)
+ defer.returnValue((200, {}))
+
+
class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -119,10 +148,7 @@ class FrontendProxySlavedStore(
class FrontendProxyServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = FrontendProxySlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -136,6 +162,12 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
+
+ # If presence is disabled, use the stub servlet that does
+ # not allow sending presence
+ if not self.config.use_presence:
+ PresenceStatusStubServlet(self).register(resource)
+
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
@@ -154,7 +186,8 @@ class FrontendProxyServer(HomeServer):
listener_config,
root_resource,
self.version_string,
- )
+ ),
+ reactor=self.get_reactor()
)
logger.info("Synapse client reader now listening on port %d", port)
@@ -176,7 +209,7 @@ class FrontendProxyServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -209,11 +242,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
+ tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = FrontendProxyServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
+ tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 714f98a3e0..3eb5b663de 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -18,34 +18,51 @@ import logging
import os
import sys
+from six import iteritems
+
+from prometheus_client import Gauge
+
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, NoResource
+from twisted.web.server import GzipEncoderFactory
+from twisted.web.static import File
+
import synapse
import synapse.config.logger
from synapse import events
-from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
- LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
- STATIC_PREFIX, WEB_CLIENT_PREFIX
+from synapse.api.urls import (
+ CONTENT_REPO_PREFIX,
+ FEDERATION_PREFIX,
+ LEGACY_MEDIA_PREFIX,
+ MEDIA_PREFIX,
+ SERVER_KEY_PREFIX,
+ SERVER_KEY_V2_PREFIX,
+ STATIC_PREFIX,
+ WEB_CLIENT_PREFIX,
+)
from synapse.app import _base
-from synapse.app._base import quit_with_error, listen_ssl, listen_tcp
+from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.federation.transport.server import TransportLayerServer
-from synapse.module_api import ModuleApi
from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
-from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
- check_requirements
-from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
+from synapse.module_api import ModuleApi
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
+from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.rest import ClientRestResource
from synapse.rest.key.v1.server_key_resource import LocalKey
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
-from synapse.storage import are_all_users_on_domain
+from synapse.storage import DataStore, are_all_users_on_domain
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.util.caches import CACHE_SIZE_FACTOR
@@ -55,11 +72,6 @@ from synapse.util.manhole import manhole
from synapse.util.module_loader import load_module
from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from twisted.application import service
-from twisted.internet import defer, reactor
-from twisted.web.resource import EncodingResourceWrapper, NoResource
-from twisted.web.server import GzipEncoderFactory
-from twisted.web.static import File
logger = logging.getLogger("synapse.app.homeserver")
@@ -99,6 +111,8 @@ def build_resource_for_web_client(hs):
class SynapseHomeServer(HomeServer):
+ DATASTORE_CLASS = DataStore
+
def _listener_http(self, config, listener_config):
port = listener_config["port"]
bind_addresses = listener_config["bind_addresses"]
@@ -266,7 +280,7 @@ class SynapseHomeServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -290,6 +304,11 @@ class SynapseHomeServer(HomeServer):
quit_with_error(e.message)
+# Gauges to expose monthly active user control metrics
+current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
+
+
def setup(config_options):
"""
Args:
@@ -318,14 +337,10 @@ def setup(config_options):
# check any extra requirements we have now we have a config
check_requirements(config)
- version_string = "Synapse/" + get_version_string(synapse)
-
- logger.info("Server hostname: %s", config.server_name)
- logger.info("Server version: %s", version_string)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
tls_server_context_factory = context_factory.ServerContextFactory(config)
+ tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
database_engine = create_engine(config.database_config)
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
@@ -334,21 +349,22 @@ def setup(config_options):
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
+ tls_client_options_factory=tls_client_options_factory,
config=config,
- version_string=version_string,
+ version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
logger.info("Preparing database: %s...", config.database_config['name'])
try:
- db_conn = hs.get_db_conn(run_new_connection=False)
- prepare_database(db_conn, database_engine, config=config)
- database_engine.on_new_connection(db_conn)
+ with hs.get_db_conn(run_new_connection=False) as db_conn:
+ prepare_database(db_conn, database_engine, config=config)
+ database_engine.on_new_connection(db_conn)
- hs.run_startup_checks(db_conn, database_engine)
+ hs.run_startup_checks(db_conn, database_engine)
- db_conn.commit()
+ db_conn.commit()
except UpgradeDatabaseException:
sys.stderr.write(
"\nFailed to upgrade database.\n"
@@ -423,6 +439,9 @@ def run(hs):
# currently either 0 or 1
stats_process = []
+ def start_phone_stats_home():
+ return run_as_background_process("phone_stats_home", phone_stats_home)
+
@defer.inlineCallbacks
def phone_stats_home():
logger.info("Gathering stats for reporting")
@@ -440,7 +459,7 @@ def run(hs):
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
- for name, count in daily_user_type_results.iteritems():
+ for name, count in iteritems(daily_user_type_results):
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
@@ -451,7 +470,7 @@ def run(hs):
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
- for name, count in r30_results.iteritems():
+ for name, count in iteritems(r30_results):
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
@@ -494,16 +513,41 @@ def run(hs):
)
def generate_user_daily_visit_stats():
- hs.get_datastore().generate_user_daily_visits()
+ return run_as_background_process(
+ "generate_user_daily_visits",
+ hs.get_datastore().generate_user_daily_visits,
+ )
# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
# table will decrease
clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
+ # monthly active user limiting functionality
+ clock.looping_call(
+ hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
+ )
+ hs.get_datastore().reap_monthly_active_users()
+
+ @defer.inlineCallbacks
+ def generate_monthly_active_users():
+ count = 0
+ if hs.config.limit_usage_by_mau:
+ count = yield hs.get_datastore().get_monthly_active_count()
+ current_mau_gauge.set(float(count))
+ max_mau_gauge.set(float(hs.config.max_mau_value))
+
+ hs.get_datastore().initialise_reserved_users(
+ hs.config.mau_limits_reserved_threepids
+ )
+ generate_monthly_active_users()
+ if hs.config.limit_usage_by_mau:
+ clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000)
+ # End of monthly active user settings
+
if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
- clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+ clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
@@ -511,7 +555,7 @@ def run(hs):
# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
- clock.call_later(5 * 60, phone_stats_home)
+ clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile:
print (hs.config.pid_file)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 006bba80a8..e3dbb3b4e6 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -16,11 +16,12 @@
import logging
import sys
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
-from synapse.api.urls import (
- CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
-)
+from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
@@ -33,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
@@ -43,8 +44,6 @@ from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.media_repository")
@@ -53,7 +52,7 @@ class MediaRepositorySlavedStore(
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedClientIpStore,
- TransactionStore,
+ SlavedTransactionStore,
BaseSlavedStore,
MediaRepositoryStore,
):
@@ -61,10 +60,7 @@ class MediaRepositorySlavedStore(
class MediaRepositoryServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = MediaRepositorySlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -118,7 +114,7 @@ class MediaRepositoryServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -156,11 +152,13 @@ def start(config_options):
database_engine = create_engine(config.database_config)
tls_server_context_factory = context_factory.ServerContextFactory(config)
+ tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ss = MediaRepositoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
+ tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 64df47f9cc..244c604de9 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -16,6 +16,9 @@
import logging
import sys
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
from synapse.app import _base
@@ -37,8 +40,6 @@ from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.pusher")
@@ -77,10 +78,7 @@ class PusherSlaveStore(
class PusherServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = PusherSlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = PusherSlaveStore
def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
@@ -128,7 +126,7 @@ class PusherServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -147,8 +145,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+ yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks
@@ -161,11 +160,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
- yield self.pusher_pool.on_new_notifications(
+ self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
- yield self.pusher_pool.on_new_receipts(
+ self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 6808d6d3e0..6662340797 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -17,6 +17,11 @@ import contextlib
import logging
import sys
+from six import iteritems
+
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse.api.constants import EventTypes
from synapse.app import _base
@@ -36,12 +41,12 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -50,16 +55,11 @@ from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
-from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
-
-from six import iteritems
logger = logging.getLogger("synapse.app.synchrotron")
@@ -80,9 +80,7 @@ class SynchrotronSlavedStore(
RoomStore,
BaseSlavedStore,
):
- did_forget = (
- RoomMemberStore.__dict__["did_forget"]
- )
+ pass
UPDATE_SYNCING_USERS_MS = 10 * 1000
@@ -116,7 +114,10 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
- self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
+ if self.hs.config.use_presence:
+ self.hs.get_tcp_replication().send_user_sync(
+ user_id, is_syncing, last_sync_ms
+ )
def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
@@ -213,10 +214,13 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
- return [
- user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
- if count > 0
- ]
+ if self.hs.config.use_presence:
+ return [
+ user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
+ if count > 0
+ ]
+ else:
+ return set()
class SynchrotronTyping(object):
@@ -245,10 +249,7 @@ class SynchrotronApplicationService(object):
class SynchrotronServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = SynchrotronSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -305,7 +306,7 @@ class SynchrotronServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -334,8 +335,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+ yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 56ae086128..d658f967ba 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -16,16 +16,19 @@
import argparse
import collections
+import errno
import glob
import os
import os.path
import signal
import subprocess
import sys
-import yaml
-import errno
import time
+from six import iteritems
+
+import yaml
+
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
GREEN = "\x1b[1;32m"
@@ -172,7 +175,7 @@ def main():
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
- for cache_name, factor in cache_factors.iteritems():
+ for cache_name, factor in iteritems(cache_factors):
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index ada1c13cec..96ffcaf073 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -17,6 +17,9 @@
import logging
import sys
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
from synapse.app import _base
@@ -43,8 +46,6 @@ from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor, defer
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.user_dir")
@@ -93,10 +94,7 @@ class UserDirectorySlaveStore(
class UserDirectoryServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = UserDirectorySlaveStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -150,7 +148,7 @@ class UserDirectoryServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -168,8 +166,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(UserDirectoryReplicationHandler, self).on_rdata(
+ yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == "current_state_deltas":
@@ -213,11 +212,13 @@ def start(config_options):
config.update_user_directory = True
tls_server_context_factory = context_factory.ServerContextFactory(config)
+ tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
ps = UserDirectoryServer(
config.server_name,
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
+ tls_client_options_factory=tls_client_options_factory,
config=config,
version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
|