diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 1bc4279807..9c2b627590 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -16,13 +16,11 @@
import sys
sys.dont_write_bytecode = True
-from synapse.python_dependencies import (
- check_requirements, MissingRequirementError
-) # NOQA
+from synapse import python_dependencies # noqa: E402
try:
- check_requirements()
-except MissingRequirementError as e:
+ python_dependencies.check_requirements()
+except python_dependencies.MissingRequirementError as e:
message = "\n".join([
"Missing Requirement: %s" % (e.message,),
"To install run:",
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
new file mode 100644
index 0000000000..dd9ee406a1
--- /dev/null
+++ b/synapse/app/appservice.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.storage.engines import create_engine
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.appservice")
+
+
+class AppserviceSlaveStore(
+ DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
+ SlavedRegistrationStore,
+):
+ pass
+
+
+class AppserviceServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse appservice now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.worker_replication_url
+ appservice_handler = self.get_application_service_handler()
+
+ @defer.inlineCallbacks
+ def replicate(results):
+ stream = results.get("events")
+ if stream:
+ max_stream_id = stream["position"]
+ yield appservice_handler.notify_interested_services(max_stream_id)
+
+ while True:
+ try:
+ args = store.stream_positions()
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ yield store.process_replication(result)
+ replicate(result)
+ except:
+ logger.exception("Error replicating from %r", replication_url)
+ yield sleep(30)
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse appservice", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.appservice"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ if config.notify_appservices:
+ sys.stderr.write(
+ "\nThe appservices must be disabled in the main synapse process"
+ "\nbefore they can be run in a separate worker."
+ "\nPlease add ``notify_appservices: false`` to the main config"
+ "\n"
+ )
+ sys.exit(1)
+
+ # Force the pushers to start since they will be disabled in the main config
+ config.notify_appservices = True
+
+ ps = AppserviceServer(
+ config.server_name,
+ db_config=config.database_config,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ps.setup()
+ ps.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
+
+ def start():
+ ps.replicate()
+ ps.get_datastore().start_profiling()
+ ps.get_state_handler().start_caching()
+
+ reactor.callWhenRunning(start)
+
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-appservice",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
new file mode 100644
index 0000000000..0086a2977e
--- /dev/null
+++ b/synapse/app/client_reader.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
+from synapse.storage.engines import create_engine
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.crypto import context_factory
+
+from synapse import events
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.client_reader")
+
+
+class ClientReaderSlavedStore(
+ SlavedEventStore,
+ SlavedKeyStore,
+ RoomStore,
+ DirectoryStore,
+ SlavedApplicationServiceStore,
+ SlavedRegistrationStore,
+ BaseSlavedStore,
+ ClientIpStore, # After BaseSlavedStore because the constructor is different
+):
+ pass
+
+
+class ClientReaderServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = ClientReaderSlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+ elif name == "client":
+ resource = JsonResource(self, canonical_json=False)
+ PublicRoomListRestServlet(self).register(resource)
+ resources.update({
+ "/_matrix/client/r0": resource,
+ "/_matrix/client/unstable": resource,
+ "/_matrix/client/v2_alpha": resource,
+ "/_matrix/client/api/v1": resource,
+ })
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse client reader now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.worker_replication_url
+
+ while True:
+ try:
+ args = store.stream_positions()
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ yield store.process_replication(result)
+ except:
+ logger.exception("Error replicating from %r", replication_url)
+ yield sleep(5)
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse client reader", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.client_reader"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+ ss = ClientReaderServer(
+ config.server_name,
+ db_config=config.database_config,
+ tls_server_context_factory=tls_server_context_factory,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ss.setup()
+ ss.get_handlers()
+ ss.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
+
+ def start():
+ ss.get_state_handler().start_caching()
+ ss.get_datastore().start_profiling()
+ ss.replicate()
+
+ reactor.callWhenRunning(start)
+
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-client-reader",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
new file mode 100644
index 0000000000..b5f59a9931
--- /dev/null
+++ b/synapse/app/federation_reader.py
@@ -0,0 +1,211 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.api.urls import FEDERATION_PREFIX
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.crypto import context_factory
+
+from synapse import events
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.federation_reader")
+
+
+class FederationReaderSlavedStore(
+ SlavedEventStore,
+ SlavedKeyStore,
+ RoomStore,
+ DirectoryStore,
+ TransactionStore,
+ BaseSlavedStore,
+):
+ pass
+
+
+class FederationReaderServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = FederationReaderSlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+ elif name == "federation":
+ resources.update({
+ FEDERATION_PREFIX: TransportLayerServer(self),
+ })
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse federation reader now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.worker_replication_url
+
+ while True:
+ try:
+ args = store.stream_positions()
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ yield store.process_replication(result)
+ except:
+ logger.exception("Error replicating from %r", replication_url)
+ yield sleep(5)
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse federation reader", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.federation_reader"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+ ss = FederationReaderServer(
+ config.server_name,
+ db_config=config.database_config,
+ tls_server_context_factory=tls_server_context_factory,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ss.setup()
+ ss.get_handlers()
+ ss.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
+
+ def start():
+ ss.get_state_handler().start_caching()
+ ss.get_datastore().start_profiling()
+ ss.replicate()
+
+ reactor.callWhenRunning(start)
+
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-federation-reader",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
new file mode 100644
index 0000000000..80ea4c8062
--- /dev/null
+++ b/synapse/app/federation_sender.py
@@ -0,0 +1,331 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.crypto import context_factory
+from synapse.http.site import SynapseSite
+from synapse.federation import send_queue
+from synapse.federation.units import Edu
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import UserPresenceState
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.appservice")
+
+
+class FederationSenderSlaveStore(
+ SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+ SlavedRegistrationStore,
+):
+ pass
+
+
+class FederationSenderServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = FederationSenderSlaveStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse federation_sender now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.worker_replication_url
+ send_handler = FederationSenderHandler(self)
+
+ send_handler.on_start()
+
+ while True:
+ try:
+ args = store.stream_positions()
+ args.update((yield send_handler.stream_positions()))
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ yield store.process_replication(result)
+ yield send_handler.process_replication(result)
+ except:
+ logger.exception("Error replicating from %r", replication_url)
+ yield sleep(30)
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse federation sender", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.federation_sender"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ if config.send_federation:
+ sys.stderr.write(
+ "\nThe send_federation must be disabled in the main synapse process"
+ "\nbefore they can be run in a separate worker."
+ "\nPlease add ``send_federation: false`` to the main config"
+ "\n"
+ )
+ sys.exit(1)
+
+ # Force the pushers to start since they will be disabled in the main config
+ config.send_federation = True
+
+ tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+ ps = FederationSenderServer(
+ config.server_name,
+ db_config=config.database_config,
+ tls_server_context_factory=tls_server_context_factory,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ps.setup()
+ ps.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
+
+ def start():
+ ps.replicate()
+ ps.get_datastore().start_profiling()
+ ps.get_state_handler().start_caching()
+
+ reactor.callWhenRunning(start)
+
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-federation-sender",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+class FederationSenderHandler(object):
+ """Processes the replication stream and forwards the appropriate entries
+ to the federation sender.
+ """
+ def __init__(self, hs):
+ self.store = hs.get_datastore()
+ self.federation_sender = hs.get_federation_sender()
+
+ self._room_serials = {}
+ self._room_typing = {}
+
+ def on_start(self):
+ # There may be some events that are persisted but haven't been sent,
+ # so send them now.
+ self.federation_sender.notify_new_events(
+ self.store.get_room_max_stream_ordering()
+ )
+
+ @defer.inlineCallbacks
+ def stream_positions(self):
+ stream_id = yield self.store.get_federation_out_pos("federation")
+ defer.returnValue({
+ "federation": stream_id,
+
+ # Ack stuff we've "processed", this should only be called from
+ # one process.
+ "federation_ack": stream_id,
+ })
+
+ @defer.inlineCallbacks
+ def process_replication(self, result):
+ # The federation stream contains things that we want to send out, e.g.
+ # presence, typing, etc.
+ fed_stream = result.get("federation")
+ if fed_stream:
+ latest_id = int(fed_stream["position"])
+
+ # The federation stream containis a bunch of different types of
+ # rows that need to be handled differently. We parse the rows, put
+ # them into the appropriate collection and then send them off.
+ presence_to_send = {}
+ keyed_edus = {}
+ edus = {}
+ failures = {}
+ device_destinations = set()
+
+ # Parse the rows in the stream
+ for row in fed_stream["rows"]:
+ position, typ, content_js = row
+ content = json.loads(content_js)
+
+ if typ == send_queue.PRESENCE_TYPE:
+ destination = content["destination"]
+ state = UserPresenceState.from_dict(content["state"])
+
+ presence_to_send.setdefault(destination, []).append(state)
+ elif typ == send_queue.KEYED_EDU_TYPE:
+ key = content["key"]
+ edu = Edu(**content["edu"])
+
+ keyed_edus.setdefault(
+ edu.destination, {}
+ )[(edu.destination, tuple(key))] = edu
+ elif typ == send_queue.EDU_TYPE:
+ edu = Edu(**content)
+
+ edus.setdefault(edu.destination, []).append(edu)
+ elif typ == send_queue.FAILURE_TYPE:
+ destination = content["destination"]
+ failure = content["failure"]
+
+ failures.setdefault(destination, []).append(failure)
+ elif typ == send_queue.DEVICE_MESSAGE_TYPE:
+ device_destinations.add(content["destination"])
+ else:
+ raise Exception("Unrecognised federation type: %r", typ)
+
+ # We've finished collecting, send everything off
+ for destination, states in presence_to_send.items():
+ self.federation_sender.send_presence(destination, states)
+
+ for destination, edu_map in keyed_edus.items():
+ for key, edu in edu_map.items():
+ self.federation_sender.send_edu(
+ edu.destination, edu.edu_type, edu.content, key=key,
+ )
+
+ for destination, edu_list in edus.items():
+ for edu in edu_list:
+ self.federation_sender.send_edu(
+ edu.destination, edu.edu_type, edu.content, key=None,
+ )
+
+ for destination, failure_list in failures.items():
+ for failure in failure_list:
+ self.federation_sender.send_failure(destination, failure)
+
+ for destination in device_destinations:
+ self.federation_sender.send_device_messages(destination)
+
+ # Record where we are in the stream.
+ yield self.store.update_federation_out_pos(
+ "federation", latest_id
+ )
+
+ # We also need to poke the federation sender when new events happen
+ event_stream = result.get("events")
+ if event_stream:
+ latest_pos = event_stream["position"]
+ self.federation_sender.notify_new_events(latest_pos)
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index fcdc8e6e10..54f35900f8 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -16,14 +16,10 @@
import synapse
-import contextlib
+import gc
import logging
import os
-import re
-import resource
-import subprocess
import sys
-import time
from synapse.config._base import ConfigError
from synapse.python_dependencies import (
@@ -33,22 +29,15 @@ from synapse.python_dependencies import (
from synapse.rest import ClientRestResource
from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
from synapse.server import HomeServer
-
-from twisted.conch.manhole import ColoredManhole
-from twisted.conch.insults import insults
-from twisted.conch import manhole_ssh
-from twisted.cred import checkers, portal
-
-
from twisted.internet import reactor, task, defer
from twisted.application import service
from twisted.web.resource import Resource, EncodingResourceWrapper
from twisted.web.static import File
-from twisted.web.server import Site, GzipEncoderFactory, Request
+from twisted.web.server import GzipEncoderFactory
from synapse.http.server import RootRedirect
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.rest.media.v1.media_repository import MediaRepositoryResource
@@ -62,10 +51,18 @@ from synapse.api.urls import (
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto import context_factory
from synapse.util.logcontext import LoggingContext
+from synapse.metrics import register_memory_metrics, get_metrics_for
from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
from synapse.federation.transport.server import TransportLayerServer
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.manhole import manhole
+
+from synapse.http.site import SynapseSite
+
from synapse import events
from daemonize import Daemonize
@@ -73,9 +70,6 @@ from daemonize import Daemonize
logger = logging.getLogger("synapse.app.homeserver")
-ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
-
-
def gz_wrap(r):
return EncodingResourceWrapper(r, [GzipEncoderFactory()])
@@ -154,7 +148,7 @@ class SynapseHomeServer(HomeServer):
MEDIA_PREFIX: media_repo,
LEGACY_MEDIA_PREFIX: media_repo,
CONTENT_REPO_PREFIX: ContentRepoResource(
- self, self.config.uploads_path, self.auth, self.content_addr
+ self, self.config.uploads_path
),
})
@@ -173,7 +167,12 @@ class SynapseHomeServer(HomeServer):
if name == "replication":
resources[REPLICATION_PREFIX] = ReplicationResource(self)
- root_resource = create_resource_tree(resources)
+ if WEB_CLIENT_PREFIX in resources:
+ root_resource = RootRedirect(WEB_CLIENT_PREFIX)
+ else:
+ root_resource = Resource()
+
+ root_resource = create_resource_tree(resources, root_resource)
if tls:
reactor.listenSSL(
port,
@@ -206,24 +205,13 @@ class SynapseHomeServer(HomeServer):
if listener["type"] == "http":
self._listener_http(config, listener)
elif listener["type"] == "manhole":
- checker = checkers.InMemoryUsernamePasswordDatabaseDontUse(
- matrix="rabbithole"
- )
-
- rlm = manhole_ssh.TerminalRealm()
- rlm.chainedProtocolFactory = lambda: insults.ServerProtocol(
- ColoredManhole,
- {
- "__name__": "__console__",
- "hs": self,
- }
- )
-
- f = manhole_ssh.ConchFactory(portal.Portal(rlm, [checker]))
-
reactor.listenTCP(
listener["port"],
- f,
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
interface=listener.get("bind_address", '127.0.0.1')
)
else:
@@ -245,7 +233,7 @@ class SynapseHomeServer(HomeServer):
except IncorrectDatabaseSetup as e:
quit_with_error(e.message)
- def get_db_conn(self):
+ def get_db_conn(self, run_new_connection=True):
# Any param beginning with cp_ is a parameter for adbapi, and should
# not be passed to the database engine.
db_params = {
@@ -254,7 +242,8 @@ class SynapseHomeServer(HomeServer):
}
db_conn = self.database_engine.module.connect(**db_params)
- self.database_engine.on_new_connection(db_conn)
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
return db_conn
@@ -268,86 +257,6 @@ def quit_with_error(error_string):
sys.exit(1)
-def get_version_string():
- try:
- null = open(os.devnull, 'w')
- cwd = os.path.dirname(os.path.abspath(__file__))
- try:
- git_branch = subprocess.check_output(
- ['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
- stderr=null,
- cwd=cwd,
- ).strip()
- git_branch = "b=" + git_branch
- except subprocess.CalledProcessError:
- git_branch = ""
-
- try:
- git_tag = subprocess.check_output(
- ['git', 'describe', '--exact-match'],
- stderr=null,
- cwd=cwd,
- ).strip()
- git_tag = "t=" + git_tag
- except subprocess.CalledProcessError:
- git_tag = ""
-
- try:
- git_commit = subprocess.check_output(
- ['git', 'rev-parse', '--short', 'HEAD'],
- stderr=null,
- cwd=cwd,
- ).strip()
- except subprocess.CalledProcessError:
- git_commit = ""
-
- try:
- dirty_string = "-this_is_a_dirty_checkout"
- is_dirty = subprocess.check_output(
- ['git', 'describe', '--dirty=' + dirty_string],
- stderr=null,
- cwd=cwd,
- ).strip().endswith(dirty_string)
-
- git_dirty = "dirty" if is_dirty else ""
- except subprocess.CalledProcessError:
- git_dirty = ""
-
- if git_branch or git_tag or git_commit or git_dirty:
- git_version = ",".join(
- s for s in
- (git_branch, git_tag, git_commit, git_dirty,)
- if s
- )
-
- return (
- "Synapse/%s (%s)" % (
- synapse.__version__, git_version,
- )
- ).encode("ascii")
- except Exception as e:
- logger.info("Failed to check for git repository: %s", e)
-
- return ("Synapse/%s" % (synapse.__version__,)).encode("ascii")
-
-
-def change_resource_limit(soft_file_no):
- try:
- soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
-
- if not soft_file_no:
- soft_file_no = hard
-
- resource.setrlimit(resource.RLIMIT_NOFILE, (soft_file_no, hard))
- logger.info("Set file limit to: %d", soft_file_no)
-
- resource.setrlimit(
- resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY)
- )
- except (ValueError, resource.error) as e:
- logger.warn("Failed to set file or core limit: %s", e)
-
-
def setup(config_options):
"""
Args:
@@ -358,10 +267,9 @@ def setup(config_options):
HomeServer
"""
try:
- config = HomeServerConfig.load_config(
+ config = HomeServerConfig.load_or_generate_config(
"Synapse Homeserver",
config_options,
- generate_section="Homeserver"
)
except ConfigError as e:
sys.stderr.write("\n" + e.message + "\n")
@@ -377,7 +285,7 @@ def setup(config_options):
# check any extra requirements we have now we have a config
check_requirements(config)
- version_string = get_version_string()
+ version_string = "Synapse/" + get_version_string(synapse)
logger.info("Server hostname: %s", config.server_name)
logger.info("Server version: %s", version_string)
@@ -386,7 +294,7 @@ def setup(config_options):
tls_server_context_factory = context_factory.ServerContextFactory(config)
- database_engine = create_engine(config)
+ database_engine = create_engine(config.database_config)
config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
hs = SynapseHomeServer(
@@ -394,7 +302,6 @@ def setup(config_options):
db_config=config.database_config,
tls_server_context_factory=tls_server_context_factory,
config=config,
- content_addr=config.content_addr,
version_string=version_string,
database_engine=database_engine,
)
@@ -402,8 +309,10 @@ def setup(config_options):
logger.info("Preparing database: %s...", config.database_config['name'])
try:
- db_conn = hs.get_db_conn()
- database_engine.prepare_database(db_conn)
+ db_conn = hs.get_db_conn(run_new_connection=False)
+ prepare_database(db_conn, database_engine, config=config)
+ database_engine.on_new_connection(db_conn)
+
hs.run_startup_checks(db_conn, database_engine)
db_conn.commit()
@@ -427,6 +336,8 @@ def setup(config_options):
hs.get_datastore().start_doing_background_updates()
hs.get_replication_layer().start_get_pdu_cache()
+ register_memory_metrics(hs)
+
reactor.callWhenRunning(start)
return hs
@@ -442,215 +353,13 @@ class SynapseService(service.Service):
def startService(self):
hs = setup(self.config)
change_resource_limit(hs.config.soft_file_limit)
+ if hs.config.gc_thresholds:
+ gc.set_threshold(*hs.config.gc_thresholds)
def stopService(self):
return self._port.stopListening()
-class SynapseRequest(Request):
- def __init__(self, site, *args, **kw):
- Request.__init__(self, *args, **kw)
- self.site = site
- self.authenticated_entity = None
- self.start_time = 0
-
- def __repr__(self):
- # We overwrite this so that we don't log ``access_token``
- return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
- self.__class__.__name__,
- id(self),
- self.method,
- self.get_redacted_uri(),
- self.clientproto,
- self.site.site_tag,
- )
-
- def get_redacted_uri(self):
- return ACCESS_TOKEN_RE.sub(
- r'\1<redacted>\3',
- self.uri
- )
-
- def get_user_agent(self):
- return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
-
- def started_processing(self):
- self.site.access_logger.info(
- "%s - %s - Received request: %s %s",
- self.getClientIP(),
- self.site.site_tag,
- self.method,
- self.get_redacted_uri()
- )
- self.start_time = int(time.time() * 1000)
-
- def finished_processing(self):
-
- try:
- context = LoggingContext.current_context()
- ru_utime, ru_stime = context.get_resource_usage()
- db_txn_count = context.db_txn_count
- db_txn_duration = context.db_txn_duration
- except:
- ru_utime, ru_stime = (0, 0)
- db_txn_count, db_txn_duration = (0, 0)
-
- self.site.access_logger.info(
- "%s - %s - {%s}"
- " Processed request: %dms (%dms, %dms) (%dms/%d)"
- " %sB %s \"%s %s %s\" \"%s\"",
- self.getClientIP(),
- self.site.site_tag,
- self.authenticated_entity,
- int(time.time() * 1000) - self.start_time,
- int(ru_utime * 1000),
- int(ru_stime * 1000),
- int(db_txn_duration * 1000),
- int(db_txn_count),
- self.sentLength,
- self.code,
- self.method,
- self.get_redacted_uri(),
- self.clientproto,
- self.get_user_agent(),
- )
-
- @contextlib.contextmanager
- def processing(self):
- self.started_processing()
- yield
- self.finished_processing()
-
-
-class XForwardedForRequest(SynapseRequest):
- def __init__(self, *args, **kw):
- SynapseRequest.__init__(self, *args, **kw)
-
- """
- Add a layer on top of another request that only uses the value of an
- X-Forwarded-For header as the result of C{getClientIP}.
- """
- def getClientIP(self):
- """
- @return: The client address (the first address) in the value of the
- I{X-Forwarded-For header}. If the header is not present, return
- C{b"-"}.
- """
- return self.requestHeaders.getRawHeaders(
- b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
-
-
-class SynapseRequestFactory(object):
- def __init__(self, site, x_forwarded_for):
- self.site = site
- self.x_forwarded_for = x_forwarded_for
-
- def __call__(self, *args, **kwargs):
- if self.x_forwarded_for:
- return XForwardedForRequest(self.site, *args, **kwargs)
- else:
- return SynapseRequest(self.site, *args, **kwargs)
-
-
-class SynapseSite(Site):
- """
- Subclass of a twisted http Site that does access logging with python's
- standard logging
- """
- def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
- Site.__init__(self, resource, *args, **kwargs)
-
- self.site_tag = site_tag
-
- proxied = config.get("x_forwarded", False)
- self.requestFactory = SynapseRequestFactory(self, proxied)
- self.access_logger = logging.getLogger(logger_name)
-
- def log(self, request):
- pass
-
-
-def create_resource_tree(desired_tree, redirect_root_to_web_client=True):
- """Create the resource tree for this Home Server.
-
- This in unduly complicated because Twisted does not support putting
- child resources more than 1 level deep at a time.
-
- Args:
- web_client (bool): True to enable the web client.
- redirect_root_to_web_client (bool): True to redirect '/' to the
- location of the web client. This does nothing if web_client is not
- True.
- """
- if redirect_root_to_web_client and WEB_CLIENT_PREFIX in desired_tree:
- root_resource = RootRedirect(WEB_CLIENT_PREFIX)
- else:
- root_resource = Resource()
-
- # ideally we'd just use getChild and putChild but getChild doesn't work
- # unless you give it a Request object IN ADDITION to the name :/ So
- # instead, we'll store a copy of this mapping so we can actually add
- # extra resources to existing nodes. See self._resource_id for the key.
- resource_mappings = {}
- for full_path, res in desired_tree.items():
- logger.info("Attaching %s to path %s", res, full_path)
- last_resource = root_resource
- for path_seg in full_path.split('/')[1:-1]:
- if path_seg not in last_resource.listNames():
- # resource doesn't exist, so make a "dummy resource"
- child_resource = Resource()
- last_resource.putChild(path_seg, child_resource)
- res_id = _resource_id(last_resource, path_seg)
- resource_mappings[res_id] = child_resource
- last_resource = child_resource
- else:
- # we have an existing Resource, use that instead.
- res_id = _resource_id(last_resource, path_seg)
- last_resource = resource_mappings[res_id]
-
- # ===========================
- # now attach the actual desired resource
- last_path_seg = full_path.split('/')[-1]
-
- # if there is already a resource here, thieve its children and
- # replace it
- res_id = _resource_id(last_resource, last_path_seg)
- if res_id in resource_mappings:
- # there is a dummy resource at this path already, which needs
- # to be replaced with the desired resource.
- existing_dummy_resource = resource_mappings[res_id]
- for child_name in existing_dummy_resource.listNames():
- child_res_id = _resource_id(
- existing_dummy_resource, child_name
- )
- child_resource = resource_mappings[child_res_id]
- # steal the children
- res.putChild(child_name, child_resource)
-
- # finally, insert the desired resource in the right place
- last_resource.putChild(last_path_seg, res)
- res_id = _resource_id(last_resource, last_path_seg)
- resource_mappings[res_id] = res
-
- return root_resource
-
-
-def _resource_id(resource, path_seg):
- """Construct an arbitrary resource ID so you can retrieve the mapping
- later.
-
- If you want to represent resource A putChild resource B with path C,
- the mapping should looks like _resource_id(A,C) = B.
-
- Args:
- resource (Resource): The *parent* Resourceb
- path_seg (str): The name of the child Resource to be attached.
- Returns:
- str: A unique string which can be a key to the child Resource.
- """
- return "%s-%s" % (resource, path_seg)
-
-
def run(hs):
PROFILE_SYNAPSE = False
if PROFILE_SYNAPSE:
@@ -676,6 +385,8 @@ def run(hs):
start_time = hs.get_clock().time()
+ stats = {}
+
@defer.inlineCallbacks
def phone_stats_home():
logger.info("Gathering stats for reporting")
@@ -684,7 +395,10 @@ def run(hs):
if uptime < 0:
uptime = 0
- stats = {}
+ # If the stats directory is empty then this is the first time we've
+ # reported stats.
+ first_time = not stats
+
stats["homeserver"] = hs.config.server_name
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
@@ -697,6 +411,25 @@ def run(hs):
daily_messages = yield hs.get_datastore().count_daily_messages()
if daily_messages is not None:
stats["daily_messages"] = daily_messages
+ else:
+ stats.pop("daily_messages", None)
+
+ if first_time:
+ # Add callbacks to report the synapse stats as metrics whenever
+ # prometheus requests them, typically every 30s.
+ # As some of the stats are expensive to calculate we only update
+ # them when synapse phones home to matrix.org every 24 hours.
+ metrics = get_metrics_for("synapse.usage")
+ metrics.add_callback("timestamp", lambda: stats["timestamp"])
+ metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"])
+ metrics.add_callback("total_users", lambda: stats["total_users"])
+ metrics.add_callback("total_room_count", lambda: stats["total_room_count"])
+ metrics.add_callback(
+ "daily_active_users", lambda: stats["daily_active_users"]
+ )
+ metrics.add_callback(
+ "daily_messages", lambda: stats.get("daily_messages", 0)
+ )
logger.info("Reporting stats to matrix.org: %s" % (stats,))
try:
@@ -717,6 +450,8 @@ def run(hs):
# sys.settrace(logcontext_tracer)
with LoggingContext("run"):
change_resource_limit(hs.config.soft_file_limit)
+ if hs.config.gc_thresholds:
+ gc.set_threshold(*hs.config.gc_thresholds)
reactor.run()
if hs.config.daemonize:
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
new file mode 100644
index 0000000000..44c19a1bef
--- /dev/null
+++ b/synapse/app/media_repository.py
@@ -0,0 +1,217 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
+from synapse.storage.engines import create_engine
+from synapse.storage.media_repository import MediaRepositoryStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.api.urls import (
+ CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+)
+from synapse.crypto import context_factory
+
+from synapse import events
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.media_repository")
+
+
+class MediaRepositorySlavedStore(
+ SlavedApplicationServiceStore,
+ SlavedRegistrationStore,
+ BaseSlavedStore,
+ MediaRepositoryStore,
+ ClientIpStore,
+):
+ pass
+
+
+class MediaRepositoryServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+ elif name == "media":
+ media_repo = MediaRepositoryResource(self)
+ resources.update({
+ MEDIA_PREFIX: media_repo,
+ LEGACY_MEDIA_PREFIX: media_repo,
+ CONTENT_REPO_PREFIX: ContentRepoResource(
+ self, self.config.uploads_path
+ ),
+ })
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse media repository now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.worker_replication_url
+
+ while True:
+ try:
+ args = store.stream_positions()
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ yield store.process_replication(result)
+ except:
+ logger.exception("Error replicating from %r", replication_url)
+ yield sleep(5)
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse media repository", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.media_repository"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+ ss = MediaRepositoryServer(
+ config.server_name,
+ db_config=config.database_config,
+ tls_server_context_factory=tls_server_context_factory,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ss.setup()
+ ss.get_handlers()
+ ss.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
+
+ def start():
+ ss.get_state_handler().start_caching()
+ ss.get_datastore().start_profiling()
+ ss.replicate()
+
+ reactor.callWhenRunning(start)
+
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-media-repository",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
new file mode 100644
index 0000000000..a0e765c54f
--- /dev/null
+++ b/synapse/app/pusher.py
@@ -0,0 +1,303 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.storage.roommember import RoomMemberStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.storage.engines import create_engine
+from synapse.storage import DataStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from synapse import events
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.pusher")
+
+
+class PusherSlaveStore(
+ SlavedEventStore, SlavedPusherStore, SlavedReceiptsStore,
+ SlavedAccountDataStore
+):
+ update_pusher_last_stream_ordering_and_success = (
+ DataStore.update_pusher_last_stream_ordering_and_success.__func__
+ )
+
+ update_pusher_failing_since = (
+ DataStore.update_pusher_failing_since.__func__
+ )
+
+ update_pusher_last_stream_ordering = (
+ DataStore.update_pusher_last_stream_ordering.__func__
+ )
+
+ get_throttle_params_by_room = (
+ DataStore.get_throttle_params_by_room.__func__
+ )
+
+ set_throttle_params = (
+ DataStore.set_throttle_params.__func__
+ )
+
+ get_time_of_last_push_action_before = (
+ DataStore.get_time_of_last_push_action_before.__func__
+ )
+
+ get_profile_displayname = (
+ DataStore.get_profile_displayname.__func__
+ )
+
+ who_forgot_in_room = (
+ RoomMemberStore.__dict__["who_forgot_in_room"]
+ )
+
+
+class PusherServer(HomeServer):
+
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = PusherSlaveStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def remove_pusher(self, app_id, push_key, user_id):
+ http_client = self.get_simple_http_client()
+ replication_url = self.config.worker_replication_url
+ url = replication_url + "/remove_pushers"
+ return http_client.post_json_get_json(url, {
+ "remove": [{
+ "app_id": app_id,
+ "push_key": push_key,
+ "user_id": user_id,
+ }]
+ })
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse pusher now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.worker_replication_url
+ pusher_pool = self.get_pusherpool()
+
+ def stop_pusher(user_id, app_id, pushkey):
+ key = "%s:%s" % (app_id, pushkey)
+ pushers_for_user = pusher_pool.pushers.get(user_id, {})
+ pusher = pushers_for_user.pop(key, None)
+ if pusher is None:
+ return
+ logger.info("Stopping pusher %r / %r", user_id, key)
+ pusher.on_stop()
+
+ def start_pusher(user_id, app_id, pushkey):
+ key = "%s:%s" % (app_id, pushkey)
+ logger.info("Starting pusher %r / %r", user_id, key)
+ return pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+
+ @defer.inlineCallbacks
+ def poke_pushers(results):
+ pushers_rows = set(
+ map(tuple, results.get("pushers", {}).get("rows", []))
+ )
+ deleted_pushers_rows = set(
+ map(tuple, results.get("deleted_pushers", {}).get("rows", []))
+ )
+ for row in sorted(pushers_rows | deleted_pushers_rows):
+ if row in deleted_pushers_rows:
+ user_id, app_id, pushkey = row[1:4]
+ stop_pusher(user_id, app_id, pushkey)
+ elif row in pushers_rows:
+ user_id = row[1]
+ app_id = row[5]
+ pushkey = row[8]
+ yield start_pusher(user_id, app_id, pushkey)
+
+ stream = results.get("events")
+ if stream and stream["rows"]:
+ min_stream_id = stream["rows"][0][0]
+ max_stream_id = stream["position"]
+ preserve_fn(pusher_pool.on_new_notifications)(
+ min_stream_id, max_stream_id
+ )
+
+ stream = results.get("receipts")
+ if stream and stream["rows"]:
+ rows = stream["rows"]
+ affected_room_ids = set(row[1] for row in rows)
+ min_stream_id = rows[0][0]
+ max_stream_id = stream["position"]
+ preserve_fn(pusher_pool.on_new_receipts)(
+ min_stream_id, max_stream_id, affected_room_ids
+ )
+
+ while True:
+ try:
+ args = store.stream_positions()
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ yield store.process_replication(result)
+ poke_pushers(result)
+ except:
+ logger.exception("Error replicating from %r", replication_url)
+ yield sleep(30)
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse pusher", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.pusher"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ if config.start_pushers:
+ sys.stderr.write(
+ "\nThe pushers must be disabled in the main synapse process"
+ "\nbefore they can be run in a separate worker."
+ "\nPlease add ``start_pushers: false`` to the main config"
+ "\n"
+ )
+ sys.exit(1)
+
+ # Force the pushers to start since they will be disabled in the main config
+ config.start_pushers = True
+
+ database_engine = create_engine(config.database_config)
+
+ ps = PusherServer(
+ config.server_name,
+ db_config=config.database_config,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ps.setup()
+ ps.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
+
+ def start():
+ ps.replicate()
+ ps.get_pusherpool().start()
+ ps.get_datastore().start_profiling()
+ ps.get_state_handler().start_caching()
+
+ reactor.callWhenRunning(start)
+
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-pusher",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ ps = start(sys.argv[1:])
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
new file mode 100644
index 0000000000..bf1b995dc2
--- /dev/null
+++ b/synapse/app/synchrotron.py
@@ -0,0 +1,496 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.api.constants import EventTypes, PresenceState
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.events import FrozenEvent
+from synapse.handlers.presence import PresenceHandler
+from synapse.http.site import SynapseSite
+from synapse.http.server import JsonResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.rest.client.v2_alpha import sync
+from synapse.rest.client.v1 import events
+from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
+from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
+from synapse.storage.engines import create_engine
+from synapse.storage.presence import PresenceStore, UserPresenceState
+from synapse.storage.roommember import RoomMemberStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.stringutils import random_string
+from synapse.util.versionstring import get_version_string
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import contextlib
+import gc
+import ujson as json
+
+logger = logging.getLogger("synapse.app.synchrotron")
+
+
+class SynchrotronSlavedStore(
+ SlavedPushRuleStore,
+ SlavedEventStore,
+ SlavedReceiptsStore,
+ SlavedAccountDataStore,
+ SlavedApplicationServiceStore,
+ SlavedRegistrationStore,
+ SlavedFilteringStore,
+ SlavedPresenceStore,
+ SlavedDeviceInboxStore,
+ RoomStore,
+ BaseSlavedStore,
+ ClientIpStore, # After BaseSlavedStore because the constructor is different
+):
+ who_forgot_in_room = (
+ RoomMemberStore.__dict__["who_forgot_in_room"]
+ )
+
+ # XXX: This is a bit broken because we don't persist the accepted list in a
+ # way that can be replicated. This means that we don't have a way to
+ # invalidate the cache correctly.
+ get_presence_list_accepted = PresenceStore.__dict__[
+ "get_presence_list_accepted"
+ ]
+ get_presence_list_observers_accepted = PresenceStore.__dict__[
+ "get_presence_list_observers_accepted"
+ ]
+
+
+UPDATE_SYNCING_USERS_MS = 10 * 1000
+
+
+class SynchrotronPresence(object):
+ def __init__(self, hs):
+ self.is_mine_id = hs.is_mine_id
+ self.http_client = hs.get_simple_http_client()
+ self.store = hs.get_datastore()
+ self.user_to_num_current_syncs = {}
+ self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
+ self.clock = hs.get_clock()
+ self.notifier = hs.get_notifier()
+
+ active_presence = self.store.take_presence_startup_info()
+ self.user_to_current_state = {
+ state.user_id: state
+ for state in active_presence
+ }
+
+ self.process_id = random_string(16)
+ logger.info("Presence process_id is %r", self.process_id)
+
+ self._sending_sync = False
+ self._need_to_send_sync = False
+ self.clock.looping_call(
+ self._send_syncing_users_regularly,
+ UPDATE_SYNCING_USERS_MS,
+ )
+
+ reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+
+ def set_state(self, user, state, ignore_status_msg=False):
+ # TODO Hows this supposed to work?
+ pass
+
+ get_states = PresenceHandler.get_states.__func__
+ get_state = PresenceHandler.get_state.__func__
+ _get_interested_parties = PresenceHandler._get_interested_parties.__func__
+ current_state_for_users = PresenceHandler.current_state_for_users.__func__
+
+ @defer.inlineCallbacks
+ def user_syncing(self, user_id, affect_presence):
+ if affect_presence:
+ curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
+ self.user_to_num_current_syncs[user_id] = curr_sync + 1
+ prev_states = yield self.current_state_for_users([user_id])
+ if prev_states[user_id].state == PresenceState.OFFLINE:
+ # TODO: Don't block the sync request on this HTTP hit.
+ yield self._send_syncing_users_now()
+
+ def _end():
+ # We check that the user_id is in user_to_num_current_syncs because
+ # user_to_num_current_syncs may have been cleared if we are
+ # shutting down.
+ if affect_presence and user_id in self.user_to_num_current_syncs:
+ self.user_to_num_current_syncs[user_id] -= 1
+
+ @contextlib.contextmanager
+ def _user_syncing():
+ try:
+ yield
+ finally:
+ _end()
+
+ defer.returnValue(_user_syncing())
+
+ @defer.inlineCallbacks
+ def _on_shutdown(self):
+ # When the synchrotron is shutdown tell the master to clear the in
+ # progress syncs for this process
+ self.user_to_num_current_syncs.clear()
+ yield self._send_syncing_users_now()
+
+ def _send_syncing_users_regularly(self):
+ # Only send an update if we aren't in the middle of sending one.
+ if not self._sending_sync:
+ preserve_fn(self._send_syncing_users_now)()
+
+ @defer.inlineCallbacks
+ def _send_syncing_users_now(self):
+ if self._sending_sync:
+ # We don't want to race with sending another update.
+ # Instead we wait for that update to finish and send another
+ # update afterwards.
+ self._need_to_send_sync = True
+ return
+
+ # Flag that we are sending an update.
+ self._sending_sync = True
+
+ yield self.http_client.post_json_get_json(self.syncing_users_url, {
+ "process_id": self.process_id,
+ "syncing_users": [
+ user_id for user_id, count in self.user_to_num_current_syncs.items()
+ if count > 0
+ ],
+ })
+
+ # Unset the flag as we are no longer sending an update.
+ self._sending_sync = False
+ if self._need_to_send_sync:
+ # If something happened while we were sending the update then
+ # we might need to send another update.
+ # TODO: Check if the update that was sent matches the current state
+ # as we only need to send an update if they are different.
+ self._need_to_send_sync = False
+ yield self._send_syncing_users_now()
+
+ @defer.inlineCallbacks
+ def notify_from_replication(self, states, stream_id):
+ parties = yield self._get_interested_parties(
+ states, calculate_remote_hosts=False
+ )
+ room_ids_to_states, users_to_states, _ = parties
+
+ self.notifier.on_new_event(
+ "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+ users=users_to_states.keys()
+ )
+
+ @defer.inlineCallbacks
+ def process_replication(self, result):
+ stream = result.get("presence", {"rows": []})
+ states = []
+ for row in stream["rows"]:
+ (
+ position, user_id, state, last_active_ts,
+ last_federation_update_ts, last_user_sync_ts, status_msg,
+ currently_active
+ ) = row
+ state = UserPresenceState(
+ user_id, state, last_active_ts,
+ last_federation_update_ts, last_user_sync_ts, status_msg,
+ currently_active
+ )
+ self.user_to_current_state[user_id] = state
+ states.append(state)
+
+ if states and "position" in stream:
+ stream_id = int(stream["position"])
+ yield self.notify_from_replication(states, stream_id)
+
+
+class SynchrotronTyping(object):
+ def __init__(self, hs):
+ self._latest_room_serial = 0
+ self._room_serials = {}
+ self._room_typing = {}
+
+ def stream_positions(self):
+ # We must update this typing token from the response of the previous
+ # sync. In particular, the stream id may "reset" back to zero/a low
+ # value which we *must* use for the next replication request.
+ return {"typing": self._latest_room_serial}
+
+ def process_replication(self, result):
+ stream = result.get("typing")
+ if stream:
+ self._latest_room_serial = int(stream["position"])
+
+ for row in stream["rows"]:
+ position, room_id, typing_json = row
+ typing = json.loads(typing_json)
+ self._room_serials[room_id] = position
+ self._room_typing[room_id] = typing
+
+
+class SynchrotronApplicationService(object):
+ def notify_interested_services(self, event):
+ pass
+
+
+class SynchrotronServer(HomeServer):
+ def get_db_conn(self, run_new_connection=True):
+ # Any param beginning with cp_ is a parameter for adbapi, and should
+ # not be passed to the database engine.
+ db_params = {
+ k: v for k, v in self.db_config.get("args", {}).items()
+ if not k.startswith("cp_")
+ }
+ db_conn = self.database_engine.module.connect(**db_params)
+
+ if run_new_connection:
+ self.database_engine.on_new_connection(db_conn)
+ return db_conn
+
+ def setup(self):
+ logger.info("Setting up.")
+ self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
+ logger.info("Finished setting up.")
+
+ def _listen_http(self, listener_config):
+ port = listener_config["port"]
+ bind_address = listener_config.get("bind_address", "")
+ site_tag = listener_config.get("tag", port)
+ resources = {}
+ for res in listener_config["resources"]:
+ for name in res["names"]:
+ if name == "metrics":
+ resources[METRICS_PREFIX] = MetricsResource(self)
+ elif name == "client":
+ resource = JsonResource(self, canonical_json=False)
+ sync.register_servlets(self, resource)
+ events.register_servlets(self, resource)
+ InitialSyncRestServlet(self).register(resource)
+ RoomInitialSyncRestServlet(self).register(resource)
+ resources.update({
+ "/_matrix/client/r0": resource,
+ "/_matrix/client/unstable": resource,
+ "/_matrix/client/v2_alpha": resource,
+ "/_matrix/client/api/v1": resource,
+ })
+
+ root_resource = create_resource_tree(resources, Resource())
+ reactor.listenTCP(
+ port,
+ SynapseSite(
+ "synapse.access.http.%s" % (site_tag,),
+ site_tag,
+ listener_config,
+ root_resource,
+ ),
+ interface=bind_address
+ )
+ logger.info("Synapse synchrotron now listening on port %d", port)
+
+ def start_listening(self, listeners):
+ for listener in listeners:
+ if listener["type"] == "http":
+ self._listen_http(listener)
+ elif listener["type"] == "manhole":
+ reactor.listenTCP(
+ listener["port"],
+ manhole(
+ username="matrix",
+ password="rabbithole",
+ globals={"hs": self},
+ ),
+ interface=listener.get("bind_address", '127.0.0.1')
+ )
+ else:
+ logger.warn("Unrecognized listener type: %s", listener["type"])
+
+ @defer.inlineCallbacks
+ def replicate(self):
+ http_client = self.get_simple_http_client()
+ store = self.get_datastore()
+ replication_url = self.config.worker_replication_url
+ notifier = self.get_notifier()
+ presence_handler = self.get_presence_handler()
+ typing_handler = self.get_typing_handler()
+
+ def notify_from_stream(
+ result, stream_name, stream_key, room=None, user=None
+ ):
+ stream = result.get(stream_name)
+ if stream:
+ position_index = stream["field_names"].index("position")
+ if room:
+ room_index = stream["field_names"].index(room)
+ if user:
+ user_index = stream["field_names"].index(user)
+
+ users = ()
+ rooms = ()
+ for row in stream["rows"]:
+ position = row[position_index]
+
+ if user:
+ users = (row[user_index],)
+
+ if room:
+ rooms = (row[room_index],)
+
+ notifier.on_new_event(
+ stream_key, position, users=users, rooms=rooms
+ )
+
+ def notify(result):
+ stream = result.get("events")
+ if stream:
+ max_position = stream["position"]
+ for row in stream["rows"]:
+ position = row[0]
+ internal = json.loads(row[1])
+ event_json = json.loads(row[2])
+ event = FrozenEvent(event_json, internal_metadata_dict=internal)
+ extra_users = ()
+ if event.type == EventTypes.Member:
+ extra_users = (event.state_key,)
+ notifier.on_new_room_event(
+ event, position, max_position, extra_users
+ )
+
+ notify_from_stream(
+ result, "push_rules", "push_rules_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "user_account_data", "account_data_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "room_account_data", "account_data_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "tag_account_data", "account_data_key", user="user_id"
+ )
+ notify_from_stream(
+ result, "receipts", "receipt_key", room="room_id"
+ )
+ notify_from_stream(
+ result, "typing", "typing_key", room="room_id"
+ )
+ notify_from_stream(
+ result, "to_device", "to_device_key", user="user_id"
+ )
+
+ while True:
+ try:
+ args = store.stream_positions()
+ args.update(typing_handler.stream_positions())
+ args["timeout"] = 30000
+ result = yield http_client.get_json(replication_url, args=args)
+ yield store.process_replication(result)
+ typing_handler.process_replication(result)
+ yield presence_handler.process_replication(result)
+ notify(result)
+ except:
+ logger.exception("Error replicating from %r", replication_url)
+ yield sleep(5)
+
+ def build_presence_handler(self):
+ return SynchrotronPresence(self)
+
+ def build_typing_handler(self):
+ return SynchrotronTyping(self)
+
+
+def start(config_options):
+ try:
+ config = HomeServerConfig.load_config(
+ "Synapse synchrotron", config_options
+ )
+ except ConfigError as e:
+ sys.stderr.write("\n" + e.message + "\n")
+ sys.exit(1)
+
+ assert config.worker_app == "synapse.app.synchrotron"
+
+ setup_logging(config.worker_log_config, config.worker_log_file)
+
+ synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ ss = SynchrotronServer(
+ config.server_name,
+ db_config=config.database_config,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ application_service_handler=SynchrotronApplicationService(),
+ )
+
+ ss.setup()
+ ss.start_listening(config.worker_listeners)
+
+ def run():
+ with LoggingContext("run"):
+ logger.info("Running")
+ change_resource_limit(config.soft_file_limit)
+ if config.gc_thresholds:
+ gc.set_threshold(*config.gc_thresholds)
+ reactor.run()
+
+ def start():
+ ss.get_datastore().start_profiling()
+ ss.replicate()
+ ss.get_state_handler().start_caching()
+
+ reactor.callWhenRunning(start)
+
+ if config.worker_daemonize:
+ daemon = Daemonize(
+ app="synapse-synchrotron",
+ pid=config.worker_pid_file,
+ action=run,
+ auto_close_fds=False,
+ verbose=True,
+ logger=logger,
+ )
+ daemon.start()
+ else:
+ run()
+
+
+if __name__ == '__main__':
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index ab3a31d7b7..c045588866 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -14,70 +14,198 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import sys
+import argparse
+import collections
+import glob
import os
import os.path
-import subprocess
import signal
+import subprocess
+import sys
import yaml
-SYNAPSE = ["python", "-B", "-m", "synapse.app.homeserver"]
+SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
GREEN = "\x1b[1;32m"
RED = "\x1b[1;31m"
NORMAL = "\x1b[m"
+def write(message, colour=NORMAL, stream=sys.stdout):
+ if colour == NORMAL:
+ stream.write(message + "\n")
+ else:
+ stream.write(colour + message + NORMAL + "\n")
+
+
def start(configfile):
- print ("Starting ...")
+ write("Starting ...")
args = SYNAPSE
args.extend(["--daemonize", "-c", configfile])
try:
subprocess.check_call(args)
- print (GREEN + "started" + NORMAL)
+ write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
+ except subprocess.CalledProcessError as e:
+ write(
+ "error starting (exit code: %d); see above for logs" % e.returncode,
+ colour=RED,
+ )
+
+
+def start_worker(app, configfile, worker_configfile):
+ args = [
+ "python", "-B",
+ "-m", app,
+ "-c", configfile,
+ "-c", worker_configfile
+ ]
+
+ try:
+ subprocess.check_call(args)
+ write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
except subprocess.CalledProcessError as e:
- print (
- RED +
- "error starting (exit code: %d); see above for logs" % e.returncode +
- NORMAL
+ write(
+ "error starting %s(%r) (exit code: %d); see above for logs" % (
+ app, worker_configfile, e.returncode,
+ ),
+ colour=RED,
)
-def stop(pidfile):
+def stop(pidfile, app):
if os.path.exists(pidfile):
pid = int(open(pidfile).read())
os.kill(pid, signal.SIGTERM)
- print (GREEN + "stopped" + NORMAL)
+ write("stopped %s" % (app,), colour=GREEN)
+
+
+Worker = collections.namedtuple("Worker", [
+ "app", "configfile", "pidfile", "cache_factor"
+])
def main():
- configfile = sys.argv[2] if len(sys.argv) == 3 else "homeserver.yaml"
+
+ parser = argparse.ArgumentParser()
+
+ parser.add_argument(
+ "action",
+ choices=["start", "stop", "restart"],
+ help="whether to start, stop or restart the synapse",
+ )
+ parser.add_argument(
+ "configfile",
+ nargs="?",
+ default="homeserver.yaml",
+ help="the homeserver config file, defaults to homserver.yaml",
+ )
+ parser.add_argument(
+ "-w", "--worker",
+ metavar="WORKERCONFIG",
+ help="start or stop a single worker",
+ )
+ parser.add_argument(
+ "-a", "--all-processes",
+ metavar="WORKERCONFIGDIR",
+ help="start or stop all the workers in the given directory"
+ " and the main synapse process",
+ )
+
+ options = parser.parse_args()
+
+ if options.worker and options.all_processes:
+ write(
+ 'Cannot use "--worker" with "--all-processes"',
+ stream=sys.stderr
+ )
+ sys.exit(1)
+
+ configfile = options.configfile
if not os.path.exists(configfile):
- sys.stderr.write(
+ write(
"No config file found\n"
"To generate a config file, run '%s -c %s --generate-config"
" --server-name=<server name>'\n" % (
- " ".join(SYNAPSE), configfile
- )
+ " ".join(SYNAPSE), options.configfile
+ ),
+ stream=sys.stderr,
)
sys.exit(1)
- config = yaml.load(open(configfile))
+ with open(configfile) as stream:
+ config = yaml.load(stream)
+
pidfile = config["pid_file"]
+ cache_factor = config.get("synctl_cache_factor")
+ start_stop_synapse = True
- action = sys.argv[1] if sys.argv[1:] else "usage"
- if action == "start":
- start(configfile)
- elif action == "stop":
- stop(pidfile)
- elif action == "restart":
- stop(pidfile)
- start(configfile)
- else:
- sys.stderr.write("Usage: %s [start|stop|restart] [configfile]\n" % (sys.argv[0],))
- sys.exit(1)
+ if cache_factor:
+ os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+
+ worker_configfiles = []
+ if options.worker:
+ start_stop_synapse = False
+ worker_configfile = options.worker
+ if not os.path.exists(worker_configfile):
+ write(
+ "No worker config found at %r" % (worker_configfile,),
+ stream=sys.stderr,
+ )
+ sys.exit(1)
+ worker_configfiles.append(worker_configfile)
+
+ if options.all_processes:
+ worker_configdir = options.all_processes
+ if not os.path.isdir(worker_configdir):
+ write(
+ "No worker config directory found at %r" % (worker_configdir,),
+ stream=sys.stderr,
+ )
+ sys.exit(1)
+ worker_configfiles.extend(sorted(glob.glob(
+ os.path.join(worker_configdir, "*.yaml")
+ )))
+
+ workers = []
+ for worker_configfile in worker_configfiles:
+ with open(worker_configfile) as stream:
+ worker_config = yaml.load(stream)
+ worker_app = worker_config["worker_app"]
+ worker_pidfile = worker_config["worker_pid_file"]
+ worker_daemonize = worker_config["worker_daemonize"]
+ assert worker_daemonize # TODO print something more user friendly
+ worker_cache_factor = worker_config.get("synctl_cache_factor")
+ workers.append(Worker(
+ worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
+ ))
+
+ action = options.action
+
+ if action == "stop" or action == "restart":
+ for worker in workers:
+ stop(worker.pidfile, worker.app)
+
+ if start_stop_synapse:
+ stop(pidfile, "synapse.app.homeserver")
+
+ # TODO: Wait for synapse to actually shutdown before starting it again
+
+ if action == "start" or action == "restart":
+ if start_stop_synapse:
+ start(configfile)
+
+ for worker in workers:
+ if worker.cache_factor:
+ os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
+
+ start_worker(worker.app, configfile, worker.configfile)
+
+ if cache_factor:
+ os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
+ else:
+ os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
if __name__ == "__main__":
|