diff options
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/__init__.py | 4 | ||||
-rw-r--r-- | synapse/app/_base.py | 194 | ||||
-rw-r--r-- | synapse/app/appservice.py | 134 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 139 | ||||
-rw-r--r-- | synapse/app/event_creator.py | 201 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 125 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 157 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 235 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 416 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 133 | ||||
-rw-r--r-- | synapse/app/pusher.py | 164 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 249 | ||||
-rwxr-xr-x | synapse/app/synctl.py | 47 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 133 |
14 files changed, 1388 insertions, 943 deletions
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py index 9c2b627590..3b6b9368b8 100644 --- a/synapse/app/__init__.py +++ b/synapse/app/__init__.py @@ -14,9 +14,11 @@ # limitations under the License. import sys + +from synapse import python_dependencies # noqa: E402 + sys.dont_write_bytecode = True -from synapse import python_dependencies # noqa: E402 try: python_dependencies.check_requirements() diff --git a/synapse/app/_base.py b/synapse/app/_base.py new file mode 100644 index 0000000000..391bd14c5c --- /dev/null +++ b/synapse/app/_base.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import gc +import logging +import sys + +from daemonize import Daemonize + +from twisted.internet import error, reactor + +from synapse.util import PreserveLoggingContext +from synapse.util.rlimit import change_resource_limit + +try: + import affinity +except Exception: + affinity = None + + +logger = logging.getLogger(__name__) + + +def start_worker_reactor(appname, config): + """ Run the reactor in the main process + + Daemonizes if necessary, and then configures some resources, before starting + the reactor. Pulls configuration from the 'worker' settings in 'config'. + + Args: + appname (str): application name which will be sent to syslog + config (synapse.config.Config): config object + """ + + logger = logging.getLogger(config.worker_app) + + start_reactor( + appname, + config.soft_file_limit, + config.gc_thresholds, + config.worker_pid_file, + config.worker_daemonize, + config.worker_cpu_affinity, + logger, + ) + + +def start_reactor( + appname, + soft_file_limit, + gc_thresholds, + pid_file, + daemonize, + cpu_affinity, + logger, +): + """ Run the reactor in the main process + + Daemonizes if necessary, and then configures some resources, before starting + the reactor + + Args: + appname (str): application name which will be sent to syslog + soft_file_limit (int): + gc_thresholds: + pid_file (str): name of pid file to write to if daemonize is True + daemonize (bool): true to run the reactor in a background process + cpu_affinity (int|None): cpu affinity mask + logger (logging.Logger): logger instance to pass to Daemonize + """ + + def run(): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): + logger.info("Running") + if cpu_affinity is not None: + if not affinity: + quit_with_error( + "Missing package 'affinity' required for cpu_affinity\n" + "option\n\n" + "Install by running:\n\n" + " pip install affinity\n\n" + ) + logger.info("Setting CPU affinity to %s" % cpu_affinity) + affinity.set_process_affinity_mask(0, cpu_affinity) + change_resource_limit(soft_file_limit) + if gc_thresholds: + gc.set_threshold(*gc_thresholds) + reactor.run() + + if daemonize: + daemon = Daemonize( + app=appname, + pid=pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +def quit_with_error(error_string): + message_lines = error_string.split("\n") + line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2 + sys.stderr.write("*" * line_length + '\n') + for line in message_lines: + sys.stderr.write(" %s\n" % (line.rstrip(),)) + sys.stderr.write("*" * line_length + '\n') + sys.exit(1) + + +def listen_metrics(bind_addresses, port): + """ + Start Prometheus metrics server. + """ + from synapse.metrics import RegistryProxy + from prometheus_client import start_http_server + + for host in bind_addresses: + reactor.callInThread(start_http_server, int(port), + addr=host, registry=RegistryProxy) + logger.info("Metrics now reporting on %s:%d", host, port) + + +def listen_tcp(bind_addresses, port, factory, backlog=50): + """ + Create a TCP socket for a port and several addresses + """ + for address in bind_addresses: + try: + reactor.listenTCP( + port, + factory, + backlog, + address + ) + except error.CannotListenError as e: + check_bind_error(e, address, bind_addresses) + + +def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50): + """ + Create an SSL socket for a port and several addresses + """ + for address in bind_addresses: + try: + reactor.listenSSL( + port, + factory, + context_factory, + backlog, + address + ) + except error.CannotListenError as e: + check_bind_error(e, address, bind_addresses) + + +def check_bind_error(e, address, bind_addresses): + """ + This method checks an exception occurred while binding on 0.0.0.0. + If :: is specified in the bind addresses a warning is shown. + The exception is still raised otherwise. + + Binding on both 0.0.0.0 and :: causes an exception on Linux and macOS + because :: binds on both IPv4 and IPv6 (as per RFC 3493). + When binding on 0.0.0.0 after :: this can safely be ignored. + + Args: + e (Exception): Exception that was caught. + address (str): Address on which binding was attempted. + bind_addresses (list): Addresses on which the service listens. + """ + if address == '0.0.0.0' and '::' in bind_addresses: + logger.warn('Failed to listen on 0.0.0.0, continuing because listening on [::]') + else: + raise e diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 9a476efa63..9a37384fb7 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -13,38 +13,33 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys -import synapse +from twisted.internet import defer, reactor +from twisted.web.resource import NoResource -from synapse.server import HomeServer +import synapse +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse import events - -from twisted.internet import reactor -from twisted.web.resource import Resource - -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.appservice") @@ -56,19 +51,6 @@ class AppserviceSlaveStore( class AppserviceServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = AppserviceSlaveStore(self.get_db_conn(), self) @@ -82,21 +64,21 @@ class AppserviceServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) - - root_resource = create_resource_tree(resources, Resource()) - - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) + + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse appservice now listening on port %d", port) @@ -105,18 +87,22 @@ class AppserviceServer(HomeServer): if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -136,9 +122,14 @@ class ASReplicationHandler(ReplicationClientHandler): if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() - preserve_fn( - self.appservice_handler.notify_interested_services - )(max_stream_id) + run_in_background(self._notify_app_services, max_stream_id) + + @defer.inlineCallbacks + def _notify_app_services(self, room_stream_id): + try: + yield self.appservice_handler.notify_interested_services(room_stream_id) + except Exception: + logger.exception("Error notifying application services of event") def start(config_options): @@ -181,36 +172,13 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-appservice", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-appservice", config) if __name__ == '__main__': diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 09bc1935f1..398bb36602 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -13,46 +13,46 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys -import synapse +from twisted.internet import reactor +from twisted.web.resource import NoResource +import synapse +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging -from synapse.http.site import SynapseSite +from synapse.crypto import context_factory from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.rest.client.v1.room import PublicRoomListRestServlet +from synapse.rest.client.v1.room import ( + JoinedRoomMemberListRestServlet, + PublicRoomListRestServlet, + RoomEventContextServlet, + RoomMemberListRestServlet, + RoomStateRestServlet, +) from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.crypto import context_factory - -from synapse import events - - -from twisted.internet import reactor -from twisted.web.resource import Resource - -from daemonize import Daemonize - -import sys -import logging -import gc logger = logging.getLogger("synapse.app.client_reader") @@ -72,19 +72,6 @@ class ClientReaderSlavedStore( class ClientReaderServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self) @@ -98,10 +85,16 @@ class ClientReaderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) + PublicRoomListRestServlet(self).register(resource) + RoomMemberListRestServlet(self).register(resource) + JoinedRoomMemberListRestServlet(self).register(resource) + RoomStateRestServlet(self).register(resource) + RoomEventContextServlet(self).register(resource) + resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, @@ -109,19 +102,19 @@ class ClientReaderServer(HomeServer): "/_matrix/client/api/v1": resource, }) - root_resource = create_resource_tree(resources, Resource()) - - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse client reader now listening on port %d", port) @@ -130,18 +123,22 @@ class ClientReaderServer(HomeServer): if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -180,39 +177,15 @@ def start(config_options): ) ss.setup() - ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-client-reader", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-client-reader", config) if __name__ == '__main__': diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py new file mode 100644 index 0000000000..374f115644 --- /dev/null +++ b/synapse/app/event_creator.py @@ -0,0 +1,201 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import sys + +from twisted.internet import reactor +from twisted.web.resource import NoResource + +import synapse +from synapse import events +from synapse.app import _base +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.crypto import context_factory +from synapse.http.server import JsonResource +from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.directory import DirectoryStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.profile import SlavedProfileStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.slave.storage.room import RoomStore +from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1.room import ( + JoinRoomAliasServlet, + RoomMembershipRestServlet, + RoomSendEventRestServlet, + RoomStateEventRestServlet, +) +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.versionstring import get_version_string + +logger = logging.getLogger("synapse.app.event_creator") + + +class EventCreatorSlavedStore( + DirectoryStore, + TransactionStore, + SlavedProfileStore, + SlavedAccountDataStore, + SlavedPusherStore, + SlavedReceiptsStore, + SlavedPushRuleStore, + SlavedDeviceStore, + SlavedClientIpStore, + SlavedApplicationServiceStore, + SlavedEventStore, + SlavedRegistrationStore, + RoomStore, + BaseSlavedStore, +): + pass + + +class EventCreatorServer(HomeServer): + def setup(self): + logger.info("Setting up.") + self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + RoomSendEventRestServlet(self).register(resource) + RoomMembershipRestServlet(self).register(resource) + RoomStateEventRestServlet(self).register(resource) + JoinRoomAliasServlet(self).register(resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, + ) + ) + + logger.info("Synapse event creator now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse event creator", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.event_creator" + + assert config.worker_replication_http_port is not None + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = EventCreatorServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.start_listening(config.worker_listeners) + + def start(): + ss.get_state_handler().start_caching() + ss.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + _base.start_worker_reactor("synapse-event-creator", config) + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index eb392e1c9d..7af00b8bcf 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -13,43 +13,37 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys -import synapse +from twisted.internet import reactor +from twisted.web.resource import NoResource +import synapse +from synapse import events +from synapse.api.urls import FEDERATION_PREFIX +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.crypto import context_factory +from synapse.federation.transport.server import TransportLayerServer from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import TransactionStore -from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.api.urls import FEDERATION_PREFIX -from synapse.federation.transport.server import TransportLayerServer -from synapse.crypto import context_factory - -from synapse import events - - -from twisted.internet import reactor -from twisted.web.resource import Resource - -from daemonize import Daemonize - -import sys -import logging -import gc logger = logging.getLogger("synapse.app.federation_reader") @@ -66,19 +60,6 @@ class FederationReaderSlavedStore( class FederationReaderServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self) @@ -92,25 +73,25 @@ class FederationReaderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "federation": resources.update({ FEDERATION_PREFIX: TransportLayerServer(self), }) - root_resource = create_resource_tree(resources, Resource()) - - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse federation reader now listening on port %d", port) @@ -119,18 +100,22 @@ class FederationReaderServer(HomeServer): if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -169,39 +154,15 @@ def start(config_options): ) ss.setup() - ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-federation-reader", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-federation-reader", config) if __name__ == '__main__': diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 03327dc47a..18469013fa 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -13,44 +13,39 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys -import synapse +from twisted.internet import defer, reactor +from twisted.web.resource import NoResource -from synapse.server import HomeServer +import synapse +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.crypto import context_factory -from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.transactions import TransactionStore -from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse import events - -from twisted.internet import reactor, defer -from twisted.web.resource import Resource - -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.federation_sender") @@ -83,19 +78,6 @@ class FederationSenderSlaveStore( class FederationSenderServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self) @@ -109,21 +91,21 @@ class FederationSenderServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) - - root_resource = create_resource_tree(resources, Resource()) - - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) + + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse federation_sender now listening on port %d", port) @@ -132,18 +114,22 @@ class FederationSenderServer(HomeServer): if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -213,36 +199,12 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-federation-sender", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-federation-sender", config) class FederationSenderHandler(object): @@ -277,7 +239,7 @@ class FederationSenderHandler(object): # presence, typing, etc. if stream_name == "federation": send_queue.process_rows_for_federation(self.federation_sender, rows) - preserve_fn(self.update_token)(token) + run_in_background(self.update_token, token) # We also need to poke the federation sender when new events happen elif stream_name == "events": @@ -285,19 +247,22 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def update_token(self, token): - self.federation_position = token - - # We linearize here to ensure we don't have races updating the token - with (yield self._fed_position_linearizer.queue(None)): - if self._last_ack < self.federation_position: - yield self.store.update_federation_out_pos( - "federation", self.federation_position - ) + try: + self.federation_position = token + + # We linearize here to ensure we don't have races updating the token + with (yield self._fed_position_linearizer.queue(None)): + if self._last_ack < self.federation_position: + yield self.store.update_federation_out_pos( + "federation", self.federation_position + ) - # We ACK this token over replication so that the master can drop - # its in memory queues - self.replication_client.send_federation_ack(self.federation_position) - self._last_ack = self.federation_position + # We ACK this token over replication so that the master can drop + # its in memory queues + self.replication_client.send_federation_ack(self.federation_position) + self._last_ack = self.federation_position + except Exception: + logger.exception("Error updating federation stream position") if __name__ == '__main__': diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py new file mode 100644 index 0000000000..b5f78f4640 --- /dev/null +++ b/synapse/app/frontend_proxy.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import sys + +from twisted.internet import defer, reactor +from twisted.web.resource import NoResource + +import synapse +from synapse import events +from synapse.api.errors import SynapseError +from synapse.app import _base +from synapse.config._base import ConfigError +from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging +from synapse.crypto import context_factory +from synapse.http.server import JsonResource +from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore +from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v2_alpha._base import client_v2_patterns +from synapse.server import HomeServer +from synapse.storage.engines import create_engine +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext +from synapse.util.manhole import manhole +from synapse.util.versionstring import get_version_string + +logger = logging.getLogger("synapse.app.frontend_proxy") + + +class KeyUploadServlet(RestServlet): + PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$") + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): server + """ + super(KeyUploadServlet, self).__init__() + self.auth = hs.get_auth() + self.store = hs.get_datastore() + self.http_client = hs.get_simple_http_client() + self.main_uri = hs.config.worker_main_http_uri + + @defer.inlineCallbacks + def on_POST(self, request, device_id): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + user_id = requester.user.to_string() + body = parse_json_object_from_request(request) + + if device_id is not None: + # passing the device_id here is deprecated; however, we allow it + # for now for compatibility with older clients. + if (requester.device_id is not None and + device_id != requester.device_id): + logger.warning("Client uploading keys for a different device " + "(logged in as %s, uploading for %s)", + requester.device_id, device_id) + else: + device_id = requester.device_id + + if device_id is None: + raise SynapseError( + 400, + "To upload keys, you must pass device_id when authenticating" + ) + + if body: + # They're actually trying to upload something, proxy to main synapse. + # Pass through the auth headers, if any, in case the access token + # is there. + auth_headers = request.requestHeaders.getRawHeaders(b"Authorization", []) + headers = { + "Authorization": auth_headers, + } + result = yield self.http_client.post_json_get_json( + self.main_uri + request.uri, + body, + headers=headers, + ) + + defer.returnValue((200, result)) + else: + # Just interested in counts. + result = yield self.store.count_e2e_one_time_keys(user_id, device_id) + defer.returnValue((200, {"one_time_key_counts": result})) + + +class FrontendProxySlavedStore( + SlavedDeviceStore, + SlavedClientIpStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + BaseSlavedStore, +): + pass + + +class FrontendProxyServer(HomeServer): + def setup(self): + logger.info("Setting up.") + self.datastore = FrontendProxySlavedStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + KeyUploadServlet(self).register(resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, + ) + ) + + logger.info("Synapse client reader now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return ReplicationClientHandler(self.get_datastore()) + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse frontend proxy", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.frontend_proxy" + + assert config.worker_main_http_uri is not None + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ss = FrontendProxyServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ss.setup() + ss.start_listening(config.worker_listeners) + + def start(): + ss.get_state_handler().start_caching() + ss.get_datastore().start_profiling() + + reactor.callWhenRunning(start) + + _base.start_worker_reactor("synapse-frontend-proxy", config) + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 081e7cce59..2ad1beb8d8 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -13,61 +13,62 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -import synapse - import gc import logging import os import sys -import synapse.config.logger -from synapse.config._base import ConfigError - -from synapse.python_dependencies import ( - check_requirements, CONDITIONAL_REQUIREMENTS -) - -from synapse.rest import ClientRestResource -from synapse.storage.engines import create_engine, IncorrectDatabaseSetup -from synapse.storage import are_all_users_on_domain -from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database - -from synapse.server import HomeServer +from six import iteritems -from twisted.internet import reactor, defer from twisted.application import service -from twisted.web.resource import Resource, EncodingResourceWrapper -from twisted.web.static import File +from twisted.internet import defer, reactor +from twisted.web.resource import EncodingResourceWrapper, NoResource from twisted.web.server import GzipEncoderFactory -from synapse.http.server import RootRedirect -from synapse.rest.media.v0.content_repository import ContentRepoResource -from synapse.rest.media.v1.media_repository import MediaRepositoryResource -from synapse.rest.key.v1.server_key_resource import LocalKey -from synapse.rest.key.v2 import KeyApiV2Resource +from twisted.web.static import File + +import synapse +import synapse.config.logger +from synapse import events from synapse.api.urls import ( - FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX, - SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX, + CONTENT_REPO_PREFIX, + FEDERATION_PREFIX, + LEGACY_MEDIA_PREFIX, + MEDIA_PREFIX, + SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, + STATIC_PREFIX, + WEB_CLIENT_PREFIX, ) +from synapse.app import _base +from synapse.app._base import listen_ssl, listen_tcp, quit_with_error +from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -from synapse.metrics import register_memory_metrics -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory from synapse.federation.transport.server import TransportLayerServer - -from synapse.util.rlimit import change_resource_limit -from synapse.util.versionstring import get_version_string +from synapse.http.additional_resource import AdditionalResource +from synapse.http.server import RootRedirect +from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.module_api import ModuleApi +from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements +from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource +from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory +from synapse.rest import ClientRestResource +from synapse.rest.key.v1.server_key_resource import LocalKey +from synapse.rest.key.v2 import KeyApiV2Resource +from synapse.rest.media.v0.content_repository import ContentRepoResource +from synapse.server import HomeServer +from synapse.storage import are_all_users_on_domain +from synapse.storage.engines import IncorrectDatabaseSetup, create_engine +from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database +from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole - -from synapse.http.site import SynapseSite - -from synapse import events - -from daemonize import Daemonize +from synapse.util.module_loader import load_module +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.homeserver") @@ -119,87 +120,132 @@ class SynapseHomeServer(HomeServer): resources = {} for res in listener_config["resources"]: for name in res["names"]: - if name == "client": - client_resource = ClientRestResource(self) - if res["compress"]: - client_resource = gz_wrap(client_resource) - - resources.update({ - "/_matrix/client/api/v1": client_resource, - "/_matrix/client/r0": client_resource, - "/_matrix/client/unstable": client_resource, - "/_matrix/client/v2_alpha": client_resource, - "/_matrix/client/versions": client_resource, - }) - - if name == "federation": - resources.update({ - FEDERATION_PREFIX: TransportLayerServer(self), - }) - - if name in ["static", "client"]: - resources.update({ - STATIC_PREFIX: File( - os.path.join(os.path.dirname(synapse.__file__), "static") - ), - }) - - if name in ["media", "federation", "client"]: - media_repo = MediaRepositoryResource(self) - resources.update({ - MEDIA_PREFIX: media_repo, - LEGACY_MEDIA_PREFIX: media_repo, - CONTENT_REPO_PREFIX: ContentRepoResource( - self, self.config.uploads_path - ), - }) - - if name in ["keys", "federation"]: - resources.update({ - SERVER_KEY_PREFIX: LocalKey(self), - SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self), - }) - - if name == "webclient": - resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self) - - if name == "metrics" and self.get_config().enable_metrics: - resources[METRICS_PREFIX] = MetricsResource(self) + resources.update(self._configure_named_resource( + name, res.get("compress", False), + )) + + additional_resources = listener_config.get("additional_resources", {}) + logger.debug("Configuring additional resources: %r", + additional_resources) + module_api = ModuleApi(self, self.get_auth_handler()) + for path, resmodule in additional_resources.items(): + handler_cls, config = load_module(resmodule) + handler = handler_cls(config, module_api) + resources[path] = AdditionalResource(self, handler.handle_request) if WEB_CLIENT_PREFIX in resources: root_resource = RootRedirect(WEB_CLIENT_PREFIX) else: - root_resource = Resource() + root_resource = NoResource() root_resource = create_resource_tree(resources, root_resource) if tls: - for address in bind_addresses: - reactor.listenSSL( - port, - SynapseSite( - "synapse.access.https.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - self.tls_server_context_factory, - interface=address - ) + listen_ssl( + bind_addresses, + port, + SynapseSite( + "synapse.access.https.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, + ), + self.tls_server_context_factory, + ) + else: - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse now listening on port %d", port) + def _configure_named_resource(self, name, compress=False): + """Build a resource map for a named resource + + Args: + name (str): named resource: one of "client", "federation", etc + compress (bool): whether to enable gzip compression for this + resource + + Returns: + dict[str, Resource]: map from path to HTTP resource + """ + resources = {} + if name == "client": + client_resource = ClientRestResource(self) + if compress: + client_resource = gz_wrap(client_resource) + + resources.update({ + "/_matrix/client/api/v1": client_resource, + "/_matrix/client/r0": client_resource, + "/_matrix/client/unstable": client_resource, + "/_matrix/client/v2_alpha": client_resource, + "/_matrix/client/versions": client_resource, + }) + + if name == "consent": + from synapse.rest.consent.consent_resource import ConsentResource + consent_resource = ConsentResource(self) + if compress: + consent_resource = gz_wrap(consent_resource) + resources.update({ + "/_matrix/consent": consent_resource, + }) + + if name == "federation": + resources.update({ + FEDERATION_PREFIX: TransportLayerServer(self), + }) + + if name in ["static", "client"]: + resources.update({ + STATIC_PREFIX: File( + os.path.join(os.path.dirname(synapse.__file__), "static") + ), + }) + + if name in ["media", "federation", "client"]: + if self.get_config().enable_media_repo: + media_repo = self.get_media_repository_resource() + resources.update({ + MEDIA_PREFIX: media_repo, + LEGACY_MEDIA_PREFIX: media_repo, + CONTENT_REPO_PREFIX: ContentRepoResource( + self, self.config.uploads_path + ), + }) + elif name == "media": + raise ConfigError( + "'media' resource conflicts with enable_media_repo=False", + ) + + if name in ["keys", "federation"]: + resources.update({ + SERVER_KEY_PREFIX: LocalKey(self), + SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self), + }) + + if name == "webclient": + resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self) + + if name == "metrics" and self.get_config().enable_metrics: + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) + + if name == "replication": + resources[REPLICATION_PREFIX] = ReplicationRestResource(self) + + return resources + def start_listening(self): config = self.get_config() @@ -207,18 +253,15 @@ class SynapseHomeServer(HomeServer): if listener["type"] == "http": self._listener_http(config, listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) elif listener["type"] == "replication": bind_addresses = listener["bind_addresses"] for address in bind_addresses: @@ -229,6 +272,13 @@ class SynapseHomeServer(HomeServer): reactor.addSystemEventTrigger( "before", "shutdown", server_listener.stopListening, ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -248,29 +298,6 @@ class SynapseHomeServer(HomeServer): except IncorrectDatabaseSetup as e: quit_with_error(e.message) - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - - -def quit_with_error(error_string): - message_lines = error_string.split("\n") - line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2 - sys.stderr.write("*" * line_length + '\n') - for line in message_lines: - sys.stderr.write(" %s\n" % (line.rstrip(),)) - sys.stderr.write("*" * line_length + '\n') - sys.exit(1) - def setup(config_options): """ @@ -300,11 +327,6 @@ def setup(config_options): # check any extra requirements we have now we have a config check_requirements(config) - version_string = "Synapse/" + get_version_string(synapse) - - logger.info("Server hostname: %s", config.server_name) - logger.info("Server version: %s", version_string) - events.USE_FROZEN_DICTS = config.use_frozen_dicts tls_server_context_factory = context_factory.ServerContextFactory(config) @@ -317,7 +339,7 @@ def setup(config_options): db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, config=config, - version_string=version_string, + version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, ) @@ -349,9 +371,7 @@ def setup(config_options): hs.get_state_handler().start_caching() hs.get_datastore().start_profiling() hs.get_datastore().start_doing_background_updates() - hs.get_replication_layer().start_get_pdu_cache() - - register_memory_metrics(hs) + hs.get_federation_client().start_get_pdu_cache() reactor.callWhenRunning(start) @@ -403,6 +423,10 @@ def run(hs): stats = {} + # Contains the list of processes we will be monitoring + # currently either 0 or 1 + stats_process = [] + @defer.inlineCallbacks def phone_stats_home(): logger.info("Gathering stats for reporting") @@ -419,6 +443,10 @@ def run(hs): total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users() stats["total_nonbridged_users"] = total_nonbridged_users + daily_user_type_results = yield hs.get_datastore().count_daily_user_type() + for name, count in iteritems(daily_user_type_results): + stats["daily_user_type_" + name] = count + room_count = yield hs.get_datastore().get_room_count() stats["total_room_count"] = room_count @@ -426,8 +454,21 @@ def run(hs): stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms() stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() + r30_results = yield hs.get_datastore().count_r30_users() + for name, count in iteritems(r30_results): + stats["r30_users_" + name] = count + daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() stats["daily_sent_messages"] = daily_sent_messages + stats["cache_factor"] = CACHE_SIZE_FACTOR + stats["event_cache_size"] = hs.config.event_cache_size + + if len(stats_process) > 0: + stats["memory_rss"] = 0 + stats["cpu_average"] = 0 + for process in stats_process: + stats["memory_rss"] += process.memory_info().rss + stats["cpu_average"] += int(process.cpu_percent(interval=None)) logger.info("Reporting stats to matrix.org: %s" % (stats,)) try: @@ -438,45 +479,56 @@ def run(hs): except Exception as e: logger.warn("Error reporting stats: %s", e) + def performance_stats_init(): + try: + import psutil + process = psutil.Process() + # Ensure we can fetch both, and make the initial request for cpu_percent + # so the next request will use this as the initial point. + process.memory_info().rss + process.cpu_percent(interval=None) + logger.info("report_stats can use psutil") + stats_process.append(process) + except (ImportError, AttributeError): + logger.warn( + "report_stats enabled but psutil is not installed or incorrect version." + " Disabling reporting of memory/cpu stats." + " Ensuring psutil is available will help matrix.org track performance" + " changes across releases." + ) + + def generate_user_daily_visit_stats(): + hs.get_datastore().generate_user_daily_visits() + + # Rather than update on per session basis, batch up the requests. + # If you increase the loop period, the accuracy of user_daily_visits + # table will decrease + clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) + if hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000) + # We need to defer this init for the cases that we daemonize + # otherwise the process ID we get is that of the non-daemon process + clock.call_later(0, performance_stats_init) + # We wait 5 minutes to send the first set of stats as the server can # be quite busy the first few minutes clock.call_later(5 * 60, phone_stats_home) - def in_thread(): - # Uncomment to enable tracing of log context changes. - # sys.settrace(logcontext_tracer) - - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - change_resource_limit(hs.config.soft_file_limit) - if hs.config.gc_thresholds: - gc.set_threshold(*hs.config.gc_thresholds) - reactor.run() - - if hs.config.daemonize: - - if hs.config.print_pidfile: - print (hs.config.pid_file) - - daemon = Daemonize( - app="synapse-homeserver", - pid=hs.config.pid_file, - action=lambda: in_thread(), - auto_close_fds=False, - verbose=True, - logger=logger, - ) - - daemon.start() - else: - in_thread() + if hs.config.daemonize and hs.config.print_pidfile: + print (hs.config.pid_file) + + _base.start_reactor( + "synapse-homeserver", + hs.config.soft_file_limit, + hs.config.gc_thresholds, + hs.config.pid_file, + hs.config.daemonize, + hs.config.cpu_affinity, + logger, + ) def main(): diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index f57ec784fe..749bbf37d0 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -13,14 +13,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys -import synapse +from twisted.internet import reactor +from twisted.web.resource import NoResource +import synapse +from synapse import events +from synapse.api.urls import CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging +from synapse.crypto import context_factory from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -28,31 +37,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.media.v0.content_repository import ContentRepoResource -from synapse.rest.media.v1.media_repository import MediaRepositoryResource from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.api.urls import ( - CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX -) -from synapse.crypto import context_factory - -from synapse import events - - -from twisted.internet import reactor -from twisted.web.resource import Resource - -from daemonize import Daemonize - -import sys -import logging -import gc logger = logging.getLogger("synapse.app.media_repository") @@ -69,19 +60,6 @@ class MediaRepositorySlavedStore( class MediaRepositoryServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self) @@ -95,9 +73,9 @@ class MediaRepositoryServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "media": - media_repo = MediaRepositoryResource(self) + media_repo = self.get_media_repository_resource() resources.update({ MEDIA_PREFIX: media_repo, LEGACY_MEDIA_PREFIX: media_repo, @@ -106,19 +84,19 @@ class MediaRepositoryServer(HomeServer): ), }) - root_resource = create_resource_tree(resources, Resource()) - - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse media repository now listening on port %d", port) @@ -127,18 +105,22 @@ class MediaRepositoryServer(HomeServer): if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -159,6 +141,13 @@ def start(config_options): assert config.worker_app == "synapse.app.media_repository" + if config.enable_media_repo: + _base.quit_with_error( + "enable_media_repo must be disabled in the main synapse process\n" + "before the media repo can be run in a separate worker.\n" + "Please add ``enable_media_repo: false`` to the main config\n" + ) + setup_logging(config, use_worker_options=True) events.USE_FROZEN_DICTS = config.use_frozen_dicts @@ -177,39 +166,15 @@ def start(config_options): ) ss.setup() - ss.get_handlers() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_state_handler().start_caching() ss.get_datastore().start_profiling() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-media-repository", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-media-repository", config) if __name__ == '__main__': diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index f9114acfcb..9295a51d5b 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -13,41 +13,34 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging +import sys -import synapse +from twisted.internet import defer, reactor +from twisted.web.resource import NoResource -from synapse.server import HomeServer +import synapse +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.storage.roommember import RoomMemberStore +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore -from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.storage.engines import create_engine +from synapse.server import HomeServer from synapse.storage import DataStore +from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn, \ - PreserveLoggingContext +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse import events - -from twisted.internet import reactor, defer -from twisted.web.resource import Resource - -from daemonize import Daemonize - -import sys -import logging -import gc - logger = logging.getLogger("synapse.app.pusher") @@ -83,25 +76,8 @@ class PusherSlaveStore( DataStore.get_profile_displayname.__func__ ) - who_forgot_in_room = ( - RoomMemberStore.__dict__["who_forgot_in_room"] - ) - class PusherServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = PusherSlaveStore(self.get_db_conn(), self) @@ -118,21 +94,21 @@ class PusherServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) - - root_resource = create_resource_tree(resources, Resource()) - - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) + + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse pusher now listening on port %d", port) @@ -141,18 +117,22 @@ class PusherServer(HomeServer): if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -170,24 +150,27 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - preserve_fn(self.poke_pushers)(stream_name, token, rows) + run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): - if stream_name == "pushers": - for row in rows: - if row.deleted: - yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - yield self.start_pusher(row.user_id, row.app_id, row.pushkey) - elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) - elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) - ) + try: + if stream_name == "pushers": + for row in rows: + if row.deleted: + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + yield self.pusher_pool.on_new_notifications( + token, token, + ) + elif stream_name == "receipts": + yield self.pusher_pool.on_new_receipts( + token, token, set(row.room_id for row in rows) + ) + except Exception: + logger.exception("Error poking pushers") def stop_pusher(self, user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) @@ -244,18 +227,6 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_pusherpool().start() ps.get_datastore().start_profiling() @@ -263,18 +234,7 @@ def start(config_options): reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-pusher", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-pusher", config) if __name__ == '__main__': diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 4bdd99a966..26b9ec85f2 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -13,78 +13,74 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import contextlib +import logging +import sys -import synapse +from six import iteritems + +from twisted.internet import defer, reactor +from twisted.web.resource import NoResource +import synapse from synapse.api.constants import EventTypes +from synapse.app import _base from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.handlers.presence import PresenceHandler, get_interested_parties -from synapse.http.site import SynapseSite from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX -from synapse.rest.client.v2_alpha import sync -from synapse.rest.client.v1 import events -from synapse.rest.client.v1.room import RoomInitialSyncRestServlet -from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet +from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore -from synapse.replication.slave.storage.client_ips import SlavedClientIpStore -from synapse.replication.slave.storage.events import SlavedEventStore -from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.account_data import SlavedAccountDataStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore -from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.filtering import SlavedFilteringStore -from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore +from synapse.replication.slave.storage.presence import SlavedPresenceStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1 import events +from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet +from synapse.rest.client.v1.room import RoomInitialSyncRestServlet +from synapse.rest.client.v2_alpha import sync from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string -from twisted.internet import reactor, defer -from twisted.web.resource import Resource - -from daemonize import Daemonize - -import sys -import logging -import contextlib -import gc - logger = logging.getLogger("synapse.app.synchrotron") class SynchrotronSlavedStore( - SlavedPushRuleStore, - SlavedEventStore, SlavedReceiptsStore, SlavedAccountDataStore, SlavedApplicationServiceStore, SlavedRegistrationStore, SlavedFilteringStore, SlavedPresenceStore, + SlavedGroupServerStore, SlavedDeviceInboxStore, SlavedDeviceStore, + SlavedPushRuleStore, + SlavedEventStore, SlavedClientIpStore, RoomStore, BaseSlavedStore, ): - who_forgot_in_room = ( - RoomMemberStore.__dict__["who_forgot_in_room"] - ) - did_forget = ( RoomMemberStore.__dict__["did_forget"] ) @@ -219,7 +215,7 @@ class SynchrotronPresence(object): def get_currently_syncing_users(self): return [ - user_id for user_id, count in self.user_to_num_current_syncs.iteritems() + user_id for user_id, count in iteritems(self.user_to_num_current_syncs) if count > 0 ] @@ -250,19 +246,6 @@ class SynchrotronApplicationService(object): class SynchrotronServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) @@ -276,7 +259,7 @@ class SynchrotronServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) sync.register_servlets(self, resource) @@ -290,19 +273,19 @@ class SynchrotronServer(HomeServer): "/_matrix/client/api/v1": resource, }) - root_resource = create_resource_tree(resources, Resource()) - - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse synchrotron now listening on port %d", port) @@ -311,18 +294,22 @@ class SynchrotronServer(HomeServer): if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -344,15 +331,13 @@ class SyncReplicationHandler(ReplicationClientHandler): self.store = hs.get_datastore() self.typing_handler = hs.get_typing_handler() + # NB this is a SynchrotronPresence, not a normal PresenceHandler self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() - self.presence_handler.sync_callback = self.send_user_sync - def on_rdata(self, stream_name, token, rows): super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) - - preserve_fn(self.process_and_notify)(stream_name, token, rows) + run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): args = super(SyncReplicationHandler, self).get_streams_to_replicate() @@ -364,51 +349,58 @@ class SyncReplicationHandler(ReplicationClientHandler): @defer.inlineCallbacks def process_and_notify(self, stream_name, token, rows): - if stream_name == "events": - # We shouldn't get multiple rows per token for events stream, so - # we don't need to optimise this for multiple rows. - for row in rows: - event = yield self.store.get_event(row.event_id) - extra_users = () - if event.type == EventTypes.Member: - extra_users = (event.state_key,) - max_token = self.store.get_room_max_stream_ordering() - self.notifier.on_new_room_event( - event, token, max_token, extra_users + try: + if stream_name == "events": + # We shouldn't get multiple rows per token for events stream, so + # we don't need to optimise this for multiple rows. + for row in rows: + event = yield self.store.get_event(row.event_id) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + max_token = self.store.get_room_max_stream_ordering() + self.notifier.on_new_room_event( + event, token, max_token, extra_users + ) + elif stream_name == "push_rules": + self.notifier.on_new_event( + "push_rules_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "push_rules": - self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows], - ) - elif stream_name in ("account_data", "tag_account_data",): - self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows], - ) - elif stream_name == "receipts": - self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "typing": - self.typing_handler.process_replication_rows(token, rows) - self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows], - ) - elif stream_name == "to_device": - entities = [row.entity for row in rows if row.entity.startswith("@")] - if entities: + elif stream_name in ("account_data", "tag_account_data",): self.notifier.on_new_event( - "to_device_key", token, users=entities, + "account_data_key", token, users=[row.user_id for row in rows], ) - elif stream_name == "device_lists": - all_room_ids = set() - for row in rows: - room_ids = yield self.store.get_rooms_for_user(row.user_id) - all_room_ids.update(room_ids) - self.notifier.on_new_event( - "device_list_key", token, rooms=all_room_ids, - ) - elif stream_name == "presence": - yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "receipt_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "typing": + self.typing_handler.process_replication_rows(token, rows) + self.notifier.on_new_event( + "typing_key", token, rooms=[row.room_id for row in rows], + ) + elif stream_name == "to_device": + entities = [row.entity for row in rows if row.entity.startswith("@")] + if entities: + self.notifier.on_new_event( + "to_device_key", token, users=entities, + ) + elif stream_name == "device_lists": + all_room_ids = set() + for row in rows: + room_ids = yield self.store.get_rooms_for_user(row.user_id) + all_room_ids.update(room_ids) + self.notifier.on_new_event( + "device_list_key", token, rooms=all_room_ids, + ) + elif stream_name == "presence": + yield self.presence_handler.process_replication_rows(token, rows) + elif stream_name == "receipts": + self.notifier.on_new_event( + "groups_key", token, users=[row.user_id for row in rows], + ) + except Exception: + logger.exception("Error processing replication") def start(config_options): @@ -440,36 +432,13 @@ def start(config_options): ss.setup() ss.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ss.get_datastore().start_profiling() ss.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-synchrotron", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-synchrotron", config) if __name__ == '__main__': diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 3bd7ef7bba..d658f967ba 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -16,16 +16,19 @@ import argparse import collections +import errno import glob import os import os.path import signal import subprocess import sys -import yaml -import errno import time +from six import iteritems + +import yaml + SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"] GREEN = "\x1b[1;32m" @@ -38,7 +41,7 @@ def pid_running(pid): try: os.kill(pid, 0) return True - except OSError, err: + except OSError as err: if err.errno == errno.EPERM: return True return False @@ -98,7 +101,7 @@ def stop(pidfile, app): try: os.kill(pid, signal.SIGTERM) write("stopped %s" % (app,), colour=GREEN) - except OSError, err: + except OSError as err: if err.errno == errno.ESRCH: write("%s not running" % (app,), colour=YELLOW) elif err.errno == errno.EPERM: @@ -171,6 +174,10 @@ def main(): if cache_factor: os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor) + cache_factors = config.get("synctl_cache_factors", {}) + for cache_name, factor in iteritems(cache_factors): + os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor) + worker_configfiles = [] if options.worker: start_stop_synapse = False @@ -184,6 +191,9 @@ def main(): worker_configfiles.append(worker_configfile) if options.all_processes: + # To start the main synapse with -a you need to add a worker file + # with worker_app == "synapse.app.homeserver" + start_stop_synapse = False worker_configdir = options.all_processes if not os.path.isdir(worker_configdir): write( @@ -200,11 +210,29 @@ def main(): with open(worker_configfile) as stream: worker_config = yaml.load(stream) worker_app = worker_config["worker_app"] - worker_pidfile = worker_config["worker_pid_file"] - worker_daemonize = worker_config["worker_daemonize"] - assert worker_daemonize, "In config %r: expected '%s' to be True" % ( - worker_configfile, "worker_daemonize") - worker_cache_factor = worker_config.get("synctl_cache_factor") + if worker_app == "synapse.app.homeserver": + # We need to special case all of this to pick up options that may + # be set in the main config file or in this worker config file. + worker_pidfile = ( + worker_config.get("pid_file") + or pidfile + ) + worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor + daemonize = worker_config.get("daemonize") or config.get("daemonize") + assert daemonize, "Main process must have daemonize set to true" + + # The master process doesn't support using worker_* config. + for key in worker_config: + if key == "worker_app": # But we allow worker_app + continue + assert not key.startswith("worker_"), \ + "Main process cannot use worker_* config" + else: + worker_pidfile = worker_config["worker_pid_file"] + worker_daemonize = worker_config["worker_daemonize"] + assert worker_daemonize, "In config %r: expected '%s' to be True" % ( + worker_configfile, "worker_daemonize") + worker_cache_factor = worker_config.get("synctl_cache_factor") workers.append(Worker( worker_app, worker_configfile, worker_pidfile, worker_cache_factor, )) @@ -231,6 +259,7 @@ def main(): for running_pid in running_pids: while pid_running(running_pid): time.sleep(0.2) + write("All processes exited; now restarting...") if action == "start" or action == "restart": if start_stop_synapse: diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 8c6300db9d..637a89530a 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -14,16 +14,23 @@ # See the License for the specific language governing permissions and # limitations under the License. -import synapse +import logging +import sys -from synapse.server import HomeServer +from twisted.internet import defer, reactor +from twisted.web.resource import NoResource + +import synapse +from synapse import events +from synapse.app import _base from synapse.config._base import ConfigError -from synapse.config.logger import setup_logging from synapse.config.homeserver import HomeServerConfig +from synapse.config.logger import setup_logging from synapse.crypto import context_factory -from synapse.http.site import SynapseSite from synapse.http.server import JsonResource -from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.http.site import SynapseSite +from synapse.metrics import RegistryProxy +from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore @@ -31,25 +38,14 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v2_alpha import user_directory +from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole -from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string -from synapse.util.caches.stream_change_cache import StreamChangeCache - -from synapse import events - -from twisted.internet import reactor -from twisted.web.resource import Resource - -from daemonize import Daemonize - -import sys -import logging -import gc logger = logging.getLogger("synapse.app.user_dir") @@ -98,19 +94,6 @@ class UserDirectorySlaveStore( class UserDirectoryServer(HomeServer): - def get_db_conn(self, run_new_connection=True): - # Any param beginning with cp_ is a parameter for adbapi, and should - # not be passed to the database engine. - db_params = { - k: v for k, v in self.db_config.get("args", {}).items() - if not k.startswith("cp_") - } - db_conn = self.database_engine.module.connect(**db_params) - - if run_new_connection: - self.database_engine.on_new_connection(db_conn) - return db_conn - def setup(self): logger.info("Setting up.") self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self) @@ -124,7 +107,7 @@ class UserDirectoryServer(HomeServer): for res in listener_config["resources"]: for name in res["names"]: if name == "metrics": - resources[METRICS_PREFIX] = MetricsResource(self) + resources[METRICS_PREFIX] = MetricsResource(RegistryProxy) elif name == "client": resource = JsonResource(self, canonical_json=False) user_directory.register_servlets(self, resource) @@ -135,19 +118,19 @@ class UserDirectoryServer(HomeServer): "/_matrix/client/api/v1": resource, }) - root_resource = create_resource_tree(resources, Resource()) - - for address in bind_addresses: - reactor.listenTCP( - port, - SynapseSite( - "synapse.access.http.%s" % (site_tag,), - site_tag, - listener_config, - root_resource, - ), - interface=address + root_resource = create_resource_tree(resources, NoResource()) + + _base.listen_tcp( + bind_addresses, + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + self.version_string, ) + ) logger.info("Synapse user_dir now listening on port %d", port) @@ -156,18 +139,22 @@ class UserDirectoryServer(HomeServer): if listener["type"] == "http": self._listen_http(listener) elif listener["type"] == "manhole": - bind_addresses = listener["bind_addresses"] - - for address in bind_addresses: - reactor.listenTCP( - listener["port"], - manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ), - interface=address + _base.listen_tcp( + listener["bind_addresses"], + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, ) + ) + elif listener["type"] == "metrics": + if not self.get_config().enable_metrics: + logger.warn(("Metrics listener configured, but " + "enable_metrics is not True!")) + else: + _base.listen_metrics(listener["bind_addresses"], + listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -187,7 +174,14 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): stream_name, token, rows ) if stream_name == "current_state_deltas": - preserve_fn(self.user_directory.notify_new_event)() + run_in_background(self._notify_directory) + + @defer.inlineCallbacks + def _notify_directory(self): + try: + yield self.user_directory.notify_new_event() + except Exception: + logger.exception("Error notifiying user directory of state update") def start(config_options): @@ -233,36 +227,13 @@ def start(config_options): ps.setup() ps.start_listening(config.worker_listeners) - def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - change_resource_limit(config.soft_file_limit) - if config.gc_thresholds: - gc.set_threshold(*config.gc_thresholds) - reactor.run() - def start(): ps.get_datastore().start_profiling() ps.get_state_handler().start_caching() reactor.callWhenRunning(start) - if config.worker_daemonize: - daemon = Daemonize( - app="synapse-user-dir", - pid=config.worker_pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + _base.start_worker_reactor("synapse-user-dir", config) if __name__ == '__main__': |