summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/_base.py16
-rw-r--r--synapse/app/admin_cmd.py197
-rw-r--r--synapse/config/_base.py73
-rw-r--r--synapse/config/database.py3
-rw-r--r--synapse/config/logger.py3
-rw-r--r--synapse/config/registration.py3
-rw-r--r--synapse/config/server.py3
7 files changed, 288 insertions, 10 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index bd285122ea..807f320b46 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -48,7 +48,7 @@ def register_sighup(func):
     _sighup_callbacks.append(func)
 
 
-def start_worker_reactor(appname, config):
+def start_worker_reactor(appname, config, run_command=reactor.run):
     """ Run the reactor in the main process
 
     Daemonizes if necessary, and then configures some resources, before starting
@@ -57,6 +57,7 @@ def start_worker_reactor(appname, config):
     Args:
         appname (str): application name which will be sent to syslog
         config (synapse.config.Config): config object
+        run_command (Callable[]): callable that actually runs the reactor
     """
 
     logger = logging.getLogger(config.worker_app)
@@ -69,11 +70,19 @@ def start_worker_reactor(appname, config):
         daemonize=config.worker_daemonize,
         print_pidfile=config.print_pidfile,
         logger=logger,
+        run_command=run_command,
     )
 
 
 def start_reactor(
-    appname, soft_file_limit, gc_thresholds, pid_file, daemonize, print_pidfile, logger
+    appname,
+    soft_file_limit,
+    gc_thresholds,
+    pid_file,
+    daemonize,
+    print_pidfile,
+    logger,
+    run_command=reactor.run,
 ):
     """ Run the reactor in the main process
 
@@ -88,6 +97,7 @@ def start_reactor(
         daemonize (bool): true to run the reactor in a background process
         print_pidfile (bool): whether to print the pid file, if daemonize is True
         logger (logging.Logger): logger instance to pass to Daemonize
+        run_command (Callable[]): callable that actually runs the reactor
     """
 
     install_dns_limiter(reactor)
