summary refs log tree commit diff
path: root/synapse/app/homeserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/homeserver.py')
-rwxr-xr-xsynapse/app/homeserver.py432
1 files changed, 226 insertions, 206 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e0b87468fe..a0e465d644 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -13,59 +13,53 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-import synapse
-
 import gc
 import logging
 import os
 import sys
-from synapse.config._base import ConfigError
-
-from synapse.python_dependencies import (
-    check_requirements, DEPENDENCY_LINKS
-)
-
-from synapse.rest import ClientRestResource
-from synapse.storage.engines import create_engine, IncorrectDatabaseSetup
-from synapse.storage import are_all_users_on_domain
-from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
-
-from synapse.server import HomeServer
 
-from twisted.internet import reactor, task, defer
-from twisted.application import service
-from twisted.web.resource import Resource, EncodingResourceWrapper
-from twisted.web.static import File
-from twisted.web.server import GzipEncoderFactory
+import synapse
+import synapse.config.logger
+from synapse import events
+from synapse.api.urls import CONTENT_REPO_PREFIX, FEDERATION_PREFIX, \
+    LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, SERVER_KEY_PREFIX, SERVER_KEY_V2_PREFIX, \
+    STATIC_PREFIX, WEB_CLIENT_PREFIX
+from synapse.app import _base
+from synapse.app._base import quit_with_error, listen_ssl, listen_tcp
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.crypto import context_factory
+from synapse.federation.transport.server import TransportLayerServer
+from synapse.module_api import ModuleApi
+from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import RootRedirect
-from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.http.site import SynapseSite
+from synapse.metrics import register_memory_metrics
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
+    check_requirements
+from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
+from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
+from synapse.rest import ClientRestResource
 from synapse.rest.key.v1.server_key_resource import LocalKey
 from synapse.rest.key.v2 import KeyApiV2Resource
-from synapse.api.urls import (
-    FEDERATION_PREFIX, WEB_CLIENT_PREFIX, CONTENT_REPO_PREFIX,
-    SERVER_KEY_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX, STATIC_PREFIX,
-    SERVER_KEY_V2_PREFIX,
-)
-from synapse.config.homeserver import HomeServerConfig
-from synapse.crypto import context_factory
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.server import HomeServer
+from synapse.storage import are_all_users_on_domain
+from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
+from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
+from synapse.util.caches import CACHE_SIZE_FACTOR
+from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext
-from synapse.metrics import register_memory_metrics, get_metrics_for
-from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
-from synapse.replication.resource import ReplicationResource, REPLICATION_PREFIX
-from synapse.federation.transport.server import TransportLayerServer
-
+from synapse.util.manhole import manhole
+from synapse.util.module_loader import load_module
 from synapse.util.rlimit import change_resource_limit
 from synapse.util.versionstring import get_version_string
-from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.manhole import manhole
-
-from synapse.http.site import SynapseSite
-
-from synapse import events
-
-from daemonize import Daemonize
+from twisted.application import service
+from twisted.internet import defer, reactor
+from twisted.web.resource import EncodingResourceWrapper, NoResource
+from twisted.web.server import GzipEncoderFactory
+from twisted.web.static import File
 
 logger = logging.getLogger("synapse.app.homeserver")
 
@@ -90,7 +84,7 @@ def build_resource_for_web_client(hs):
                 "\n"
                 "You can also disable hosting of the webclient via the\n"
                 "configuration option `web_client`\n"
-                % {"dep": DEPENDENCY_LINKS["matrix-angular-sdk"]}
+                % {"dep": CONDITIONAL_REQUIREMENTS["web_client"].keys()[0]}
             )
         syweb_path = os.path.dirname(syweb.__file__)
         webclient_path = os.path.join(syweb_path, "webclient")
@@ -117,90 +111,121 @@ class SynapseHomeServer(HomeServer):
         resources = {}
         for res in listener_config["resources"]:
             for name in res["names"]:
