diff options
Diffstat (limited to 'synapse/storage/prepare_database.py')
-rw-r--r-- | synapse/storage/prepare_database.py | 136 |
1 files changed, 94 insertions, 42 deletions
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 9cc3b51fe6..ee60e2a718 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -47,8 +47,24 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass -def prepare_database(db_conn, database_engine, config, data_stores=["main", "state"]): - """Prepares a database for usage. Will either create all necessary tables +OUTDATED_SCHEMA_ON_WORKER_ERROR = ( + "Expected database schema version %i but got %i: run the main synapse process to " + "upgrade the database schema before starting worker processes." +) + +EMPTY_DATABASE_ON_WORKER_ERROR = ( + "Uninitialised database: run the main synapse process to prepare the database " + "schema before starting worker processes." +) + +UNAPPLIED_DELTA_ON_WORKER_ERROR = ( + "Database schema delta %s has not been applied: run the main synapse process to " + "upgrade the database schema before starting worker processes." +) + + +def prepare_database(db_conn, database_engine, config, databases=["main", "state"]): + """Prepares a physical database for usage. Will either create all necessary tables or upgrade from an older schema version. If `config` is None then prepare_database will assert that no upgrade is @@ -60,37 +76,57 @@ def prepare_database(db_conn, database_engine, config, data_stores=["main", "sta config (synapse.config.homeserver.HomeServerConfig|None): application config, or None if we are connecting to an existing database which we expect to be configured already - data_stores (list[str]): The name of the data stores that will be used - with this database. Defaults to all data stores. + databases (list[str]): The name of the databases that will be used + with this physical database. Defaults to all databases. """ try: cur = db_conn.cursor() + + logger.info("%r: Checking existing schema version", databases) version_info = _get_or_create_schema_state(cur, database_engine) if version_info: user_version, delta_files, upgraded = version_info + logger.info( + "%r: Existing schema is %i (+%i deltas)", + databases, + user_version, + len(delta_files), + ) + # config should only be None when we are preparing an in-memory SQLite db, + # which should be empty. if config is None: - if user_version != SCHEMA_VERSION: - # If we don't pass in a config file then we are expecting to - # have already upgraded the DB. - raise UpgradeDatabaseException( - "Expected database schema version %i but got %i" - % (SCHEMA_VERSION, user_version) - ) - else: - _upgrade_existing_database( - cur, - user_version, - delta_files, - upgraded, - database_engine, - config, - data_stores=data_stores, + raise ValueError( + "config==None in prepare_database, but databse is not empty" + ) + + # if it's a worker app, refuse to upgrade the database, to avoid multiple + # workers doing it at once. + if config.worker_app is not None and user_version != SCHEMA_VERSION: + raise UpgradeDatabaseException( + OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version) ) + + _upgrade_existing_database( + cur, + user_version, + delta_files, + upgraded, + database_engine, + config, + databases=databases, + ) else: - _setup_new_database(cur, database_engine, data_stores=data_stores) + logger.info("%r: Initialising new database", databases) + + # if it's a worker app, refuse to upgrade the database, to avoid multiple + # workers doing it at once. + if config and config.worker_app is not None: + raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR) + + _setup_new_database(cur, database_engine, databases=databases) # check if any of our configured dynamic modules want a database if config is not None: @@ -103,9 +139,9 @@ def prepare_database(db_conn, database_engine, config, data_stores=["main", "sta raise -def _setup_new_database(cur, database_engine, data_stores): - """Sets up the database by finding a base set of "full schemas" and then - applying any necessary deltas, including schemas from the given data +def _setup_new_database(cur, database_engine, databases): + """Sets up the physical database by finding a base set of "full schemas" and + then applying any necessary deltas, including schemas from the given data stores. The "full_schemas" directory has subdirectories named after versions. This @@ -138,8 +174,8 @@ def _setup_new_database(cur, database_engine, data_stores): Args: cur (Cursor): a database cursor database_engine (DatabaseEngine) - data_stores (list[str]): The names of the data stores to instantiate - on the given database. + databases (list[str]): The names of the databases to instantiate + on the given physical database. """ # We're about to set up a brand new database so we check that its @@ -176,13 +212,13 @@ def _setup_new_database(cur, database_engine, data_stores): directories.extend( os.path.join( dir_path, - "data_stores", - data_store, + "databases", + database, "schema", "full_schemas", str(max_current_ver), ) - for data_store in data_stores + for database in databases ) directory_entries = [] @@ -219,7 +255,7 @@ def _setup_new_database(cur, database_engine, data_stores): upgraded=False, database_engine=database_engine, config=None, - data_stores=data_stores, + databases=databases, is_empty=True, ) @@ -231,10 +267,10 @@ def _upgrade_existing_database( upgraded, database_engine, config, - data_stores, + databases, is_empty=False, ): - """Upgrades an existing database. + """Upgrades an existing physical database. Delta files can either be SQL stored in *.sql files, or python modules in *.py. @@ -285,8 +321,8 @@ def _upgrade_existing_database( config (synapse.config.homeserver.HomeServerConfig|None): None if we are initialising a blank database, otherwise the application config - data_stores (list[str]): The names of the data stores to instantiate - on the given database. + databases (list[str]): The names of the databases to instantiate + on the given physical database. is_empty (bool): Is this a blank database? I.e. do we need to run the upgrade portions of the delta scripts. """ @@ -295,6 +331,8 @@ def _upgrade_existing_database( else: assert config + is_worker = config and config.worker_app is not None + if current_version > SCHEMA_VERSION: raise ValueError( "Cannot use this database as it is too " @@ -303,8 +341,8 @@ def _upgrade_existing_database( # some of the deltas assume that config.server_name is set correctly, so now # is a good time to run the sanity check. - if not is_empty and "main" in data_stores: - from synapse.storage.data_stores.main import check_database_before_upgrade + if not is_empty and "main" in databases: + from synapse.storage.databases.main import check_database_before_upgrade check_database_before_upgrade(cur, database_engine, config) @@ -322,7 +360,7 @@ def _upgrade_existing_database( specific_engine_extensions = (".sqlite", ".postgres") for v in range(start_ver, SCHEMA_VERSION + 1): - logger.info("Upgrading schema to v%d", v) + logger.info("Applying schema deltas for v%d", v) # We need to search both the global and per data store schema # directories for schema updates. @@ -330,11 +368,9 @@ def _upgrade_existing_database( # First we find the directories to search in delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) directories = [delta_dir] - for data_store in data_stores: + for database in databases: directories.append( - os.path.join( - dir_path, "data_stores", data_store, "schema", "delta", str(v) - ) + os.path.join(dir_path, "databases", database, "schema", "delta", str(v)) ) # Used to check if we have any duplicate file names @@ -384,9 +420,15 @@ def _upgrade_existing_database( continue root_name, ext = os.path.splitext(file_name) + if ext == ".py": # This is a python upgrade module. We need to import into some # package and then execute its `run_upgrade` function. + if is_worker: + raise PrepareDatabaseException( + UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path + ) + module_name = "synapse.storage.v%d_%s" % (v, root_name) with open(absolute_path) as python_file: module = imp.load_source(module_name, absolute_path, python_file) @@ -401,10 +443,18 @@ def _upgrade_existing_database( continue elif ext == ".sql": # A plain old .sql file, just read and execute it + if is_worker: + raise PrepareDatabaseException( + UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path + ) logger.info("Applying schema %s", relative_path) executescript(cur, absolute_path) elif ext == specific_engine_extension and root_name.endswith(".sql"): # A .sql file specific to our engine; just read and execute it + if is_worker: + raise PrepareDatabaseException( + UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path + ) logger.info("Applying engine-specific schema %s", relative_path) executescript(cur, absolute_path) elif ext in specific_engine_extensions and root_name.endswith(".sql"): @@ -434,6 +484,8 @@ def _upgrade_existing_database( (v, True), ) + logger.info("Schema now up to date") + def _apply_module_schemas(txn, database_engine, config): """Apply the module schemas for the dynamic modules, if any @@ -571,7 +623,7 @@ def _get_or_create_schema_state(txn, database_engine): @attr.s() -class _DirectoryListing(object): +class _DirectoryListing: """Helper class to store schema file name and the absolute path to it. |