summary refs log blame commit diff
path: root/synapse/app/admin_cmd.py
blob: bd73c47ae20a882e62f1feb11b4ac3f105624097 (plain) (tree)




































































































































































































                                                                                       
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import sys

from twisted.internet import defer, task

import synapse
from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.handlers.admin import FileExfiltrationWriter
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.logcontext import LoggingContext
from synapse.util.versionstring import get_version_string

logger = logging.getLogger("synapse.app.admin_cmd")


class AdminCmdSlavedStore(
    SlavedReceiptsStore,
    SlavedAccountDataStore,
    SlavedApplicationServiceStore,
    SlavedRegistrationStore,
    SlavedFilteringStore,
    SlavedPresenceStore,
    SlavedGroupServerStore,
    SlavedDeviceInboxStore,
    SlavedDeviceStore,
    SlavedPushRuleStore,
    SlavedEventStore,
    SlavedClientIpStore,
    RoomStore,
    BaseSlavedStore,
):
    pass


class AdminCmdServer(HomeServer):
    DATASTORE_CLASS = AdminCmdSlavedStore

    def _listen_http(self, listener_config):
        pass

    def start_listening(self, listeners):
        pass

    def build_tcp_replication(self):
        return AdminCmdReplicationHandler(self)


class AdminCmdReplicationHandler(ReplicationClientHandler):
    @defer.inlineCallbacks
    def on_rdata(self, stream_name, token, rows):
        pass

    def get_streams_to_replicate(self):
        return {}


@defer.inlineCallbacks
def export_data_command(hs, user_id, directory):
    """Export data for a user.

    Args:
        user_id (str)
        directory (str|None): Directory to write output to. Will create a temp
            directory if not specified.
    """

    res = yield hs.get_handlers().admin_handler.exfiltrate_user_data(
        user_id, FileExfiltrationWriter(user_id, directory=directory)
    )
    print(res)


def start(config_options):
    parser = HomeServerConfig.create_argument_parser("Synapse Admin Command")

    subparser = parser.add_subparsers(
        title="Admin Commands",
        description="Choose and admin command to perform.",
        required=True,
        dest="command",
        metavar="<admin_command>",
        help="The admin command to perform.",
    )
    export_data_parser = subparser.add_parser(
        "export-data", help="Export all data for a user"
    )
    export_data_parser.add_argument("user_id", help="User to extra data from")
    export_data_parser.add_argument(
        "--output-directory",
        action="store",
        metavar="DIRECTORY",
        required=False,
        help="The directory to store the exported data in. Must be emtpy. Defaults"
        " to creating a temp directory.",
    )

    try:
        config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
    except ConfigError as e:
        sys.stderr.write("\n" + str(e) + "\n")
        sys.exit(1)

    if config.worker_app is not None:
        assert config.worker_app == "synapse.app.admin_cmd"

    # Update the config with some basic overrides so that don't have to specify
    # a full worker config.
    config.worker_app = "synapse.app.admin_cmd"

    if (
        not config.worker_daemonize
        and not config.worker_log_file
        and not config.worker_log_config
    ):
        # Since we're meant to be run as a "command" let's not redirect stdio
        # unless we've actually set log config.
        config.no_redirect_stdio = True

    # Explicitly disable background processes
    config.update_user_directory = False
    config.start_pushers = False
    config.send_federation = False

    setup_logging(config, use_worker_options=True)

    synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts

    database_engine = create_engine(config.database_config)

    ss = AdminCmdServer(
        config.server_name,
        db_config=config.database_config,
        config=config,
        version_string="Synapse/" + get_version_string(synapse),
        database_engine=database_engine,
    )

    ss.setup()

    if args.command == "export-data":
        command = lambda: export_data_command(ss, args.user_id, args.output_directory)
    else:
        # This shouldn't happen.
        raise ConfigError("Unknown admin command %s" % (args.command,))

    # We use task.react as the basic run command as it correctly handles tearing
    # down the reactor when the deferreds resolve and setting the return value.
    # We also make sure that `_base.start` gets run before we actually run the
    # command.

    @defer.inlineCallbacks
    def run(_reactor):
        with LoggingContext("command"):
            yield _base.start(ss, [])
            yield command()

    _base.start_worker_reactor(
        "synapse-admin-cmd", config, run_command=lambda: task.react(run)
    )


if __name__ == "__main__":
    with LoggingContext("main"):
        start(sys.argv[1:])