-                if name == "client":
-                    client_resource = ClientRestResource(self)
-                    if res["compress"]:
-                        client_resource = gz_wrap(client_resource)
-
-                    resources.update({
-                        "/_matrix/client/api/v1": client_resource,
-                        "/_matrix/client/r0": client_resource,
-                        "/_matrix/client/unstable": client_resource,
-                        "/_matrix/client/v2_alpha": client_resource,
-                        "/_matrix/client/versions": client_resource,
-                    })
-
-                if name == "federation":
-                    resources.update({
-                        FEDERATION_PREFIX: TransportLayerServer(self),
-                    })
-
-                if name in ["static", "client"]:
-                    resources.update({
-                        STATIC_PREFIX: File(
-                            os.path.join(os.path.dirname(synapse.__file__), "static")
-                        ),
-                    })
-
-                if name in ["media", "federation", "client"]:
-                    media_repo = MediaRepositoryResource(self)
-                    resources.update({
-                        MEDIA_PREFIX: media_repo,
-                        LEGACY_MEDIA_PREFIX: media_repo,
-                        CONTENT_REPO_PREFIX: ContentRepoResource(
-                            self, self.config.uploads_path
-                        ),
-                    })
-
-                if name in ["keys", "federation"]:
-                    resources.update({
-                        SERVER_KEY_PREFIX: LocalKey(self),
-                        SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self),
-                    })
-
-                if name == "webclient":
-                    resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
-
-                if name == "metrics" and self.get_config().enable_metrics:
-                    resources[METRICS_PREFIX] = MetricsResource(self)
-
-                if name == "replication":
-                    resources[REPLICATION_PREFIX] = ReplicationResource(self)
+                resources.update(self._configure_named_resource(
+                    name, res.get("compress", False),
+                ))
+
+        additional_resources = listener_config.get("additional_resources", {})
+        logger.debug("Configuring additional resources: %r",
+                     additional_resources)
+        module_api = ModuleApi(self, self.get_auth_handler())
+        for path, resmodule in additional_resources.items():
+            handler_cls, config = load_module(resmodule)
+            handler = handler_cls(config, module_api)
+            resources[path] = AdditionalResource(self, handler.handle_request)
 
         if WEB_CLIENT_PREFIX in resources:
             root_resource = RootRedirect(WEB_CLIENT_PREFIX)
         else:
-            root_resource = Resource()
+            root_resource = NoResource()
 
         root_resource = create_resource_tree(resources, root_resource)
 
         if tls:
-            for address in bind_addresses:
-                reactor.listenSSL(
-                    port,
-                    SynapseSite(
-                        "synapse.access.https.%s" % (site_tag,),
-                        site_tag,
-                        listener_config,
-                        root_resource,
-                    ),
-                    self.tls_server_context_factory,
-                    interface=address
-                )
+            listen_ssl(
+                bind_addresses,
+                port,
+                SynapseSite(
+                    "synapse.access.https.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
+                ),
+                self.tls_server_context_factory,
+            )
+
         else:
-            for address in bind_addresses:
-                reactor.listenTCP(
-                    port,
-                    SynapseSite(
-                        "synapse.access.http.%s" % (site_tag,),
-                        site_tag,
-                        listener_config,
-                        root_resource,
-                    ),
-                    interface=address
+            listen_tcp(
+                bind_addresses,
+                port,
+                SynapseSite(
+                    "synapse.access.http.%s" % (site_tag,),
+                    site_tag,
+                    listener_config,
+                    root_resource,
                 )
+            )
         logger.info("Synapse now listening on port %d", port)
 
+    def _configure_named_resource(self, name, compress=False):
+        """Build a resource map for a named resource
+
+        Args:
+            name (str): named resource: one of "client", "federation", etc
+            compress (bool): whether to enable gzip compression for this
+                resource
+
+        Returns:
+            dict[str, Resource]: map from path to HTTP resource
+        """
+        resources = {}
+        if name == "client":
+            client_resource = ClientRestResource(self)
+            if compress:
+                client_resource = gz_wrap(client_resource)
+
+            resources.update({
+                "/_matrix/client/api/v1": client_resource,
+                "/_matrix/client/r0": client_resource,
+                "/_matrix/client/unstable": client_resource,
+                "/_matrix/client/v2_alpha": client_resource,
+                "/_matrix/client/versions": client_resource,
+            })
+
+        if name == "federation":
+            resources.update({
+                FEDERATION_PREFIX: TransportLayerServer(self),
+            })
+
+        if name in ["static", "client"]:
+            resources.update({
+                STATIC_PREFIX: File(
+                    os.path.join(os.path.dirname(synapse.__file__), "static")
+                ),
+            })
+
+        if name in ["media", "federation", "client"]:
+            if self.get_config().enable_media_repo:
+                media_repo = self.get_media_repository_resource()
+                resources.update({
+                    MEDIA_PREFIX: media_repo,
+                    LEGACY_MEDIA_PREFIX: media_repo,
+                    CONTENT_REPO_PREFIX: ContentRepoResource(
+                        self, self.config.uploads_path
+                    ),
+                })
+            elif name == "media":
+                raise ConfigError(
+                    "'media' resource conflicts with enable_media_repo=False",
+                )
+
+        if name in ["keys", "federation"]:
+            resources.update({
+                SERVER_KEY_PREFIX: LocalKey(self),
+                SERVER_KEY_V2_PREFIX: KeyApiV2Resource(self),
+            })
+
+        if name == "webclient":
+            resources[WEB_CLIENT_PREFIX] = build_resource_for_web_client(self)
+
+        if name == "metrics" and self.get_config().enable_metrics:
+            resources[METRICS_PREFIX] = MetricsResource(self)
+
+        if name == "replication":
+            resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
+
+        return resources
+
     def start_listening(self):
         config = self.get_config()
 
@@ -208,17 +233,24 @@ class SynapseHomeServer(HomeServer):
             if listener["type"] == "http":
                 self._listener_http(config, listener)
             elif listener["type"] == "manhole":
+                listen_tcp(
+                    listener["bind_addresses"],
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    )
+                )
+            elif listener["type"] == "replication":
                 bind_addresses = listener["bind_addresses"]
