diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 1ebb7ae539..540dbd9236 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -48,7 +48,7 @@ def register_sighup(func):
_sighup_callbacks.append(func)
-def start_worker_reactor(appname, config):
+def start_worker_reactor(appname, config, run_command=reactor.run):
""" Run the reactor in the main process
Daemonizes if necessary, and then configures some resources, before starting
@@ -57,6 +57,7 @@ def start_worker_reactor(appname, config):
Args:
appname (str): application name which will be sent to syslog
config (synapse.config.Config): config object
+ run_command (Callable[]): callable that actually runs the reactor
"""
logger = logging.getLogger(config.worker_app)
@@ -69,11 +70,19 @@ def start_worker_reactor(appname, config):
daemonize=config.worker_daemonize,
print_pidfile=config.print_pidfile,
logger=logger,
+ run_command=run_command,
)
def start_reactor(
- appname, soft_file_limit, gc_thresholds, pid_file, daemonize, print_pidfile, logger
+ appname,
+ soft_file_limit,
+ gc_thresholds,
+ pid_file,
+ daemonize,
+ print_pidfile,
+ logger,
+ run_command=reactor.run,
):
""" Run the reactor in the main process
@@ -88,6 +97,7 @@ def start_reactor(
daemonize (bool): true to run the reactor in a background process
print_pidfile (bool): whether to print the pid file, if daemonize is True
logger (logging.Logger): logger instance to pass to Daemonize
+ run_command (Callable[]): callable that actually runs the reactor
"""
install_dns_limiter(reactor)
@@ -97,7 +107,7 @@ def start_reactor(
change_resource_limit(soft_file_limit)
if gc_thresholds:
gc.set_threshold(*gc_thresholds)
- reactor.run()
+ run_command()
# make sure that we run the reactor with the sentinel log context,
# otherwise other PreserveLoggingContext instances will get confused
@@ -139,8 +149,7 @@ def listen_metrics(bind_addresses, port):
"""
Start Prometheus metrics server.
"""
- from synapse.metrics import RegistryProxy
- from prometheus_client import start_http_server
+ from synapse.metrics import RegistryProxy, start_http_server
for host in bind_addresses:
logger.info("Starting metrics listener on %s:%d", host, port)
@@ -243,6 +252,9 @@ def start(hs, listeners=None):
# Load the certificate from disk.
refresh_certificate(hs)
+ # Start the tracer
+ synapse.logging.opentracing.init_tracer(hs.config)
+
# It is now safe to start your Synapse.
hs.start_listening(listeners)
hs.get_datastore().start_profiling()
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
new file mode 100644
index 0000000000..1fd52a5526
--- /dev/null
+++ b/synapse/app/admin_cmd.py
@@ -0,0 +1,264 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2019 Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import argparse
+import logging
+import os
+import sys
+import tempfile
+
+from canonicaljson import json
+
+from twisted.internet import defer, task
+
+import synapse
+from synapse.app import _base
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.handlers.admin import ExfiltrationWriter
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
+from synapse.replication.slave.storage.presence import SlavedPresenceStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.logcontext import LoggingContext
+from synapse.util.versionstring import get_version_string
+
+logger = logging.getLogger("synapse.app.admin_cmd")
+
+
+class AdminCmdSlavedStore(
+ SlavedReceiptsStore,
+ SlavedAccountDataStore,
+ SlavedApplicationServiceStore,
+ SlavedRegistrationStore,
+ SlavedFilteringStore,
+ SlavedPresenceStore,
+ SlavedGroupServerStore,
+ SlavedDeviceInboxStore,
+ SlavedDeviceStore,
+ SlavedPushRuleStore,
+ SlavedEventStore,
+ SlavedClientIpStore,
+ RoomStore,
+ BaseSlavedStore,
+):
+ pass
+
+
+class AdminCmdServer(HomeServer):
+ DATASTORE_CLASS = AdminCmdSlavedStore
+
+ def _listen_http(self, listener_config):
+ pass
+
+ def start_listening(self, listeners):
+ pass
+
+ def build_tcp_replication(self):
+ return AdminCmdReplicationHandler(self)
+
+
+class AdminCmdReplicationHandler(ReplicationClientHandler):
+ @defer.inlineCallbacks
+ def on_rdata(self, stream_name, token, rows):
+ pass
+
+ def get_streams_to_replicate(self):
+ return {}
+
+
+@defer.inlineCallbacks
+def export_data_command(hs, args):
+ """Export data for a user.
+
+ Args:
+ hs (HomeServer)
+ args (argparse.Namespace)
+ """
+
+ user_id = args.user_id
+ directory = args.output_directory
+
+ res = yield hs.get_handlers().admin_handler.export_user_data(
+ user_id, FileExfiltrationWriter(user_id, directory=directory)
+ )
+ print(res)
+
+
+class FileExfiltrationWriter(ExfiltrationWriter):
+ """An ExfiltrationWriter that writes the users data to a directory.
+ Returns the directory location on completion.
+
+ Note: This writes to disk on the main reactor thread.
+
+ Args:
+ user_id (str): The user whose data is being exfiltrated.
+ directory (str|None): The directory to write the data to, if None then
+ will write to a temporary directory.
+ """
+
+ def __init__(self, user_id, directory=None):
+ self.user_id = user_id
+
+ if directory:
+ self.base_directory = directory
+ else:
+ self.base_directory = tempfile.mkdtemp(
+ prefix="synapse-exfiltrate__%s__" % (user_id,)
+ )
+
+ os.makedirs(self.base_directory, exist_ok=True)
+ if list(os.listdir(self.base_directory)):
+ raise Exception("Directory must be empty")
+
+ def write_events(self, room_id, events):
+ room_directory = os.path.join(self.base_directory, "rooms", room_id)
+ os.makedirs(room_directory, exist_ok=True)
+ events_file = os.path.join(room_directory, "events")
+
+ with open(events_file, "a") as f:
+ for event in events:
+ print(json.dumps(event.get_pdu_json()), file=f)
+
+ def write_state(self, room_id, event_id, state):
+ room_directory = os.path.join(self.base_directory, "rooms", room_id)
+ state_directory = os.path.join(room_directory, "state")
+ os.makedirs(state_directory, exist_ok=True)
+
+ event_file = os.path.join(state_directory, event_id)
+
+ with open(event_file, "a") as f:
+ for event in state.values():
+ print(json.dumps(event.get_pdu_json()), file=f)
+
+ def write_invite(self, room_id, event, state):
+ self.write_events(room_id, [event])
+
+ # We write the invite state somewhere else as they aren't full events
+ # and are only a subset of the state at the event.
+ room_directory = os.path.join(self.base_directory, "rooms", room_id)
+ os.makedirs(room_directory, exist_ok=True)
+
+ invite_state = os.path.join(room_directory, "invite_state")
+
+ with open(invite_state, "a") as f:
+ for event in state.values():
+ print(json.dumps(event), file=f)
+
+ def finished(self):
+ return self.base_directory
+
+
+def start(config_options):
+ parser = argparse.ArgumentParser(description="Synapse Admin Command")
+ HomeServerConfig.add_arguments_to_parser(parser)
+
+ subparser = parser.add_subparsers(
+ title="Admin Commands",
+ required=True,
+ dest="command",
+ metavar="<admin_command>",
+ help="The admin command to perform.",
+ )
+ export_data_parser = subparser.add_parser(
+ "export-data", help="Export all data for a user"
+ )
+ export_data_parser.add_argument("user_id", help="User to extra data from")
+ export_data_parser.add_argument(
+ "--output-directory",
+ action="store",
+ metavar="DIRECTORY",
+ required=False,
+ help="The directory to store the exported data in. Must be empty. Defaults"
+ " to creating a temp directory.",
+ )
+ export_data_parser.set_defaults(func=export_data_command)
+
+ try:
+ config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
+ except ConfigError as e:
+ sys.stderr.write("\n" + str(e) + "\n")
+ sys.exit(1)
+
+ if config.worker_app is not None:
+ assert config.worker_app == "synapse.app.admin_cmd"
+
+ # Update the config with some basic overrides so that don't have to specify
+ # a full worker config.
+ config.worker_app = "synapse.app.admin_cmd"
+
+ if (
+ not config.worker_daemonize
+ and not config.worker_log_file
+ and not config.worker_log_config
+ ):
+ # Since we're meant to be run as a "command" let's not redirect stdio
+ # unless we've actually set log config.
+ config.no_redirect_stdio = True
+
+ # Explicitly disable background processes
+ config.update_user_directory = False
+ config.start_pushers = False
+ config.send_federation = False
+
+ setup_logging(config, use_worker_options=True)
+
+ synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+ database_engine = create_engine(config.database_config)
+
+ ss = AdminCmdServer(
+ config.server_name,
+ db_config=config.database_config,
+ config=config,
+ version_string="Synapse/" + get_version_string(synapse),
+ database_engine=database_engine,
+ )
+
+ ss.setup()
+
+ # We use task.react as the basic run command as it correctly handles tearing
+ # down the reactor when the deferreds resolve and setting the return value.
+ # We also make sure that `_base.start` gets run before we actually run the
+ # command.
+
+ @defer.inlineCallbacks
+ def run(_reactor):
+ with LoggingContext("command"):
+ yield _base.start(ss, [])
+ yield args.func(ss, args)
+
+ _base.start_worker_reactor(
+ "synapse-admin-cmd", config, run_command=lambda: task.react(run)
+ )
+
+
+if __name__ == "__main__":
+ with LoggingContext("main"):
+ start(sys.argv[1:])
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index be44249ed6..e01f3e5f3b 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -27,8 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index ff11beca82..29bddc4823 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -28,8 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index cacad25eac..042cfd04af 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -28,8 +28,7 @@ from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 11e80dbae0..76a97f8f32 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -29,8 +29,7 @@ from synapse.config.logger import setup_logging
from synapse.federation.transport.server import TransportLayerServer
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 97da7bdcbf..fec49d5092 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -28,9 +28,8 @@ from synapse.config.logger import setup_logging
from synapse.federation import send_queue
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index 417a10bbd2..1f1f1df78e 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -30,8 +30,7 @@ from synapse.http.server import JsonResource
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 639b1429c0..0c075cb3f1 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -55,9 +55,8 @@ from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.module_api import ModuleApi
from synapse.python_dependencies import check_requirements
from synapse.replication.http import REPLICATION_PREFIX, ReplicationRestResource
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index f23b9b6eda..d70780e9d5 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -28,8 +28,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 4f929edf86..070de7d0b0 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -27,8 +27,7 @@ from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.events import SlavedEventStore
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index de4797fddc..315c030694 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -32,8 +32,7 @@ from synapse.handlers.presence import PresenceHandler, get_interested_parties
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 1177ddd72e..03ef21bd01 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -29,8 +29,7 @@ from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext, run_in_background
-from synapse.metrics import RegistryProxy
-from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
|