@@ -97,7 +107,7 @@ def start_reactor(
         change_resource_limit(soft_file_limit)
         if gc_thresholds:
             gc.set_threshold(*gc_thresholds)
-        reactor.run()
+        run_command()
 
     # make sure that we run the reactor with the sentinel log context,
     # otherwise other PreserveLoggingContext instances will get confused
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
new file mode 100644
index 0000000000..611a196e54
--- /dev/null
+++ b/synapse/app/admin_cmd.py
@@ -0,0 +1,197 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2019 Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import argparse
+import logging
+import sys
+
+from twisted.internet import defer, task
+
+import synapse
+from synapse.app import _base
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.handlers.admin import FileExfiltrationWriter
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.logcontext import LoggingContext
+from synapse.util.versionstring import get_version_string
+
+logger = logging.getLogger("synapse.app.admin_cmd")
+
+
+class AdminCmdSlavedStore(
+    SlavedReceiptsStore,
+    SlavedAccountDataStore,
+    SlavedApplicationServiceStore,
+    SlavedRegistrationStore,
+    SlavedFilteringStore,
+    SlavedPresenceStore,
+    SlavedGroupServerStore,
+    SlavedDeviceInboxStore,
+    SlavedDeviceStore,
+    SlavedPushRuleStore,
+    SlavedEventStore,
+    SlavedClientIpStore,
+    RoomStore,
+    BaseSlavedStore,
+):
+    pass
+
+
+class AdminCmdServer(HomeServer):
+    DATASTORE_CLASS = AdminCmdSlavedStore
+
+    def _listen_http(self, listener_config):
+        pass
+
+    def start_listening(self, listeners):
+        pass
+
+    def build_tcp_replication(self):
+        return AdminCmdReplicationHandler(self)
+
+
+class AdminCmdReplicationHandler(ReplicationClientHandler):
+    @defer.inlineCallbacks
+    def on_rdata(self, stream_name, token, rows):
+        pass
+
+    def get_streams_to_replicate(self):
+        return {}
+
+
+@defer.inlineCallbacks
+def export_data_command(hs, args):
+    """Export data for a user.
+
+    Args:
+        hs (HomeServer)
+        args (argparse.Namespace)
+    """
+
+    user_id = args.user_id
+    directory = args.output_directory
+
+    res = yield hs.get_handlers().admin_handler.exfiltrate_user_data(
+        user_id, FileExfiltrationWriter(user_id, directory=directory)
+    )
+    print(res)
+
+
+def start(config_options):
+    parser = argparse.ArgumentParser(description="Synapse Admin Command")
+    HomeServerConfig.add_arguments_to_parser(parser)
+
+    subparser = parser.add_subparsers(
+        title="Admin Commands",
+        description="Choose an admin command to perform.",
+        required=True,
+        dest="command",
+        metavar="<admin_command>",
+        help="The admin command to perform.",
+    )
+    export_data_parser = subparser.add_parser(
+        "export-data", help="Export all data for a user"
+    )
+    export_data_parser.add_argument("user_id", help="User to extra data from")
+    export_data_parser.add_argument(
+        "--output-directory",
+        action="store",
+        metavar="DIRECTORY",
+        required=False,
+        help="The directory to store the exported data in. Must be emtpy. Defaults"
+        " to creating a temp directory.",
+    )
+    export_data_parser.set_defaults(func=export_data_command)
+
+    try:
+        config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
+    except ConfigError as e:
+        sys.stderr.write("\n" + str(e) + "\n")
+        sys.exit(1)
+
+    if config.worker_app is not None:
+        assert config.worker_app == "synapse.app.admin_cmd"
+
+    # Update the config with some basic overrides so that don't have to specify
+    # a full worker config.
+    config.worker_app = "synapse.app.admin_cmd"
+
+    if (
+        not config.worker_daemonize
+        and not config.worker_log_file
+        and not config.worker_log_config
+    ):
+        # Since we're meant to be run as a "command" let's not redirect stdio
+        # unless we've actually set log config.
+        config.no_redirect_stdio = True
+
+    # Explicitly disable background processes
+    config.update_user_directory = False
+    config.start_pushers = False
+    config.send_federation = False
+
+    setup_logging(config, use_worker_options=True)
+
+    synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    ss = AdminCmdServer(
+        config.server_name,
+        db_config=config.database_config,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+
+    # We use task.react as the basic run command as it correctly handles tearing
+    # down the reactor when the deferreds resolve and setting the return value.
+    # We also make sure that `_base.start` gets run before we actually run the
+    # command.
+
+    @defer.inlineCallbacks
+    def run(_reactor):
+        with LoggingContext("command"):
+            yield _base.start(ss, [])
+            yield args.func(ss, args)
+
+    _base.start_worker_reactor(
+        "synapse-admin-cmd", config, run_command=lambda: task.react(run)
+    )
+
+
+if __name__ == "__main__":
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 965478d8d5..8c3acff03e 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -137,12 +137,42 @@ class Config(object):
             return file_stream.read()
 
     def invoke_all(self, name, *args, **kargs):
+        """Invoke all instance methods with the given name and arguments in the
+        class's MRO.
+
+        Args:
+            name (str): Name of function to invoke
+            *args
+            **kwargs
+
+        Returns:
+            list: The list of the return values from each method called
+        """
         results = []
         for cls in type(self).mro():
             if name in cls.__dict__:
                 results.append(getattr(cls, name)(self, *args, **kargs))
         return results
 
+    @classmethod
+    def invoke_all_static(cls, name, *args, **kargs):
+        """Invoke all static methods with the given name and arguments in the
+        class's MRO.
+
+        Args:
+            name (str): Name of function to invoke
+            *args
+            **kwargs
+
+        Returns:
+            list: The list of the return values from each method called
+        """
+        results = []
+        for c in cls.mro():
+            if name in c.__dict__:
+                results.append(getattr(c, name)(*args, **kargs))
+        return results
+
     def generate_config(
         self,
         config_dir_path,
@@ -202,6 +232,23 @@ class Config(object):
         Returns: Config object.
         """
         config_parser = argparse.ArgumentParser(description=description)
+        cls.add_arguments_to_parser(config_parser)
+        obj, _ = cls.load_config_with_parser(config_parser, argv)
+
+        return obj
+
+    @classmethod
+    def add_arguments_to_parser(cls, config_parser):
+        """Adds all the config flags to an ArgumentParser.
+
+        Doesn't support config-file-generation: used by the worker apps.
+
+        Used for workers where we want to add extra flags/subcommands.
+
+        Args:
+            config_parser (ArgumentParser): App description
+        """
+
         config_parser.add_argument(
             "-c",
             "--config-path",
@@ -219,9 +266,29 @@ class Config(object):
             " Defaults to the directory containing the last config file",
         )
 
-        obj = cls()
+        # We can only invoke `add_arguments` on an actual object, but
+        # `add_arguments` should be side effect free so this is probably fine.
+        cls.invoke_all_static("add_arguments", config_parser)
 
-        obj.invoke_all("add_arguments", config_parser)
+    @classmethod
+    def load_config_with_parser(cls, config_parser, argv):
+        """Parse the commandline and config files with the given parser
+
+        Doesn't support config-file-generation: used by the worker apps.
+
+        Used for workers where we want to add extra flags/subcommands.
+
+        Args:
+            config_parser (ArgumentParser)
+            argv (list[str])
+
+        Returns:
+            tuple[HomeServerConfig, argparse.Namespace]: Returns the parsed
+            config object and the parsed argparse.Namespace object from
+            `config_parser.parse_args(..)`
+        """
+
+        obj = cls()
 
         config_args = config_parser.parse_args(argv)
 
@@ -244,7 +311,7 @@ class Config(object):
 
         obj.invoke_all("read_arguments", config_args)
 
-        return obj
+        return obj, config_args
 
     @classmethod
     def load_or_generate_config(cls, description, argv):
diff --git a/synapse/config/database.py b/synapse/config/database.py
index bcb2089dd7..746a6cd1f4 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -69,7 +69,8 @@ class DatabaseConfig(Config):
             if database_path is not None:
                 self.database_config["args"]["database"] = database_path
 
-    def add_arguments(self, parser):
+    @staticmethod
+    def add_arguments(parser):
         db_group = parser.add_argument_group("database")
         db_group.add_argument(
             "-d",
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 0f5554211c..40502a5798 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -103,7 +103,8 @@ class LoggingConfig(Config):
         if args.log_file is not None:
             self.log_file = args.log_file
 
-    def add_arguments(cls, parser):
+    @staticmethod
+    def add_arguments(parser):
         logging_group = parser.add_argument_group("logging")
         logging_group.add_argument(
             "-v",
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index 34cb11468c..c3de7a4e32 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -237,7 +237,8 @@ class RegistrationConfig(Config):
             % locals()
         )
 
-    def add_arguments(self, parser):
+    @staticmethod
+    def add_arguments(parser):
         reg_group = parser.add_argument_group("registration")
         reg_group.add_argument(
             "--enable-registration",
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 2a74dea2ea..080d0630bd 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -639,7 +639,8 @@ class ServerConfig(Config):
         if args.print_pidfile is not None:
             self.print_pidfile = args.print_pidfile
 
-    def add_arguments(self, parser):
+    @staticmethod
+    def add_arguments(parser):
         server_group = parser.add_argument_group("server")
         server_group.add_argument(
             "-D",