diff options
Diffstat (limited to 'synapse/storage/__init__.py')
-rw-r--r-- | synapse/storage/__init__.py | 348 |
1 files changed, 272 insertions, 76 deletions
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d6ec446bd2..a3ff995695 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -45,35 +45,18 @@ from syutil.jsonutil import encode_canonical_json from synapse.crypto.event_signing import compute_event_reference_hash +import fnmatch +import imp import logging import os +import re logger = logging.getLogger(__name__) -SCHEMAS = [ - "transactions", - "users", - "profiles", - "presence", - "im", - "room_aliases", - "keys", - "redactions", - "state", - "event_edges", - "event_signatures", - "pusher", - "media_repository", - "application_services", - "filtering", - "rejections", -] - - -# Remember to update this number every time an incompatible change is made to -# database schema files, so the users will be informed on server restarts. +# Remember to update this number every time a change is made to database +# schema files, so the users will be informed on server restarts. SCHEMA_VERSION = 14 dir_path = os.path.abspath(os.path.dirname(__file__)) @@ -576,28 +559,15 @@ class DataStore(RoomMemberStore, RoomStore, ) -def schema_path(schema): - """ Get a filesystem path for the named database schema - - Args: - schema: Name of the database schema. - Returns: - A filesystem path pointing at a ".sql" file. - - """ - schemaPath = os.path.join(dir_path, "schema", schema + ".sql") - return schemaPath - - -def read_schema(schema): +def read_schema(path): """ Read the named database schema. Args: - schema: Name of the datbase schema. + path: Path of the database schema. Returns: A string containing the database schema. """ - with open(schema_path(schema)) as schema_file: + with open(path) as schema_file: return schema_file.read() @@ -610,49 +580,275 @@ class UpgradeDatabaseException(PrepareDatabaseException): def prepare_database(db_conn): - """ Set up all the dbs. Since all the *.sql have IF NOT EXISTS, so we - don't have to worry about overwriting existing content. + """Prepares a database for usage. Will either create all necessary tables + or upgrade from an older schema version. + """ + try: + cur = db_conn.cursor() + version_info = _get_or_create_schema_state(cur) + + if version_info: + user_version, delta_files, upgraded = version_info + _upgrade_existing_database(cur, user_version, delta_files, upgraded) + else: + _setup_new_database(cur) + + cur.execute("PRAGMA user_version = %d" % (SCHEMA_VERSION,)) + + cur.close() + db_conn.commit() + except: + db_conn.rollback() + raise + + +def _setup_new_database(cur): + """Sets up the database by finding a base set of "full schemas" and then + applying any necessary deltas. + + The "full_schemas" directory has subdirectories named after versions. This + function searches for the highest version less than or equal to + `SCHEMA_VERSION` and executes all .sql files in that directory. + + The function will then apply all deltas for all versions after the base + version. + + Example directory structure: + + schema/ + delta/ + ... + full_schemas/ + 3/ + test.sql + ... + 11/ + foo.sql + bar.sql + ... + + In the example foo.sql and bar.sql would be run, and then any delta files + for versions strictly greater than 11. """ - c = db_conn.cursor() - c.execute("PRAGMA user_version") - row = c.fetchone() + current_dir = os.path.join(dir_path, "schema", "full_schemas") + directory_entries = os.listdir(current_dir) + + valid_dirs = [] + pattern = re.compile(r"^\d+(\.sql)?$") + for filename in directory_entries: + match = pattern.match(filename) + abs_path = os.path.join(current_dir, filename) + if match and os.path.isdir(abs_path): + ver = int(match.group(0)) + if ver <= SCHEMA_VERSION: + valid_dirs.append((ver, abs_path)) + else: + logger.warn("Unexpected entry in 'full_schemas': %s", filename) - if row and row[0]: - user_version = row[0] + if not valid_dirs: + raise PrepareDatabaseException( + "Could not find a suitable base set of full schemas" + ) - if user_version > SCHEMA_VERSION: - raise ValueError( - "Cannot use this database as it is too " + - "new for the server to understand" - ) - elif user_version < SCHEMA_VERSION: - logger.info( - "Upgrading database from version %d", - user_version + max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0]) + + logger.debug("Initialising schema v%d", max_current_ver) + + directory_entries = os.listdir(sql_dir) + + sql_script = "BEGIN TRANSACTION;\n" + for filename in fnmatch.filter(directory_entries, "*.sql"): + sql_loc = os.path.join(sql_dir, filename) + logger.debug("Applying schema %s", sql_loc) + sql_script += read_schema(sql_loc) + sql_script += "\n" + sql_script += "COMMIT TRANSACTION;" + cur.executescript(sql_script) + + cur.execute( + "INSERT OR REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + (max_current_ver, False) + ) + + _upgrade_existing_database( + cur, + current_version=max_current_ver, + applied_delta_files=[], + upgraded=False + ) + + +def _upgrade_existing_database(cur, current_version, applied_delta_files, + upgraded): + """Upgrades an existing database. + + Delta files can either be SQL stored in *.sql files, or python modules + in *.py. + + There can be multiple delta files per version. Synapse will keep track of + which delta files have been applied, and will apply any that haven't been + even if there has been no version bump. This is useful for development + where orthogonal schema changes may happen on separate branches. + + Different delta files for the same version *must* be orthogonal and give + the same result when applied in any order. No guarantees are made on the + order of execution of these scripts. + + This is a no-op of current_version == SCHEMA_VERSION. + + Example directory structure: + + schema/ + delta/ + 11/ + foo.sql + ... + 12/ + foo.sql + bar.py + ... + full_schemas/ + ... + + In the example, if current_version is 11, then foo.sql will be run if and + only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in + some arbitrary order. + + Args: + cur (Cursor) + current_version (int): The current version of the schema. + applied_delta_files (list): A list of deltas that have already been + applied. + upgraded (bool): Whether the current version was generated by having + applied deltas or from full schema file. If `True` the function + will never apply delta files for the given `current_version`, since + the current_version wasn't generated by applying those delta files. + """ + + if current_version > SCHEMA_VERSION: + raise ValueError( + "Cannot use this database as it is too " + + "new for the server to understand" + ) + + start_ver = current_version + if not upgraded: + start_ver += 1 + + for v in range(start_ver, SCHEMA_VERSION + 1): + logger.debug("Upgrading schema to v%d", v) + + delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) + + try: + directory_entries = os.listdir(delta_dir) + except OSError: + logger.exception("Could not open delta dir for version %d", v) + raise UpgradeDatabaseException( + "Could not open delta dir for version %d" % (v,) ) - # Run every version since after the current version. - for v in range(user_version + 1, SCHEMA_VERSION + 1): - if v in (10, 14,): - raise UpgradeDatabaseException( - "No delta for version 10" + directory_entries.sort() + for file_name in directory_entries: + relative_path = os.path.join(str(v), file_name) + if relative_path in applied_delta_files: + continue + + absolute_path = os.path.join( + dir_path, "schema", "delta", relative_path, + ) + root_name, ext = os.path.splitext(file_name) + if ext == ".py": + # This is a python upgrade module. We need to import into some + # package and then execute its `run_upgrade` function. + module_name = "synapse.storage.v%d_%s" % ( + v, root_name + ) + with open(absolute_path) as python_file: + module = imp.load_source( + module_name, absolute_path, python_file ) - sql_script = read_schema("delta/v%d" % (v,)) - c.executescript(sql_script) + logger.debug("Running script %s", relative_path) + module.run_upgrade(cur) + elif ext == ".sql": + # A plain old .sql file, just read and execute it + delta_schema = read_schema(absolute_path) + logger.debug("Applying schema %s", relative_path) + cur.executescript(delta_schema) + else: + # Not a valid delta file. + logger.warn( + "Found directory entry that did not end in .py or" + " .sql: %s", + relative_path, + ) + continue + + # Mark as done. + cur.execute( + "INSERT INTO applied_schema_deltas (version, file)" + " VALUES (?,?)", + (v, relative_path) + ) + + cur.execute( + "INSERT OR REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + (v, True) + ) - db_conn.commit() - else: - logger.info("Database is at version %r", user_version) - - else: - sql_script = "BEGIN TRANSACTION;\n" - for sql_loc in SCHEMAS: - logger.debug("Applying schema %r", sql_loc) - sql_script += read_schema(sql_loc) - sql_script += "\n" - sql_script += "COMMIT TRANSACTION;" - c.executescript(sql_script) - db_conn.commit() - c.execute("PRAGMA user_version = %d" % SCHEMA_VERSION) - c.close() +def _get_or_create_schema_state(txn): + schema_path = os.path.join( + dir_path, "schema", "schema_version.sql", + ) + create_schema = read_schema(schema_path) + txn.executescript(create_schema) + + txn.execute("SELECT version, upgraded FROM schema_version") + row = txn.fetchone() + current_version = int(row[0]) if row else None + upgraded = bool(row[1]) if row else None + + if current_version: + txn.execute( + "SELECT file FROM applied_schema_deltas WHERE version >= ?", + (current_version,) + ) + return current_version, txn.fetchall(), upgraded + + return None + + +def prepare_sqlite3_database(db_conn): + """This function should be called before `prepare_database` on sqlite3 + databases. + + Since we changed the way we store the current schema version and handle + updates to schemas, we need a way to upgrade from the old method to the + new. This only affects sqlite databases since they were the only ones + supported at the time. + """ + with db_conn: + schema_path = os.path.join( + dir_path, "schema", "schema_version.sql", + ) + create_schema = read_schema(schema_path) + db_conn.executescript(create_schema) + + c = db_conn.execute("SELECT * FROM schema_version") + rows = c.fetchall() + c.close() + + if not rows: + c = db_conn.execute("PRAGMA user_version") + row = c.fetchone() + c.close() + + if row and row[0]: + db_conn.execute( + "INSERT OR REPLACE INTO schema_version (version, upgraded)" + " VALUES (?,?)", + (row[0], False) + ) |