diff options
Diffstat (limited to 'synapse/storage/prepare_database.py')
-rw-r--r-- | synapse/storage/prepare_database.py | 121 |
1 files changed, 82 insertions, 39 deletions
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 3799d46734..683e5e3b90 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -1,5 +1,4 @@ -# Copyright 2014 - 2016 OpenMarket Ltd -# Copyright 2018 New Vector Ltd +# Copyright 2014 - 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -26,7 +25,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.engines.postgres import PostgresEngine -from synapse.storage.schema import SCHEMA_VERSION +from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION from synapse.storage.types import Cursor logger = logging.getLogger(__name__) @@ -59,6 +58,28 @@ UNAPPLIED_DELTA_ON_WORKER_ERROR = ( ) +@attr.s +class _SchemaState: + current_version: int = attr.ib() + """The current schema version of the database""" + + compat_version: Optional[int] = attr.ib() + """The SCHEMA_VERSION of the oldest version of Synapse for this database + + If this is None, we have an old version of the database without the necessary + table. + """ + + applied_deltas: Collection[str] = attr.ib(factory=tuple) + """Any delta files for `current_version` which have already been applied""" + + upgraded: bool = attr.ib(default=False) + """Whether the current state was reached by applying deltas. + + If False, we have run the full schema for `current_version`, and have applied no + deltas since. If True, we have run some deltas since the original creation.""" + + def prepare_database( db_conn: LoggingDatabaseConnection, database_engine: BaseDatabaseEngine, @@ -96,12 +117,11 @@ def prepare_database( version_info = _get_or_create_schema_state(cur, database_engine) if version_info: - user_version, delta_files, upgraded = version_info logger.info( "%r: Existing schema is %i (+%i deltas)", databases, - user_version, - len(delta_files), + version_info.current_version, + len(version_info.applied_deltas), ) # config should only be None when we are preparing an in-memory SQLite db, @@ -113,16 +133,18 @@ def prepare_database( # if it's a worker app, refuse to upgrade the database, to avoid multiple # workers doing it at once. - if config.worker_app is not None and user_version != SCHEMA_VERSION: + if ( + config.worker_app is not None + and version_info.current_version != SCHEMA_VERSION + ): raise UpgradeDatabaseException( - OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version) + OUTDATED_SCHEMA_ON_WORKER_ERROR + % (SCHEMA_VERSION, version_info.current_version) ) _upgrade_existing_database( cur, - user_version, - delta_files, - upgraded, + version_info, database_engine, config, databases=databases, @@ -261,9 +283,7 @@ def _setup_new_database( _upgrade_existing_database( cur, - current_version=max_current_ver, - applied_delta_files=[], - upgraded=False, + _SchemaState(current_version=max_current_ver, compat_version=None), database_engine=database_engine, config=None, databases=databases, @@ -273,9 +293,7 @@ def _setup_new_database( def _upgrade_existing_database( cur: Cursor, - current_version: int, - applied_delta_files: List[str], - upgraded: bool, + current_schema_state: _SchemaState, database_engine: BaseDatabaseEngine, config: Optional[HomeServerConfig], databases: Collection[str], @@ -321,12 +339,8 @@ def _upgrade_existing_database( Args: cur - current_version: The current version of the schema. - applied_delta_files: A list of deltas that have already been applied. - upgraded: Whether the current version was generated by having - applied deltas or from full schema file. If `True` the function - will never apply delta files for the given `current_version`, since - the current_version wasn't generated by applying those delta files. + current_schema_state: The current version of the schema, as + returned by _get_or_create_schema_state database_engine config: None if we are initialising a blank database, otherwise the application @@ -337,13 +351,16 @@ def _upgrade_existing_database( upgrade portions of the delta scripts. """ if is_empty: - assert not applied_delta_files + assert not current_schema_state.applied_deltas else: assert config is_worker = config and config.worker_app is not None - if current_version > SCHEMA_VERSION: + if ( + current_schema_state.compat_version is not None + and current_schema_state.compat_version > SCHEMA_VERSION + ): raise ValueError( "Cannot use this database as it is too " + "new for the server to understand" @@ -357,14 +374,26 @@ def _upgrade_existing_database( assert config is not None check_database_before_upgrade(cur, database_engine, config) - start_ver = current_version + # update schema_compat_version before we run any upgrades, so that if synapse + # gets downgraded again, it won't try to run against the upgraded database. + if ( + current_schema_state.compat_version is None + or current_schema_state.compat_version < SCHEMA_COMPAT_VERSION + ): + cur.execute("DELETE FROM schema_compat_version") + cur.execute( + "INSERT INTO schema_compat_version(compat_version) VALUES (?)", + (SCHEMA_COMPAT_VERSION,), + ) + + start_ver = current_schema_state.current_version # if we got to this schema version by running a full_schema rather than a series # of deltas, we should not run the deltas for this version. - if not upgraded: + if not current_schema_state.upgraded: start_ver += 1 - logger.debug("applied_delta_files: %s", applied_delta_files) + logger.debug("applied_delta_files: %s", current_schema_state.applied_deltas) if isinstance(database_engine, PostgresEngine): specific_engine_extension = ".postgres" @@ -440,7 +469,7 @@ def _upgrade_existing_database( absolute_path = entry.absolute_path logger.debug("Found file: %s (%s)", relative_path, absolute_path) - if relative_path in applied_delta_files: + if relative_path in current_schema_state.applied_deltas: continue root_name, ext = os.path.splitext(file_name) @@ -621,7 +650,7 @@ def execute_statements_from_stream(cur: Cursor, f: TextIO) -> None: def _get_or_create_schema_state( txn: Cursor, database_engine: BaseDatabaseEngine -) -> Optional[Tuple[int, List[str], bool]]: +) -> Optional[_SchemaState]: # Bluntly try creating the schema_version tables. sql_path = os.path.join(schema_path, "common", "schema_version.sql") executescript(txn, sql_path) @@ -629,17 +658,31 @@ def _get_or_create_schema_state( txn.execute("SELECT version, upgraded FROM schema_version") row = txn.fetchone() + if row is None: + # new database + return None + + current_version = int(row[0]) + upgraded = bool(row[1]) + + compat_version: Optional[int] = None + txn.execute("SELECT compat_version FROM schema_compat_version") + row = txn.fetchone() if row is not None: - current_version = int(row[0]) - txn.execute( - "SELECT file FROM applied_schema_deltas WHERE version >= ?", - (current_version,), - ) - applied_deltas = [d for d, in txn] - upgraded = bool(row[1]) - return current_version, applied_deltas, upgraded + compat_version = int(row[0]) + + txn.execute( + "SELECT file FROM applied_schema_deltas WHERE version >= ?", + (current_version,), + ) + applied_deltas = tuple(d for d, in txn) - return None + return _SchemaState( + current_version=current_version, + compat_version=compat_version, + applied_deltas=applied_deltas, + upgraded=upgraded, + ) @attr.s(slots=True) |