diff options
author | Erik Johnston <erik@matrix.org> | 2019-10-30 13:37:04 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-10-30 13:37:04 +0000 |
commit | ec6de1cc7d915abf6907b1d6a93336f8cd435cdd (patch) | |
tree | 5e0beaa14d2e9abc82116a7a07740abf2508177b /scripts/synapse_port_db | |
parent | Review comments (diff) | |
parent | Merge pull request #6291 from matrix-org/erikj/fix_cache_descriptor (diff) | |
download | synapse-ec6de1cc7d915abf6907b1d6a93336f8cd435cdd.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_out_persistence_store
Diffstat (limited to 'scripts/synapse_port_db')
-rwxr-xr-x | scripts/synapse_port_db | 193 |
1 files changed, 141 insertions, 52 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 3f942abdb6..54faed1e83 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -2,6 +2,7 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -29,9 +30,33 @@ import yaml from twisted.enterprise import adbapi from twisted.internet import defer, reactor -from synapse.storage._base import LoggingTransaction, SQLBaseStore +from synapse.config.homeserver import HomeServerConfig +from synapse.logging.context import PreserveLoggingContext +from synapse.storage._base import LoggingTransaction +from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore +from synapse.storage.data_stores.main.deviceinbox import ( + DeviceInboxBackgroundUpdateStore, +) +from synapse.storage.data_stores.main.devices import DeviceBackgroundUpdateStore +from synapse.storage.data_stores.main.events_bg_updates import ( + EventsBackgroundUpdatesStore, +) +from synapse.storage.data_stores.main.media_repository import ( + MediaRepositoryBackgroundUpdateStore, +) +from synapse.storage.data_stores.main.registration import ( + RegistrationBackgroundUpdateStore, +) +from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore +from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore +from synapse.storage.data_stores.main.state import StateBackgroundUpdateStore +from synapse.storage.data_stores.main.stats import StatsStore +from synapse.storage.data_stores.main.user_directory import ( + UserDirectoryBackgroundUpdateStore, +) from synapse.storage.engines import create_engine from synapse.storage.prepare_database import prepare_database +from synapse.util import Clock logger = logging.getLogger("synapse_port_db") @@ -43,6 +68,7 @@ BOOLEAN_COLUMNS = { "presence_list": ["accepted"], "presence_stream": ["currently_active"], "public_room_list_stream": ["visibility"], + "devices": ["hidden"], "device_lists_outbound_pokes": ["sent"], "users_who_share_rooms": ["share_private"], "groups": ["is_public"], @@ -98,33 +124,24 @@ APPEND_ONLY_TABLES = [ end_error_exec_info = None -class Store(object): - """This object is used to pull out some of the convenience API from the - Storage layer. - - *All* database interactions should go through this object. - """ - - def __init__(self, db_pool, engine): - self.db_pool = db_pool - self.database_engine = engine - - _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"] - _simple_insert = SQLBaseStore.__dict__["_simple_insert"] - - _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"] - _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"] - _simple_select_one = SQLBaseStore.__dict__["_simple_select_one"] - _simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"] - _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"] - _simple_select_one_onecol_txn = SQLBaseStore.__dict__[ - "_simple_select_one_onecol_txn" - ] - - _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"] - _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"] - _simple_update_txn = SQLBaseStore.__dict__["_simple_update_txn"] +class Store( + ClientIpBackgroundUpdateStore, + DeviceInboxBackgroundUpdateStore, + DeviceBackgroundUpdateStore, + EventsBackgroundUpdatesStore, + MediaRepositoryBackgroundUpdateStore, + RegistrationBackgroundUpdateStore, + RoomMemberBackgroundUpdateStore, + SearchBackgroundUpdateStore, + StateBackgroundUpdateStore, + UserDirectoryBackgroundUpdateStore, + StatsStore, +): + def __init__(self, db_conn, hs): + super().__init__(db_conn, hs) + self.db_pool = hs.get_db_pool() + @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): def r(conn): try: @@ -150,7 +167,8 @@ class Store(object): logger.debug("[TXN FAIL] {%s} %s", desc, e) raise - return self.db_pool.runWithConnection(r) + with PreserveLoggingContext(): + return (yield self.db_pool.runWithConnection(r)) def execute(self, f, *args, **kwargs): return self.runInteraction(f.__name__, f, *args, **kwargs) @@ -176,6 +194,25 @@ class Store(object): raise +class MockHomeserver: + def __init__(self, config, database_engine, db_conn, db_pool): + self.database_engine = database_engine + self.db_conn = db_conn + self.db_pool = db_pool + self.clock = Clock(reactor) + self.config = config + self.hostname = config.server_name + + def get_db_conn(self): + return self.db_conn + + def get_db_pool(self): + return self.db_pool + + def get_clock(self): + return self.clock + + class Porter(object): def __init__(self, **kwargs): self.__dict__.update(kwargs) @@ -447,31 +484,75 @@ class Porter(object): db_conn.commit() + return db_conn + @defer.inlineCallbacks - def run(self): - try: - sqlite_db_pool = adbapi.ConnectionPool( - self.sqlite_config["name"], **self.sqlite_config["args"] - ) + def build_db_store(self, config): + """Builds and returns a database store using the provided configuration. - postgres_db_pool = adbapi.ConnectionPool( - self.postgres_config["name"], **self.postgres_config["args"] - ) + Args: + config: The database configuration, i.e. a dict following the structure of + the "database" section of Synapse's configuration file. + + Returns: + The built Store object. + """ + engine = create_engine(config) + + self.progress.set_state("Preparing %s" % config["name"]) + conn = self.setup_db(config, engine) + + db_pool = adbapi.ConnectionPool( + config["name"], **config["args"] + ) + + hs = MockHomeserver(self.hs_config, engine, conn, db_pool) + + store = Store(conn, hs) + + yield store.runInteraction( + "%s_engine.check_database" % config["name"], + engine.check_database, + ) - sqlite_engine = create_engine(sqlite_config) - postgres_engine = create_engine(postgres_config) + return store - self.sqlite_store = Store(sqlite_db_pool, sqlite_engine) - self.postgres_store = Store(postgres_db_pool, postgres_engine) + @defer.inlineCallbacks + def run_background_updates_on_postgres(self): + # Manually apply all background updates on the PostgreSQL database. + postgres_ready = yield self.postgres_store.has_completed_background_updates() + + if not postgres_ready: + # Only say that we're running background updates when there are background + # updates to run. + self.progress.set_state("Running background updates on PostgreSQL") + + while not postgres_ready: + yield self.postgres_store.do_next_background_update(100) + postgres_ready = yield ( + self.postgres_store.has_completed_background_updates() + ) - yield self.postgres_store.execute(postgres_engine.check_database) + @defer.inlineCallbacks + def run(self): + try: + self.sqlite_store = yield self.build_db_store(self.sqlite_config) + + # Check if all background updates are done, abort if not. + updates_complete = yield self.sqlite_store.has_completed_background_updates() + if not updates_complete: + sys.stderr.write( + "Pending background updates exist in the SQLite3 database." + " Please start Synapse again and wait until every update has finished" + " before running this script.\n" + ) + defer.returnValue(None) - # Step 1. Set up databases. - self.progress.set_state("Preparing SQLite3") - self.setup_db(sqlite_config, sqlite_engine) + self.postgres_store = yield self.build_db_store( + self.hs_config.database_config + ) - self.progress.set_state("Preparing PostgreSQL") - self.setup_db(postgres_config, postgres_engine) + yield self.run_background_updates_on_postgres() self.progress.set_state("Creating port tables") @@ -563,6 +644,8 @@ class Porter(object): def conv(j, col): if j in bool_cols: return bool(col) + if isinstance(col, bytes): + return bytearray(col) elif isinstance(col, string_types) and "\0" in col: logger.warn( "DROPPING ROW: NUL value in table %s col %s: %r", @@ -926,18 +1009,24 @@ if __name__ == "__main__": }, } - postgres_config = yaml.safe_load(args.postgres_config) + hs_config = yaml.safe_load(args.postgres_config) - if "database" in postgres_config: - postgres_config = postgres_config["database"] + if "database" not in hs_config: + sys.stderr.write("The configuration file must have a 'database' section.\n") + sys.exit(4) + + postgres_config = hs_config["database"] if "name" not in postgres_config: - sys.stderr.write("Malformed database config: no 'name'") + sys.stderr.write("Malformed database config: no 'name'\n") sys.exit(2) if postgres_config["name"] != "psycopg2": - sys.stderr.write("Database must use 'psycopg2' connector.") + sys.stderr.write("Database must use the 'psycopg2' connector.\n") sys.exit(3) + config = HomeServerConfig() + config.parse_config_dict(hs_config, "", "") + def start(stdscr=None): if stdscr: progress = CursesProgress(stdscr) @@ -946,9 +1035,9 @@ if __name__ == "__main__": porter = Porter( sqlite_config=sqlite_config, - postgres_config=postgres_config, progress=progress, batch_size=args.batch_size, + hs_config=config, ) reactor.callWhenRunning(porter.run) |