-
                 for address in bind_addresses:
-                    reactor.listenTCP(
-                        listener["port"],
-                        manhole(
-                            username="matrix",
-                            password="rabbithole",
-                            globals={"hs": self},
-                        ),
-                        interface=address
+                    factory = ReplicationStreamProtocolFactory(self)
+                    server_listener = reactor.listenTCP(
+                        listener["port"], factory, interface=address
+                    )
+                    reactor.addSystemEventTrigger(
+                        "before", "shutdown", server_listener.stopListening,
                     )
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -239,29 +271,6 @@ class SynapseHomeServer(HomeServer):
         except IncorrectDatabaseSetup as e:
             quit_with_error(e.message)
 
-    def get_db_conn(self, run_new_connection=True):
-        # Any param beginning with cp_ is a parameter for adbapi, and should
-        # not be passed to the database engine.
-        db_params = {
-            k: v for k, v in self.db_config.get("args", {}).items()
-            if not k.startswith("cp_")
-        }
-        db_conn = self.database_engine.module.connect(**db_params)
-
-        if run_new_connection:
-            self.database_engine.on_new_connection(db_conn)
-        return db_conn
-
-
-def quit_with_error(error_string):
-    message_lines = error_string.split("\n")
-    line_length = max([len(l) for l in message_lines if len(l) < 80]) + 2
-    sys.stderr.write("*" * line_length + '\n')
-    for line in message_lines:
-        sys.stderr.write(" %s\n" % (line.rstrip(),))
-    sys.stderr.write("*" * line_length + '\n')
-    sys.exit(1)
-
 
 def setup(config_options):
     """
@@ -286,7 +295,7 @@ def setup(config_options):
         # generating config files and shouldn't try to continue.
         sys.exit(0)
 
-    config.setup_logging()
+    synapse.config.logger.setup_logging(config, use_worker_options=False)
 
     # check any extra requirements we have now we have a config
     check_requirements(config)
@@ -340,7 +349,7 @@ def setup(config_options):
         hs.get_state_handler().start_caching()
         hs.get_datastore().start_profiling()
         hs.get_datastore().start_doing_background_updates()
-        hs.get_replication_layer().start_get_pdu_cache()
+        hs.get_federation_client().start_get_pdu_cache()
 
         register_memory_metrics(hs)
 
@@ -389,10 +398,15 @@ def run(hs):
         ThreadPool._worker = profile(ThreadPool._worker)
         reactor.run = profile(reactor.run)
 
-    start_time = hs.get_clock().time()
+    clock = hs.get_clock()
+    start_time = clock.time()
 
     stats = {}
 
+    # Contains the list of processes we will be monitoring
+    # currently either 0 or 1
+    stats_process = []
+
     @defer.inlineCallbacks
     def phone_stats_home():
         logger.info("Gathering stats for reporting")
@@ -401,41 +415,36 @@ def run(hs):
         if uptime < 0:
             uptime = 0
 
-        # If the stats directory is empty then this is the first time we've
-        # reported stats.
-        first_time = not stats
-
         stats["homeserver"] = hs.config.server_name
         stats["timestamp"] = now
         stats["uptime_seconds"] = uptime
         stats["total_users"] = yield hs.get_datastore().count_all_users()
 
+        total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
+        stats["total_nonbridged_users"] = total_nonbridged_users
+
         room_count = yield hs.get_datastore().get_room_count()
         stats["total_room_count"] = room_count
 
         stats["daily_active_users"] = yield hs.get_datastore().count_daily_users()
-        daily_messages = yield hs.get_datastore().count_daily_messages()
-        if daily_messages is not None:
-            stats["daily_messages"] = daily_messages
-        else:
-            stats.pop("daily_messages", None)
-
-        if first_time:
-            # Add callbacks to report the synapse stats as metrics whenever
-            # prometheus requests them, typically every 30s.
-            # As some of the stats are expensive to calculate we only update
-            # them when synapse phones home to matrix.org every 24 hours.
-            metrics = get_metrics_for("synapse.usage")
-            metrics.add_callback("timestamp", lambda: stats["timestamp"])
-            metrics.add_callback("uptime_seconds", lambda: stats["uptime_seconds"])
-            metrics.add_callback("total_users", lambda: stats["total_users"])
-            metrics.add_callback("total_room_count", lambda: stats["total_room_count"])
-            metrics.add_callback(
-                "daily_active_users", lambda: stats["daily_active_users"]
-            )
-            metrics.add_callback(
-                "daily_messages", lambda: stats.get("daily_messages", 0)
-            )
+        stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
+        stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
+
+        r30_results = yield hs.get_datastore().count_r30_users()
+        for name, count in r30_results.iteritems():
+            stats["r30_users_" + name] = count
+
+        daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
+        stats["daily_sent_messages"] = daily_sent_messages
+        stats["cache_factor"] = CACHE_SIZE_FACTOR
+        stats["event_cache_size"] = hs.config.event_cache_size
+
+        if len(stats_process) > 0:
+            stats["memory_rss"] = 0
+            stats["cpu_average"] = 0
+            for process in stats_process:
+                stats["memory_rss"] += process.memory_info().rss
+                stats["cpu_average"] += int(process.cpu_percent(interval=None))
 
         logger.info("Reporting stats to matrix.org: %s" % (stats,))
         try:
@@ -446,37 +455,48 @@ def run(hs):
         except Exception as e:
             logger.warn("Error reporting stats: %s", e)
 
-    if hs.config.report_stats:
-        phone_home_task = task.LoopingCall(phone_stats_home)
-        logger.info("Scheduling stats reporting for 24 hour intervals")
-        phone_home_task.start(60 * 60 * 24, now=False)
-
-    def in_thread():
-        # Uncomment to enable tracing of log context changes.
-        # sys.settrace(logcontext_tracer)
-        with LoggingContext("run"):
-            change_resource_limit(hs.config.soft_file_limit)
-            if hs.config.gc_thresholds:
-                gc.set_threshold(*hs.config.gc_thresholds)
-            reactor.run()
-
-    if hs.config.daemonize:
-
-        if hs.config.print_pidfile:
-            print (hs.config.pid_file)
-
-        daemon = Daemonize(
-            app="synapse-homeserver",
-            pid=hs.config.pid_file,
-            action=lambda: in_thread(),
-            auto_close_fds=False,
-            verbose=True,
-            logger=logger,
-        )
+    def performance_stats_init():
+        try:
+            import psutil
+            process = psutil.Process()
+            # Ensure we can fetch both, and make the initial request for cpu_percent
+            # so the next request will use this as the initial point.
+            process.memory_info().rss
+            process.cpu_percent(interval=None)
+            logger.info("report_stats can use psutil")
+            stats_process.append(process)
+        except (ImportError, AttributeError):
+            logger.warn(
+                "report_stats enabled but psutil is not installed or incorrect version."
+                " Disabling reporting of memory/cpu stats."
+                " Ensuring psutil is available will help matrix.org track performance"
+                " changes across releases."
+            )
 
-        daemon.start()
-    else:
-        in_thread()
+    if hs.config.report_stats:
+        logger.info("Scheduling stats reporting for 3 hour intervals")
+        clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+
+        # We need to defer this init for the cases that we daemonize
+        # otherwise the process ID we get is that of the non-daemon process
+        clock.call_later(0, performance_stats_init)
+
+        # We wait 5 minutes to send the first set of stats as the server can
+        # be quite busy the first few minutes
+        clock.call_later(5 * 60, phone_stats_home)
+
+    if hs.config.daemonize and hs.config.print_pidfile:
+        print (hs.config.pid_file)
+
+    _base.start_reactor(
+        "synapse-homeserver",
+        hs.config.soft_file_limit,
+        hs.config.gc_thresholds,
+        hs.config.pid_file,
+        hs.config.daemonize,
+        hs.config.cpu_affinity,
+        logger,
+    )
 
 
 def main():