diff options
author | Erik Johnston <erik@matrix.org> | 2019-10-21 16:08:40 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-10-21 16:08:40 +0100 |
commit | ffd24545bbde327179a76bb1d6ed02b70bd2b93b (patch) | |
tree | 98e00cecacf1bbda53285cd5a79f095021acccb5 /synapse/storage/prepare_database.py | |
parent | Move storage classes into a main "data store". (diff) | |
download | synapse-ffd24545bbde327179a76bb1d6ed02b70bd2b93b.tar.xz |
Fix schema management to work with multiple data stores.
Diffstat (limited to 'synapse/storage/prepare_database.py')
-rw-r--r-- | synapse/storage/prepare_database.py | 149 |
1 files changed, 108 insertions, 41 deletions
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index e96eed8a6d..0bb970a296 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -14,12 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import fnmatch import imp import logging import os import re +import attr + from synapse.storage.engines.postgres import PostgresEngine logger = logging.getLogger(__name__) @@ -54,6 +55,10 @@ def prepare_database(db_conn, database_engine, config): application config, or None if we are connecting to an existing database which we expect to be configured already """ + + # For now we only have the one datastore. + data_stores = ["main"] + try: cur = db_conn.cursor() version_info = _get_or_create_schema_state(cur, database_engine) @@ -68,10 +73,16 @@ def prepare_database(db_conn, database_engine, config): raise UpgradeDatabaseException("Database needs to be upgraded") else: _upgrade_existing_database( - cur, user_version, delta_files, upgraded, database_engine, config + cur, + user_version, + delta_files, + upgraded, + database_engine, + config, + data_stores=data_stores, ) else: - _setup_new_database(cur, database_engine) + _setup_new_database(cur, database_engine, data_stores=data_stores) # check if any of our configured dynamic modules want a database if config is not None: @@ -84,7 +95,7 @@ def prepare_database(db_conn, database_engine, config): raise -def _setup_new_database(cur, database_engine): +def _setup_new_database(cur, database_engine, data_stores): """Sets up the database by finding a base set of "full schemas" and then applying any necessary deltas. @@ -115,48 +126,65 @@ def _setup_new_database(cur, database_engine): current_dir = os.path.join(dir_path, "schema", "full_schemas") directory_entries = os.listdir(current_dir) - valid_dirs = [] - pattern = re.compile(r"^\d+(\.sql)?$") - - if isinstance(database_engine, PostgresEngine): - specific = "postgres" - else: - specific = "sqlite" - - specific_pattern = re.compile(r"^\d+(\.sql." + specific + r")?$") + # First we find the highest full schema version we have + valid_versions = [] for filename in directory_entries: - match = pattern.match(filename) or specific_pattern.match(filename) - abs_path = os.path.join(current_dir, filename) - if match and os.path.isdir(abs_path): - ver = int(match.group(0)) - if ver <= SCHEMA_VERSION: - valid_dirs.append((ver, abs_path)) - else: - logger.debug("Ignoring entry '%s' in 'full_schemas'", filename) + try: + ver = int(filename) + except ValueError: + continue - if not valid_dirs: + if ver <= SCHEMA_VERSION: + valid_versions.append(ver) + + if not valid_versions: raise PrepareDatabaseException( "Could not find a suitable base set of full schemas" ) - max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0]) + max_current_ver = max(valid_versions) logger.debug("Initialising schema v%d", max_current_ver) - directory_entries = os.listdir(sql_dir) + # Now lets find all the full schema files, both in the global schema and + # in data store schemas. + directories = [os.path.join(current_dir, str(max_current_ver))] + directories.extend( + os.path.join( + dir_path, + "data_stores", + data_store, + "schema", + "full_schemas", + str(max_current_ver), + ) + for data_store in data_stores + ) + + directory_entries = [] + for directory in directories: + directory_entries.extend( + _DirectoryListing(file_name, os.path.join(directory, file_name)) + for file_name in os.listdir(directory) + ) + + if isinstance(database_engine, PostgresEngine): + specific = "postgres" + else: + specific = "sqlite" - for filename in sorted( - fnmatch.filter(directory_entries, "*.sql") - + fnmatch.filter(directory_entries, "*.sql." + specific) - ): - sql_loc = os.path.join(sql_dir, filename) - logger.debug("Applying schema %s", sql_loc) - executescript(cur, sql_loc) + directory_entries.sort() + for entry in directory_entries: + if entry.file_name.endswith(".sql") or entry.file_name.endswith( + ".sql." + specific + ): + logger.debug("Applying schema %s", entry.absolute_path) + executescript(cur, entry.absolute_path) cur.execute( database_engine.convert_param_style( - "INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)" + "INSERT INTO schema_version (version, upgraded) VALUES (?,?)" ), (max_current_ver, False), ) @@ -168,6 +196,7 @@ def _setup_new_database(cur, database_engine): upgraded=False, database_engine=database_engine, config=None, + data_stores=data_stores, is_empty=True, ) @@ -179,6 +208,7 @@ def _upgrade_existing_database( upgraded, database_engine, config, + data_stores, is_empty=False, ): """Upgrades an existing database. @@ -248,24 +278,51 @@ def _upgrade_existing_database( for v in range(start_ver, SCHEMA_VERSION + 1): logger.info("Upgrading schema to v%d", v) - delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) + # We need to search both the global and per data store schema + # directories for schema updates. - try: - directory_entries = os.listdir(delta_dir) - except OSError: - logger.exception("Could not open delta dir for version %d", v) - raise UpgradeDatabaseException( - "Could not open delta dir for version %d" % (v,) + # First we find the directories to search in + delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) + directories = [delta_dir] + for data_store in data_stores: + directories.append( + os.path.join( + dir_path, "data_stores", data_store, "schema", "delta", str(v) + ) ) + # Now find which directories have anything of interest. + directory_entries = [] + for directory in directories: + logger.debug("Looking for schema deltas in %s", directory) + try: + file_names = os.listdir(directory) + directory_entries.extend( + _DirectoryListing(file_name, os.path.join(directory, file_name)) + for file_name in file_names + ) + except FileNotFoundError: + # Data stores can have empty entries for a given version delta. + pass + except OSError: + logger.exception("Could not open delta dir for version %d", v) + raise UpgradeDatabaseException( + "Could not open delta dir for version %d" % (v,) + ) + + if not directory_entries: + continue + directory_entries.sort() - for file_name in directory_entries: + for entry in directory_entries: + file_name = entry.file_name relative_path = os.path.join(str(v), file_name) + absolute_path = entry.absolute_path + logger.debug("Found file: %s", relative_path) if relative_path in applied_delta_files: continue - absolute_path = os.path.join(dir_path, "schema", "delta", relative_path) root_name, ext = os.path.splitext(file_name) if ext == ".py": # This is a python upgrade module. We need to import into some @@ -448,3 +505,13 @@ def _get_or_create_schema_state(txn, database_engine): return current_version, applied_deltas, upgraded return None + + +@attr.s() +class _DirectoryListing(object): + """Helper class to store schema file name and the + absolute path to it. + """ + + file_name = attr.ib() + absolute_path = attr.ib() |