diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 9c2b627590..3b6b9368b8 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -14,9 +14,11 @@
# limitations under the License.
import sys
+
+from synapse import python_dependencies # noqa: E402
+
sys.dont_write_bytecode = True
-from synapse import python_dependencies # noqa: E402
try:
python_dependencies.check_requirements()
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
new file mode 100644
index 0000000000..391bd14c5c
--- /dev/null
+++ b/synapse/app/_base.py
@@ -0,0 +1,194 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import gc
+import logging
+import sys
+
+from daemonize import Daemonize
+
+from twisted.internet import error, reactor
+
+from synapse.util import PreserveLoggingContext
+from synapse.util.rlimit import change_resource_limit
+
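+# "affinity" is an optional dependency: it is only needed if the cpu_affinity
+# option is used, so we defer complaining about a failed import until then.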
+try:
+ import affinity
+except Exception:
+ affinity = None
+
+
+logger = logging.getLogger(__name__)
+
+
+def start_worker_reactor(appname, config):
+ """ Run the reactor in the main process
+
+    Daemonizes if necessary, configures some resources, and then starts the
+    reactor. Pulls configuration from the 'worker' settings in 'config'.
+
+ Args:
+ appname (str): application name which will be sent to syslog
+ config (synapse.config.Config): config object
+ """
+
+ logger = logging.getLogger(config.worker_app)
+
+ start_reactor(
+ appname,
+ config.soft_file_limit,
+ config.gc_thresholds,
+ config.worker_pid_file,
+ config.worker_daemonize,
+ config.worker_cpu_affinity,
+ logger,
+ )
+
+
+def start_reactor(
+ appname,
+ soft_file_limit,
+ gc_thresholds,
+ pid_file,
+ daemonize,
+ cpu_affinity,
+ logger,
+):
+ """ Run the reactor in the main process
+
+    Daemonizes if necessary, configures some resources, and then starts the
+    reactor
+
+ Args:
+ appname (str): application name which will be sent to syslog
+        soft_file_limit (int): value to set the soft RLIMIT_NOFILE limit to
+        gc_thresholds (tuple|None): thresholds to pass to gc.set_threshold
+ pid_file (str): name of pid file to write to if daemonize is True
+ daemonize (bool): true to run the reactor in a background process
+ cpu_affinity (int|None): cpu affinity mask
+ logger (logging.Logger): logger instance to pass to Daemonize
+ """
+
+ def run():
+ # make sure that we run the reactor with the sentinel log context,
+ # otherwise other PreserveLoggingContext instances will get confused
+ # and complain when they see the logcontext arbitrarily swapping
+ # between the sentinel and `run` logcontexts.
+ with PreserveLoggingContext():
+ logger.info("Running")
+ if cpu_affinity is not None:
+ if not affinity:
+ quit_with_error(
+ "Missing package 'affinity' required for cpu_affinity\n"
+ "option\n\n"
+ "Install by running:\n\n"
+ " pip install affinity\n\n"
+ )
+ logger.info("Setting CPU affinity to %s" % cpu_affinity)
+ affinity.set_process_affinity_mask(0, cpu_affinity)
+ change_resource_limit(soft_file_limit)
+ if gc_thresholds:
+ gc.set_threshold(*gc_thresholds)
+ reactor.run()
+
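+    # Daemonize.start() forks us into the background, writes our pid to
+    # pid_file, and only then invokes the run() action defined above.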
+ if daemonize:
+ daemon = Daemonize(
+ app=appname,
+ pid=pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+def quit_with_error(error_string):
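+    """Print the given message to stderr between lines of asterisks, then
+    exit(1)."""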
+ message_lines = error_string.split("\n")
+    line_length = max(len(line) for line in message_lines if len(line) < 80) + 2
+ sys.stderr.write("*" * line_length + '\n')
+ for line in message_lines:
+ sys.stderr.write(" %s\n" % (line.rstrip(),))
+ sys.stderr.write("*" * line_length + '\n')
+ sys.exit(1)
+
+
+def listen_metrics(bind_addresses, port):
+ """
+ Start Prometheus metrics server.
+ """
+ from synapse.metrics import RegistryProxy
+ from prometheus_client import start_http_server
+
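+    # prometheus_client's start_http_server is not twisted-aware, so run it
+    # via the reactor's thread pool; this also defers the bind until the
+    # reactor is actually running.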
+ for host in bind_addresses:
+ reactor.callInThread(start_http_server, int(port),
+ addr=host, registry=RegistryProxy)
+ logger.info("Metrics now reporting on %s:%d", host, port)
+
+
+def listen_tcp(bind_addresses, port, factory, backlog=50):
+ """
+ Create a TCP socket for a port and several addresses
+ """
+ for address in bind_addresses:
+ try:
+ reactor.listenTCP(
+ port,
+ factory,
+ backlog,
+ address
+ )
+ except error.CannotListenError as e:
+ check_bind_error(e, address, bind_addresses)
+
+
+def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
+ """
+ Create an SSL socket for a port and several addresses
+ """
+ for address in bind_addresses:
+ try:
+ reactor.listenSSL(
+ port,
+ factory,
+ context_factory,
+ backlog,
+ address
+ )
+ except error.CannotListenError as e:
+ check_bind_error(e, address, bind_addresses)
+
+
+def check_bind_error(e, address, bind_addresses):
+ """
+ This method checks an exception occurred while binding on 0.0.0.0.
+ If :: is specified in the bind addresses a warning is shown.
+ The exception is still raised otherwise.
+
+ Binding on both 0.0.0.0 and :: causes an exception on Linux and macOS
+ because :: binds on both IPv4 and IPv6 (as per RFC 3493).
+ When binding on 0.0.0.0 after :: this can safely be ignored.
+
+ Args:
+ e (Exception): Exception that was caught.
+ address (str): Address on which binding was attempted.
+ bind_addresses (list): Addresses on which the service listens.
+ """
+ if address == '0.0.0.0' and '::' in bind_addresses:
+ logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]')
+ else:
+ raise e
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9a476efa63..9a37384fb7 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -13,38 +13,33 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
-import synapse
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
-from synapse.server import HomeServer
+import synapse
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse import events
-
-from twisted.internet import reactor
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.appservice")
@@ -56,19 +51,6 @@ class AppserviceSlaveStore(
class AppserviceServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
@@ -82,21 +64,21 @@ class AppserviceServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
-
- root_resource = create_resource_tree(resources, Resource())
-
- for address in bind_addresses:
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=address
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
+
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
)
+ )
logger.info("Synapse appservice now listening on port %d", port)
@@ -105,18 +87,22 @@ class AppserviceServer(HomeServer):
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
- bind_addresses = listener["bind_addresses"]
-
- for address in bind_addresses:
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=address
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
)
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn(("Metrics listener configured, but "
+ "enable_metrics is not True!"))
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -136,9 +122,14 @@ class ASReplicationHandler(ReplicationClientHandler):
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()
- preserve_fn(
- self.appservice_handler.notify_interested_services
- )(max_stream_id)
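+            # don't block processing of the replication stream on the
+            # appservice handler; just kick off a background task (which
+            # catches and logs its own exceptions)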
+ run_in_background(self._notify_app_services, max_stream_id)
+
+ @defer.inlineCallbacks
+ def _notify_app_services(self, room_stream_id):
+ try:
+ yield self.appservice_handler.notify_interested_services(room_stream_id)
+ except Exception:
+ logger.exception("Error notifying application services of event")
def start(config_options):
@@ -181,36 +172,13 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-appservice",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-appservice", config)
if __name__ == '__main__':
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 09bc1935f1..398bb36602 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -13,46 +13,46 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
-import synapse
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+import synapse
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
-from synapse.http.site import SynapseSite
+from synapse.crypto import context_factory
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
-from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.rest.client.v1.room import (
+ JoinedRoomMemberListRestServlet,
+ PublicRoomListRestServlet,
+ RoomEventContextServlet,
+ RoomMemberListRestServlet,
+ RoomStateRestServlet,
+)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
-from twisted.internet import reactor
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
logger = logging.getLogger("synapse.app.client_reader")
@@ -72,19 +72,6 @@ class ClientReaderSlavedStore(
class ClientReaderServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
@@ -98,10 +85,16 @@ class ClientReaderServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
+
PublicRoomListRestServlet(self).register(resource)
+ RoomMemberListRestServlet(self).register(resource)
+ JoinedRoomMemberListRestServlet(self).register(resource)
+ RoomStateRestServlet(self).register(resource)
+ RoomEventContextServlet(self).register(resource)
+
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
@@ -109,19 +102,19 @@ class ClientReaderServer(HomeServer):
"/_matrix/client/api/v1": resource,
})
- root_resource = create_resource_tree(resources, Resource())
-
- for address in bind_addresses:
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=address
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
)
+ )
logger.info("Synapse client reader now listening on port %d", port)
@@ -130,18 +123,22 @@ class ClientReaderServer(HomeServer):
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
- bind_addresses = listener["bind_addresses"]
-
- for address in bind_addresses:
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=address
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
)
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn(("Metrics listener configured, but "
+ "enable_metrics is not True!"))
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -180,39 +177,15 @@ def start(config_options):
)
ss.setup()
- ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-client-reader",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-client-reader", config)
if __name__ == '__main__':
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
new file mode 100644
index 0000000000..374f115644
--- /dev/null
+++ b/synapse/app/event_creator.py
@@ -0,0 +1,201 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import sys
+
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+
+import synapse
+from synapse import events
+from synapse.app import _base
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.http.server import JsonResource
+from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.room import (
+ JoinRoomAliasServlet,
+ RoomMembershipRestServlet,
+ RoomSendEventRestServlet,
+ RoomStateEventRestServlet,
+)
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.versionstring import get_version_string
+
+logger = logging.getLogger("synapse.app.event_creator")
+
+
+class EventCreatorSlavedStore(
+ DirectoryStore,
+ TransactionStore,
+ SlavedProfileStore,
+ SlavedAccountDataStore,
+ SlavedPusherStore,
+ SlavedReceiptsStore,
+ SlavedPushRuleStore,
+ SlavedDeviceStore,
+ SlavedClientIpStore,
+ SlavedApplicationServiceStore,
+ SlavedEventStore,
+ SlavedRegistrationStore,
+ RoomStore,
+ BaseSlavedStore,
+):
+ pass
+
+
+class EventCreatorServer(HomeServer):
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_addresses = listener_config["bind_addresses"]
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
+ elif name == "client":
+ resource = JsonResource(self, canonical_json=False)
+ RoomSendEventRestServlet(self).register(resource)
+ RoomMembershipRestServlet(self).register(resource)
+ RoomStateEventRestServlet(self).register(resource)
+ JoinRoomAliasServlet(self).register(resource)
+ resources.update({
+ "/_matrix/client/r0": resource,
+ "/_matrix/client/unstable": resource,
+ "/_matrix/client/v2_alpha": resource,
+ "/_matrix/client/api/v1": resource,
+ })
+
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
+ )
+ )
+
+ logger.info("Synapse event creator now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ )
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn(("Metrics listener configured, but "
+ "enable_metrics is not True!"))
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ self.get_tcp_replication().start_replication(self)
+
+ def build_tcp_replication(self):
+ return ReplicationClientHandler(self.get_datastore())
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse event creator", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.event_creator"
+
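+    # the event_creator worker hands its events over to the main process via
+    # the replication HTTP API, so that port must be configured: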
+ assert config.worker_replication_http_port is not None
+
+ setup_logging(config, use_worker_options=True)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+ ss = EventCreatorServer(
+ config.server_name,
+ db_config=config.database_config,
+ tls_server_context_factory=tls_server_context_factory,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ss.setup()
+ ss.start_listening(config.worker_listeners)
+
+ def start():
+ ss.get_state_handler().start_caching()
+ ss.get_datastore().start_profiling()
+
+ reactor.callWhenRunning(start)
+
+ _base.start_worker_reactor("synapse-event-creator", config)
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index eb392e1c9d..7af00b8bcf 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -13,43 +13,37 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
-import synapse
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+import synapse
+from synapse import events
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.federation.transport.server import TransportLayerServer
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.api.urls import FEDERATION_PREFIX
-from synapse.federation.transport.server import TransportLayerServer
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
-from twisted.internet import reactor
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
logger = logging.getLogger("synapse.app.federation_reader")
@@ -66,19 +60,6 @@ class FederationReaderSlavedStore(
class FederationReaderServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
@@ -92,25 +73,25 @@ class FederationReaderServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "federation":
resources.update({
FEDERATION_PREFIX: TransportLayerServer(self),
})
- root_resource = create_resource_tree(resources, Resource())
-
- for address in bind_addresses:
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=address
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
)
+ )
logger.info("Synapse federation reader now listening on port %d", port)
@@ -119,18 +100,22 @@ class FederationReaderServer(HomeServer):
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
- bind_addresses = listener["bind_addresses"]
-
- for address in bind_addresses:
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=address
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
)
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn(("Metrics listener configured, but "
+ "enable_metrics is not True!"))
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -169,39 +154,15 @@ def start(config_options):
)
ss.setup()
- ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-federation-reader",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-federation-reader", config)
if __name__ == '__main__':
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 03327dc47a..18469013fa 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -13,44 +13,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
-import synapse
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
-from synapse.server import HomeServer
+import synapse
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
from synapse.federation import send_queue
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.transactions import TransactionStore
-from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse import events
-
-from twisted.internet import reactor, defer
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.federation_sender")
@@ -83,19 +78,6 @@ class FederationSenderSlaveStore(
class FederationSenderServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
@@ -109,21 +91,21 @@ class FederationSenderServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
-
- root_resource = create_resource_tree(resources, Resource())
-
- for address in bind_addresses:
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=address
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
+
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
)
+ )
logger.info("Synapse federation_sender now listening on port %d", port)
@@ -132,18 +114,22 @@ class FederationSenderServer(HomeServer):
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
- bind_addresses = listener["bind_addresses"]
-
- for address in bind_addresses:
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=address
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
)
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn(("Metrics listener configured, but "
+ "enable_metrics is not True!"))
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -213,36 +199,12 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
-
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-federation-sender",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-federation-sender", config)
class FederationSenderHandler(object):
@@ -277,7 +239,7 @@ class FederationSenderHandler(object):
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
- preserve_fn(self.update_token)(token)
+ run_in_background(self.update_token, token)
# We also need to poke the federation sender when new events happen
elif stream_name == "events":
@@ -285,19 +247,22 @@ class FederationSenderHandler(object):
@defer.inlineCallbacks
def update_token(self, token):
- self.federation_position = token
-
- # We linearize here to ensure we don't have races updating the token
- with (yield self._fed_position_linearizer.queue(None)):
- if self._last_ack < self.federation_position:
- yield self.store.update_federation_out_pos(
- "federation", self.federation_position
- )
+ try:
+ self.federation_position = token
+
+ # We linearize here to ensure we don't have races updating the token
+ with (yield self._fed_position_linearizer.queue(None)):
+ if self._last_ack < self.federation_position:
+ yield self.store.update_federation_out_pos(
+ "federation", self.federation_position
+ )
- # We ACK this token over replication so that the master can drop
- # its in memory queues
- self.replication_client.send_federation_ack(self.federation_position)
- self._last_ack = self.federation_position
+ # We ACK this token over replication so that the master can drop
+ # its in memory queues
+ self.replication_client.send_federation_ack(self.federation_position)
+ self._last_ack = self.federation_position
+ except Exception:
+ logger.exception("Error updating federation stream position")
if __name__ == '__main__':
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
new file mode 100644
index 0000000000..b5f78f4640
--- /dev/null
+++ b/synapse/app/frontend_proxy.py
@@ -0,0 +1,235 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import sys
+
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
+import synapse
+from synapse import events
+from synapse.api.errors import SynapseError
+from synapse.app import _base
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.http.server import JsonResource
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v2_alpha._base import client_v2_patterns
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.versionstring import get_version_string
+
+logger = logging.getLogger("synapse.app.frontend_proxy")
+
+
+class KeyUploadServlet(RestServlet):
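+    """An implementation of the `/keys/upload` client API for frontend_proxy
+    workers.
+
+    Requests for one-time-key counts are answered locally; requests which
+    actually upload keys are proxied on to the main synapse process.
+    """
+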
+ PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
+
+ def __init__(self, hs):
+ """
+ Args:
+ hs (synapse.server.HomeServer): server
+ """
+ super(KeyUploadServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.http_client = hs.get_simple_http_client()
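+        # base URI of the main synapse process, to which we proxy any actual
+        # key uploads (the worker_main_http_uri setting in the worker config)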
+ self.main_uri = hs.config.worker_main_http_uri
+
+ @defer.inlineCallbacks
+ def on_POST(self, request, device_id):
+ requester = yield self.auth.get_user_by_req(request, allow_guest=True)
+ user_id = requester.user.to_string()
+ body = parse_json_object_from_request(request)
+
+ if device_id is not None:
+ # passing the device_id here is deprecated; however, we allow it
+ # for now for compatibility with older clients.
+ if (requester.device_id is not None and
+ device_id != requester.device_id):
+ logger.warning("Client uploading keys for a different device "
+ "(logged in as %s, uploading for %s)",
+ requester.device_id, device_id)
+ else:
+ device_id = requester.device_id
+
+ if device_id is None:
+ raise SynapseError(
+ 400,
+ "To upload keys, you must pass device_id when authenticating"
+ )
+
+ if body:
+ # They're actually trying to upload something, proxy to main synapse.
+ # Pass through the auth headers, if any, in case the access token
+ # is there.
+ auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", [])
+ headers = {
+ "Authorization": auth_headers,
+ }
+ result = yield self.http_client.post_json_get_json(
+ self.main_uri + request.uri,
+ body,
+ headers=headers,
+ )
+
+ defer.returnValue((200, result))
+ else:
+ # Just interested in counts.
+ result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
+ defer.returnValue((200, {"one_time_key_counts": result}))
+
+
+class FrontendProxySlavedStore(
+ SlavedDeviceStore,
+ SlavedClientIpStore,
+ SlavedApplicationServiceStore,
+ SlavedRegistrationStore,
+ BaseSlavedStore,
+):
+ pass
+
+
+class FrontendProxyServer(HomeServer):
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_addresses = listener_config["bind_addresses"]
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
+ elif name == "client":
+ resource = JsonResource(self, canonical_json=False)
+ KeyUploadServlet(self).register(resource)
+ resources.update({
+ "/_matrix/client/r0": resource,
+ "/_matrix/client/unstable": resource,
+ "/_matrix/client/v2_alpha": resource,
+ "/_matrix/client/api/v1": resource,
+ })
+
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
+ )
+ )
+
+ logger.info("Synapse client reader now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ )
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn(("Metrics listener configured, but "
+ "enable_metrics is not True!"))
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ self.get_tcp_replication().start_replication(self)
+
+ def build_tcp_replication(self):
+ return ReplicationClientHandler(self.get_datastore())
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse frontend proxy", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.frontend_proxy"
+
+ assert config.worker_main_http_uri is not None
+
+ setup_logging(config, use_worker_options=True)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+ ss = FrontendProxyServer(
+ config.server_name,
+ db_config=config.database_config,
+ tls_server_context_factory=tls_server_context_factory,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ss.setup()
+ ss.start_listening(config.worker_listeners)
+
+ def start():
+ ss.get_state_handler().start_caching()
+ ss.get_datastore().start_profiling()
+
+ reactor.callWhenRunning(start)
+
+ _base.start_worker_reactor("synapse-frontend-proxy", config)
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 081e7cce59..2ad1beb8d8 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -13,61 +13,62 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-import synapse
-
import gc
import logging
import os
import sys
-import synapse.config.logger
-from synapse.config._base import ConfigError
-
-from synapse.python_dependencies import (
- check_requirements, CONDITIONAL_REQUIREMENTS
-)
-
-from synapse.rest import ClientRestResource
-from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
-from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
-
-from synapse.server import HomeServer
+from six import iteritems
-from twisted.internet import reactor, defer
from twisted.application import service
-from twisted.web.resource import Resource, EncodingResourceWrapper
-from twisted.web.static import File
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, NoResource
from twisted.web.server import GzipEncoderFactory
-from synapse.http.server import RootRedirect
-from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
-from synapse.rest.key.v1.server_key_resource import LocalKey
-from synapse.rest.key.v2 import KeyApiV2Resource
+from twisted.web.static import File
+
+import synapse
+import synapse.config.logger
+from synapse import events
from synapse.api.urls import (
- FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
- SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX,
+ CONTENT_REPO_PREFIX,
+ FEDERATION_PREFIX,
+ LEGACY_MEDIA_PREFIX,
+ MEDIA_PREFIX,
+ SERVER_KEY_PREFIX,
SERVER_KEY_V2_PREFIX,
+ STATIC_PREFIX,
+ WEB_CLIENT_PREFIX,
)
+from synapse.app import _base
+from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
+from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
-from synapse.metrics import register_memory_metrics
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
from synapse.federation.transport.server import TransportLayerServer
-
-from synapse.util.rlimit import change_resource_limit
-from synapse.util.versionstring import get_version_string
+from synapse.http.additional_resource import AdditionalResource
+from synapse.http.server import RootRedirect
+from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.module_api import ModuleApi
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
+from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
+from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
+from synapse.rest import ClientRestResource
+from synapse.rest.key.v1.server_key_resource import LocalKey
+from synapse.rest.key.v2 import KeyApiV2Resource
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.server import HomeServer
+from synapse.storage import are_all_users_on_domain
+from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
+from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-
-from synapse.http.site import SynapseSite
-
-from synapse import events
-
-from daemonize import Daemonize
+from synapse.util.module_loader import load_module
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.homeserver")
@@ -119,87 +120,132 @@ class SynapseHomeServer(HomeServer):
resources = {}
for res in listener_config["resources"]:
for name in res["names"]:
- if name == "client":
- client_resource = ClientRestResource(self)
- if res["compress"]:
- client_resource = gz_wrap(client_resource)
-
- resources.update({
- "/_matrix/client/api/v1": client_resource,
- "/_matrix/client/r0": client_resource,
- "/_matrix/client/unstable": client_resource,
- "/_matrix/client/v2_alpha": client_resource,
- "/_matrix/client/versions": client_resource,
- })
-
- if name == "federation":
- resources.update({
- FEDERATION_PREFIX: TransportLayerServer(self),
- })
-
- if name in ["static", "client"]:
- resources.update({
- STATIC_PREFIX: File(
- os.path.join(os.path.dirname(synapse.__file__), "static")
- ),
- })
-
- if name in ["media", "federation", "client"]:
- media_repo = MediaRepositoryResource(self)
- resources.update({
- MEDIA_PREFIX: media_repo,
- LEGACY_MEDIA_PREFIX: media_repo,
- CONTENT_REPO_PREFIX: ContentRepoResource(
- self, self.config.uploads_path
- ),
- })
-
- if name in ["keys", "federation"]:
- resources.update({
- SERVER_KEY_PREFIX: LocalKey(self),
- SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self),
- })
-
- if name == "webclient":
- resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
-
- if name == "metrics" and self.get_config().enable_metrics:
- resources[METRICS_PREFIX] = MetricsResource(self)
+ resources.update(self._configure_named_resource(
+ name, res.get("compress", False),
+ ))
+
+ additional_resources = listener_config.get("additional_resources", {})
+ logger.debug("Configuring additional resources: %r",
+ additional_resources)
+ module_api = ModuleApi(self, self.get_auth_handler())
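+        # each entry maps a URL path to a loadable module; the module's
+        # handler class is constructed with (config, module_api), and its
+        # handle_request method is wired in at that path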
+ for path, resmodule in additional_resources.items():
+ handler_cls, config = load_module(resmodule)
+ handler = handler_cls(config, module_api)
+ resources[path] = AdditionalResource(self, handler.handle_request)
if WEB_CLIENT_PREFIX in resources:
root_resource = RootRedirect(WEB_CLIENT_PREFIX)
else:
- root_resource = Resource()
+ root_resource = NoResource()
root_resource = create_resource_tree(resources, root_resource)
if tls:
- for address in bind_addresses:
- reactor.listenSSL(
- port,
- SynapseSite(
- "synapse.access.https.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- self.tls_server_context_factory,
- interface=address
- )
+ listen_ssl(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.https.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
+ ),
+ self.tls_server_context_factory,
+ )
+
else:
- for address in bind_addresses:
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=address
+ listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
)
+ )
logger.info("Synapse now listening on port %d", port)
+ def _configure_named_resource(self, name, compress=False):
+ """Build a resource map for a named resource
+
+ Args:
+ name (str): named resource: one of "client", "federation", etc
+ compress (bool): whether to enable gzip compression for this
+ resource
+
+ Returns:
+ dict[str, Resource]: map from path to HTTP resource
+ """
+ resources = {}
+ if name == "client":
+ client_resource = ClientRestResource(self)
+ if compress:
+ client_resource = gz_wrap(client_resource)
+
+ resources.update({
+ "/_matrix/client/api/v1": client_resource,
+ "/_matrix/client/r0": client_resource,
+ "/_matrix/client/unstable": client_resource,
+ "/_matrix/client/v2_alpha": client_resource,
+ "/_matrix/client/versions": client_resource,
+ })
+
+ if name == "consent":
+ from synapse.rest.consent.consent_resource import ConsentResource
+ consent_resource = ConsentResource(self)
+ if compress:
+ consent_resource = gz_wrap(consent_resource)
+ resources.update({
+ "/_matrix/consent": consent_resource,
+ })
+
+ if name == "federation":
+ resources.update({
+ FEDERATION_PREFIX: TransportLayerServer(self),
+ })
+
+ if name in ["static", "client"]:
+ resources.update({
+ STATIC_PREFIX: File(
+ os.path.join(os.path.dirname(synapse.__file__), "static")
+ ),
+ })
+
+ if name in ["media", "federation", "client"]:
+ if self.get_config().enable_media_repo:
+ media_repo = self.get_media_repository_resource()
+ resources.update({
+ MEDIA_PREFIX: media_repo,
+ LEGACY_MEDIA_PREFIX: media_repo,
+ CONTENT_REPO_PREFIX: ContentRepoResource(
+ self, self.config.uploads_path
+ ),
+ })
+ elif name == "media":
+ raise ConfigError(
+ "'media' resource conflicts with enable_media_repo=False",
+ )
+
+ if name in ["keys", "federation"]:
+ resources.update({
+ SERVER_KEY_PREFIX: LocalKey(self),
+ SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self),
+ })
+
+ if name == "webclient":
+ resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
+
+ if name == "metrics" and self.get_config().enable_metrics:
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
+
+ if name == "replication":
+ resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
+
+ return resources
+
def start_listening(self):
config = self.get_config()
@@ -207,18 +253,15 @@ class SynapseHomeServer(HomeServer):
if listener["type"] == "http":
self._listener_http(config, listener)
elif listener["type"] == "manhole":
- bind_addresses = listener["bind_addresses"]
-
- for address in bind_addresses:
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=address
+ listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
)
+ )
elif listener["type"] == "replication":
bind_addresses = listener["bind_addresses"]
for address in bind_addresses:
@@ -229,6 +272,13 @@ class SynapseHomeServer(HomeServer):
reactor.addSystemEventTrigger(
"before", "shutdown", server_listener.stopListening,
)
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn("Metrics listener configured, but "
+ "enable_metrics is not True!")
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
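+ # A listener stanza of the assumed shape (the keys match what is
+ # read above; the port value is illustrative):
+ #
+ #   listeners:
+ #     - type: metrics
+ #       port: 9101
+ #       bind_addresses: ['127.0.0.1']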
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -248,29 +298,6 @@ class SynapseHomeServer(HomeServer):
except IncorrectDatabaseSetup as e:
quit_with_error(e.message)
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
-
-def quit_with_error(error_string):
- message_lines = error_string.split("\n")
- line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
- sys.stderr.write("*" * line_length + '\n')
- for line in message_lines:
- sys.stderr.write(" %s\n" % (line.rstrip(),))
- sys.stderr.write("*" * line_length + '\n')
- sys.exit(1)
-
def setup(config_options):
"""
@@ -300,11 +327,6 @@ def setup(config_options):
# check any extra requirements we have now we have a config
check_requirements(config)
- version_string = "Synapse/" + get_version_string(synapse)
-
- logger.info("Server hostname: %s", config.server_name)
- logger.info("Server version: %s", version_string)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
tls_server_context_factory = context_factory.ServerContextFactory(config)
@@ -317,7 +339,7 @@ def setup(config_options):
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
- version_string=version_string,
+ version_string="Synapse/" + get_version_string(synapse),
database_engine=database_engine,
)
@@ -349,9 +371,7 @@ def setup(config_options):
hs.get_state_handler().start_caching()
hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates()
- hs.get_replication_layer().start_get_pdu_cache()
-
- register_memory_metrics(hs)
+ hs.get_federation_client().start_get_pdu_cache()
reactor.callWhenRunning(start)
@@ -403,6 +423,10 @@ def run(hs):
stats = {}
+ # Contains the list of processes we will be monitoring:
+ # currently either zero or one process
+ stats_process = []
+
@defer.inlineCallbacks
def phone_stats_home():
logger.info("Gathering stats for reporting")
@@ -419,6 +443,10 @@ def run(hs):
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users
+ daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
+ for name, count in iteritems(daily_user_type_results):
+ stats["daily_user_type_" + name] = count
+
room_count = yield hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count
@@ -426,8 +454,21 @@ def run(hs):
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
+ r30_results = yield hs.get_datastore().count_r30_users()
+ for name, count in iteritems(r30_results):
+ stats["r30_users_" + name] = count
+
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
+ stats["cache_factor"] = CACHE_SIZE_FACTOR
+ stats["event_cache_size"] = hs.config.event_cache_size
+
+ if len(stats_process) > 0:
+ stats["memory_rss"] = 0
+ stats["cpu_average"] = 0
+ for process in stats_process:
+ stats["memory_rss"] += process.memory_info().rss
+ stats["cpu_average"] += int(process.cpu_percent(interval=None))
logger.info("Reporting stats to matrix.org: %s" % (stats,))
try:
@@ -438,45 +479,56 @@ def run(hs):
except Exception as e:
logger.warn("Error reporting stats: %s", e)
+ def performance_stats_init():
+ try:
+ import psutil
+ process = psutil.Process()
+ # Ensure we can fetch both, and make the initial request for cpu_percent
+ # so the next request will use this as the initial point.
+ process.memory_info().rss
+ process.cpu_percent(interval=None)
+ logger.info("report_stats can use psutil")
+ stats_process.append(process)
+ except (ImportError, AttributeError):
+ logger.warn(
+ "report_stats enabled but psutil is not installed or incorrect version."
+ " Disabling reporting of memory/cpu stats."
+ " Ensuring psutil is available will help matrix.org track performance"
+ " changes across releases."
+ )
+
+ def generate_user_daily_visit_stats():
+ hs.get_datastore().generate_user_daily_visits()
+
+ # Rather than updating on a per-session basis, batch up the requests.
+ # If you increase the loop period, the accuracy of the
+ # user_daily_visits table will decrease.
+ clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000)
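+ # (the period is in milliseconds, assuming `clock` is synapse's
+ # Clock wrapper: 5 * 60 * 1000 is five minutes.)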
+
if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+ # We need to defer this init for the case where we daemonize,
+ # otherwise the process ID we get is that of the non-daemon process.
+ clock.call_later(0, performance_stats_init)
+
# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
clock.call_later(5 * 60, phone_stats_home)
- def in_thread():
- # Uncomment to enable tracing of log context changes.
- # sys.settrace(logcontext_tracer)
-
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- change_resource_limit(hs.config.soft_file_limit)
- if hs.config.gc_thresholds:
- gc.set_threshold(*hs.config.gc_thresholds)
- reactor.run()
-
- if hs.config.daemonize:
-
- if hs.config.print_pidfile:
- print (hs.config.pid_file)
-
- daemon = Daemonize(
- app="synapse-homeserver",
- pid=hs.config.pid_file,
- action=lambda: in_thread(),
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
-
- daemon.start()
- else:
- in_thread()
+ if hs.config.daemonize and hs.config.print_pidfile:
+ print(hs.config.pid_file)
+
+ _base.start_reactor(
+ "synapse-homeserver",
+ hs.config.soft_file_limit,
+ hs.config.gc_thresholds,
+ hs.config.pid_file,
+ hs.config.daemonize,
+ hs.config.cpu_affinity,
+ logger,
+ )
def main():
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index f57ec784fe..749bbf37d0 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -13,14 +13,23 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
-import synapse
+from twisted.internet import reactor
+from twisted.web.resource import NoResource
+import synapse
+from synapse import events
+from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -28,31 +37,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.media_repository import MediaRepositoryStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.api.urls import (
- CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
-)
-from synapse.crypto import context_factory
-
-from synapse import events
-
-
-from twisted.internet import reactor
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
logger = logging.getLogger("synapse.app.media_repository")
@@ -69,19 +60,6 @@ class MediaRepositorySlavedStore(
class MediaRepositoryServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
@@ -95,9 +73,9 @@ class MediaRepositoryServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "media":
- media_repo = MediaRepositoryResource(self)
+ media_repo = self.get_media_repository_resource()
resources.update({
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
@@ -106,19 +84,19 @@ class MediaRepositoryServer(HomeServer):
),
})
- root_resource = create_resource_tree(resources, Resource())
-
- for address in bind_addresses:
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=address
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
)
+ )
logger.info("Synapse media repository now listening on port %d", port)
@@ -127,18 +105,22 @@ class MediaRepositoryServer(HomeServer):
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
- bind_addresses = listener["bind_addresses"]
-
- for address in bind_addresses:
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=address
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
)
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn("Metrics listener configured, but "
+ "enable_metrics is not True!")
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -159,6 +141,13 @@ def start(config_options):
assert config.worker_app == "synapse.app.media_repository"
+ if config.enable_media_repo:
+ _base.quit_with_error(
+ "enable_media_repo must be disabled in the main synapse process\n"
+ "before the media repo can be run in a separate worker.\n"
+ "Please add ``enable_media_repo: false`` to the main config\n"
+ )
+
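+ # i.e. the intended split of config, sketched (only worker_app is
+ # confirmed by the assert above; the rest follows the error text):
+ #
+ #   main homeserver.yaml:
+ #       enable_media_repo: false
+ #
+ #   media repository worker config:
+ #       worker_app: synapse.app.media_repository
+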
setup_logging(config, use_worker_options=True)
events.USE_FROZEN_DICTS = config.use_frozen_dicts
@@ -177,39 +166,15 @@ def start(config_options):
)
ss.setup()
- ss.get_handlers()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_state_handler().start_caching()
ss.get_datastore().start_profiling()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-media-repository",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-media-repository", config)
if __name__ == '__main__':
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f9114acfcb..9295a51d5b 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -13,41 +13,34 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import sys
-import synapse
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
-from synapse.server import HomeServer
+import synapse
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.storage.roommember import RoomMemberStore
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.pushers import SlavedPusherStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
-from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.storage.engines import create_engine
+from synapse.server import HomeServer
from synapse.storage import DataStore
+from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn, \
- PreserveLoggingContext
+from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse import events
-
-from twisted.internet import reactor, defer
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
-
logger = logging.getLogger("synapse.app.pusher")
@@ -83,25 +76,8 @@ class PusherSlaveStore(
DataStore.get_profile_displayname.__func__
)
- who_forgot_in_room = (
- RoomMemberStore.__dict__["who_forgot_in_room"]
- )
-
class PusherServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = PusherSlaveStore(self.get_db_conn(), self)
@@ -118,21 +94,21 @@ class PusherServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
-
- root_resource = create_resource_tree(resources, Resource())
-
- for address in bind_addresses:
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=address
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
+
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
)
+ )
logger.info("Synapse pusher now listening on port %d", port)
@@ -141,18 +117,22 @@ class PusherServer(HomeServer):
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
- bind_addresses = listener["bind_addresses"]
-
- for address in bind_addresses:
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=address
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
)
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn("Metrics listener configured, but "
+ "enable_metrics is not True!")
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -170,24 +150,27 @@ class PusherReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
- preserve_fn(self.poke_pushers)(stream_name, token, rows)
+ run_in_background(self.poke_pushers, stream_name, token, rows)
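+ # (poke_pushers below catches its own exceptions: failures in a
+ # call kicked off via run_in_background are presumably not
+ # propagated back to on_rdata.)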
@defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows):
- if stream_name == "pushers":
- for row in rows:
- if row.deleted:
- yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
- else:
- yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
- elif stream_name == "events":
- yield self.pusher_pool.on_new_notifications(
- token, token,
- )
- elif stream_name == "receipts":
- yield self.pusher_pool.on_new_receipts(
- token, token, set(row.room_id for row in rows)
- )
+ try:
+ if stream_name == "pushers":
+ for row in rows:
+ if row.deleted:
+ yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+ else:
+ yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
+ elif stream_name == "events":
+ yield self.pusher_pool.on_new_notifications(
+ token, token,
+ )
+ elif stream_name == "receipts":
+ yield self.pusher_pool.on_new_receipts(
+ token, token, set(row.room_id for row in rows)
+ )
+ except Exception:
+ logger.exception("Error poking pushers")
def stop_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
@@ -244,18 +227,6 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_pusherpool().start()
ps.get_datastore().start_profiling()
@@ -263,18 +234,7 @@ def start(config_options):
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-pusher",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-pusher", config)
if __name__ == '__main__':
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 4bdd99a966..26b9ec85f2 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -13,78 +13,74 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import contextlib
+import logging
+import sys
-import synapse
+from six import iteritems
+
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+import synapse
from synapse.api.constants import EventTypes
+from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.handlers.presence import PresenceHandler, get_interested_parties
-from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.rest.client.v2_alpha import sync
-from synapse.rest.client.v1 import events
-from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
-from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
-from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
-from synapse.replication.slave.storage.events import SlavedEventStore
-from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
-from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.filtering import SlavedFilteringStore
-from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1 import events
+from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
+from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
-from twisted.internet import reactor, defer
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import contextlib
-import gc
-
logger = logging.getLogger("synapse.app.synchrotron")
class SynchrotronSlavedStore(
- SlavedPushRuleStore,
- SlavedEventStore,
SlavedReceiptsStore,
SlavedAccountDataStore,
SlavedApplicationServiceStore,
SlavedRegistrationStore,
SlavedFilteringStore,
SlavedPresenceStore,
+ SlavedGroupServerStore,
SlavedDeviceInboxStore,
SlavedDeviceStore,
+ SlavedPushRuleStore,
+ SlavedEventStore,
SlavedClientIpStore,
RoomStore,
BaseSlavedStore,
):
- who_forgot_in_room = (
- RoomMemberStore.__dict__["who_forgot_in_room"]
- )
-
did_forget = (
RoomMemberStore.__dict__["did_forget"]
)
@@ -219,7 +215,7 @@ class SynchrotronPresence(object):
def get_currently_syncing_users(self):
return [
- user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
+ user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
@@ -250,19 +246,6 @@ class SynchrotronApplicationService(object):
class SynchrotronServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
@@ -276,7 +259,7 @@ class SynchrotronServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
sync.register_servlets(self, resource)
@@ -290,19 +273,19 @@ class SynchrotronServer(HomeServer):
"/_matrix/client/api/v1": resource,
})
- root_resource = create_resource_tree(resources, Resource())
-
- for address in bind_addresses:
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=address
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
)
+ )
logger.info("Synapse synchrotron now listening on port %d", port)
@@ -311,18 +294,22 @@ class SynchrotronServer(HomeServer):
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
- bind_addresses = listener["bind_addresses"]
-
- for address in bind_addresses:
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=address
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
)
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn("Metrics listener configured, but "
+ "enable_metrics is not True!")
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -344,15 +331,13 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.store = hs.get_datastore()
self.typing_handler = hs.get_typing_handler()
+ # NB this is a SynchrotronPresence, not a normal PresenceHandler
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
- self.presence_handler.sync_callback = self.send_user_sync
-
def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
-
- preserve_fn(self.process_and_notify)(stream_name, token, rows)
+ run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):
args = super(SyncReplicationHandler, self).get_streams_to_replicate()
@@ -364,51 +349,58 @@ class SyncReplicationHandler(ReplicationClientHandler):
@defer.inlineCallbacks
def process_and_notify(self, stream_name, token, rows):
- if stream_name == "events":
- # We shouldn't get multiple rows per token for events stream, so
- # we don't need to optimise this for multiple rows.
- for row in rows:
- event = yield self.store.get_event(row.event_id)
- extra_users = ()
- if event.type == EventTypes.Member:
- extra_users = (event.state_key,)
- max_token = self.store.get_room_max_stream_ordering()
- self.notifier.on_new_room_event(
- event, token, max_token, extra_users
+ try:
+ if stream_name == "events":
+ # We shouldn't get multiple rows per token for events stream, so
+ # we don't need to optimise this for multiple rows.
+ for row in rows:
+ event = yield self.store.get_event(row.event_id)
+ extra_users = ()
+ if event.type == EventTypes.Member:
+ extra_users = (event.state_key,)
+ max_token = self.store.get_room_max_stream_ordering()
+ self.notifier.on_new_room_event(
+ event, token, max_token, extra_users
+ )
+ elif stream_name == "push_rules":
+ self.notifier.on_new_event(
+ "push_rules_key", token, users=[row.user_id for row in rows],
)
- elif stream_name == "push_rules":
- self.notifier.on_new_event(
- "push_rules_key", token, users=[row.user_id for row in rows],
- )
- elif stream_name in ("account_data", "tag_account_data",):
- self.notifier.on_new_event(
- "account_data_key", token, users=[row.user_id for row in rows],
- )
- elif stream_name == "receipts":
- self.notifier.on_new_event(
- "receipt_key", token, rooms=[row.room_id for row in rows],
- )
- elif stream_name == "typing":
- self.typing_handler.process_replication_rows(token, rows)
- self.notifier.on_new_event(
- "typing_key", token, rooms=[row.room_id for row in rows],
- )
- elif stream_name == "to_device":
- entities = [row.entity for row in rows if row.entity.startswith("@")]
- if entities:
+ elif stream_name in ("account_data", "tag_account_data",):
self.notifier.on_new_event(
- "to_device_key", token, users=entities,
+ "account_data_key", token, users=[row.user_id for row in rows],
)
- elif stream_name == "device_lists":
- all_room_ids = set()
- for row in rows:
- room_ids = yield self.store.get_rooms_for_user(row.user_id)
- all_room_ids.update(room_ids)
- self.notifier.on_new_event(
- "device_list_key", token, rooms=all_room_ids,
- )
- elif stream_name == "presence":
- yield self.presence_handler.process_replication_rows(token, rows)
+ elif stream_name == "receipts":
+ self.notifier.on_new_event(
+ "receipt_key", token, rooms=[row.room_id for row in rows],
+ )
+ elif stream_name == "typing":
+ self.typing_handler.process_replication_rows(token, rows)
+ self.notifier.on_new_event(
+ "typing_key", token, rooms=[row.room_id for row in rows],
+ )
+ elif stream_name == "to_device":
+ entities = [row.entity for row in rows if row.entity.startswith("@")]
+ if entities:
+ self.notifier.on_new_event(
+ "to_device_key", token, users=entities,
+ )
+ elif stream_name == "device_lists":
+ all_room_ids = set()
+ for row in rows:
+ room_ids = yield self.store.get_rooms_for_user(row.user_id)
+ all_room_ids.update(room_ids)
+ self.notifier.on_new_event(
+ "device_list_key", token, rooms=all_room_ids,
+ )
+ elif stream_name == "presence":
+ yield self.presence_handler.process_replication_rows(token, rows)
+ elif stream_name == "receipts":
+ self.notifier.on_new_event(
+ "groups_key", token, users=[row.user_id for row in rows],
+ )
+ except Exception:
+ logger.exception("Error processing replication")
def start(config_options):
@@ -440,36 +432,13 @@ def start(config_options):
ss.setup()
ss.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ss.get_datastore().start_profiling()
ss.get_state_handler().start_caching()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-synchrotron",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-synchrotron", config)
if __name__ == '__main__':
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 3bd7ef7bba..d658f967ba 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -16,16 +16,19 @@
import argparse
import collections
+import errno
import glob
import os
import os.path
import signal
import subprocess
import sys
-import yaml
-import errno
import time
+from six import iteritems
+
+import yaml
+
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
GREEN = "\x1b[1;32m"
@@ -38,7 +41,7 @@ def pid_running(pid):
try:
os.kill(pid, 0)
return True
- except OSError, err:
+ except OSError as err:
if err.errno == errno.EPERM:
return True
return False
@@ -98,7 +101,7 @@ def stop(pidfile, app):
try:
os.kill(pid, signal.SIGTERM)
write("stopped %s" % (app,), colour=GREEN)
- except OSError, err:
+ except OSError as err:
if err.errno == errno.ESRCH:
write("%s not running" % (app,), colour=YELLOW)
elif err.errno == errno.EPERM:
@@ -171,6 +174,10 @@ def main():
if cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+ cache_factors = config.get("synctl_cache_factors", {})
+ for cache_name, factor in iteritems(cache_factors):
+ os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
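+ # e.g. a stanza of the assumed shape (the cache name is purely
+ # illustrative):
+ #
+ #   synctl_cache_factors:
+ #       get_users_in_room: 5.0
+ #
+ # would export SYNAPSE_CACHE_FACTOR_GET_USERS_IN_ROOM=5.0 to the
+ # spawned processes.
+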
worker_configfiles = []
if options.worker:
start_stop_synapse = False
@@ -184,6 +191,9 @@ def main():
worker_configfiles.append(worker_configfile)
if options.all_processes:
+ # To start the main synapse with -a, you need to add a worker file
+ # with worker_app == "synapse.app.homeserver".
+ start_stop_synapse = False
worker_configdir = options.all_processes
if not os.path.isdir(worker_configdir):
write(
@@ -200,11 +210,29 @@ def main():
with open(worker_configfile) as stream:
worker_config = yaml.load(stream)
worker_app = worker_config["worker_app"]
- worker_pidfile = worker_config["worker_pid_file"]
- worker_daemonize = worker_config["worker_daemonize"]
- assert worker_daemonize, "In config %r: expected '%s' to be True" % (
- worker_configfile, "worker_daemonize")
- worker_cache_factor = worker_config.get("synctl_cache_factor")
+ if worker_app == "synapse.app.homeserver":
+ # We need to special case all of this to pick up options that may
+ # be set in the main config file or in this worker config file.
+ worker_pidfile = (
+ worker_config.get("pid_file")
+ or pidfile
+ )
+ worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
+ daemonize = worker_config.get("daemonize") or config.get("daemonize")
+ assert daemonize, "Main process must have daemonize set to true"
+
+ # The master process doesn't support using worker_* config.
+ for key in worker_config:
+ if key == "worker_app": # But we allow worker_app
+ continue
+ assert not key.startswith("worker_"), \
+ "Main process cannot use worker_* config"
+ else:
+ worker_pidfile = worker_config["worker_pid_file"]
+ worker_daemonize = worker_config["worker_daemonize"]
+ assert worker_daemonize, "In config %r: expected '%s' to be True" % (
+ worker_configfile, "worker_daemonize")
+ worker_cache_factor = worker_config.get("synctl_cache_factor")
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
))
@@ -231,6 +259,7 @@ def main():
for running_pid in running_pids:
while pid_running(running_pid):
time.sleep(0.2)
+ write("All processes exited; now restarting...")
if action == "start" or action == "restart":
if start_stop_synapse:
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 8c6300db9d..637a89530a 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -14,16 +14,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse
+import logging
+import sys
-from synapse.server import HomeServer
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
+import synapse
+from synapse import events
+from synapse.app import _base
from synapse.config._base import ConfigError
-from synapse.config.logger import setup_logging
from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
from synapse.crypto import context_factory
-from synapse.http.site import SynapseSite
from synapse.http.server import JsonResource
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.http.site import SynapseSite
+from synapse.metrics import RegistryProxy
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -31,25 +38,14 @@ from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v2_alpha import user_directory
+from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.user_directory import UserDirectoryStore
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
-from synapse.util.rlimit import change_resource_limit
from synapse.util.versionstring import get_version_string
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-from synapse import events
-
-from twisted.internet import reactor
-from twisted.web.resource import Resource
-
-from daemonize import Daemonize
-
-import sys
-import logging
-import gc
logger = logging.getLogger("synapse.app.user_dir")
@@ -98,19 +94,6 @@ class UserDirectorySlaveStore(
class UserDirectoryServer(HomeServer):
- def get_db_conn(self, run_new_connection=True):
- # Any param beginning with cp_ is a parameter for adbapi, and should
- # not be passed to the database engine.
- db_params = {
- k: v for k, v in self.db_config.get("args", {}).items()
- if not k.startswith("cp_")
- }
- db_conn = self.database_engine.module.connect(**db_params)
-
- if run_new_connection:
- self.database_engine.on_new_connection(db_conn)
- return db_conn
-
def setup(self):
logger.info("Setting up.")
self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self)
@@ -124,7 +107,7 @@ class UserDirectoryServer(HomeServer):
for res in listener_config["resources"]:
for name in res["names"]:
if name == "metrics":
- resources[METRICS_PREFIX] = MetricsResource(self)
+ resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
user_directory.register_servlets(self, resource)
@@ -135,19 +118,19 @@ class UserDirectoryServer(HomeServer):
"/_matrix/client/api/v1": resource,
})
- root_resource = create_resource_tree(resources, Resource())
-
- for address in bind_addresses:
- reactor.listenTCP(
- port,
- SynapseSite(
- "synapse.access.http.%s" % (site_tag,),
- site_tag,
- listener_config,
- root_resource,
- ),
- interface=address
+ root_resource = create_resource_tree(resources, NoResource())
+
+ _base.listen_tcp(
+ bind_addresses,
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ self.version_string,
)
+ )
logger.info("Synapse user_dir now listening on port %d", port)
@@ -156,18 +139,22 @@ class UserDirectoryServer(HomeServer):
if listener["type"] == "http":
self._listen_http(listener)
elif listener["type"] == "manhole":
- bind_addresses = listener["bind_addresses"]
-
- for address in bind_addresses:
- reactor.listenTCP(
- listener["port"],
- manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- ),
- interface=address
+ _base.listen_tcp(
+ listener["bind_addresses"],
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
)
+ )
+ elif listener["type"] == "metrics":
+ if not self.get_config().enable_metrics:
+ logger.warn("Metrics listener configured, but "
+ "enable_metrics is not True!")
+ else:
+ _base.listen_metrics(listener["bind_addresses"],
+ listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -187,7 +174,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
stream_name, token, rows
)
if stream_name == "current_state_deltas":
- preserve_fn(self.user_directory.notify_new_event)()
+ run_in_background(self._notify_directory)
+
+ @defer.inlineCallbacks
+ def _notify_directory(self):
+ try:
+ yield self.user_directory.notify_new_event()
+ except Exception:
+ logger.exception("Error notifiying user directory of state update")
def start(config_options):
@@ -233,36 +227,13 @@ def start(config_options):
ps.setup()
ps.start_listening(config.worker_listeners)
- def run():
- # make sure that we run the reactor with the sentinel log context,
- # otherwise other PreserveLoggingContext instances will get confused
- # and complain when they see the logcontext arbitrarily swapping
- # between the sentinel and `run` logcontexts.
- with PreserveLoggingContext():
- logger.info("Running")
- change_resource_limit(config.soft_file_limit)
- if config.gc_thresholds:
- gc.set_threshold(*config.gc_thresholds)
- reactor.run()
-
def start():
ps.get_datastore().start_profiling()
ps.get_state_handler().start_caching()
reactor.callWhenRunning(start)
- if config.worker_daemonize:
- daemon = Daemonize(
- app="synapse-user-dir",
- pid=config.worker_pid_file,
- action=run,
- auto_close_fds=False,
- verbose=True,
- logger=logger,
- )
- daemon.start()
- else:
- run()
+ _base.start_worker_reactor("synapse-user-dir", config)
if __name__ == '__main__':