diff options
author | Erik Johnston <erik@matrix.org> | 2019-10-23 12:03:03 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-10-23 12:03:03 +0100 |
commit | 7b6d99fa5aed2d8e0f71e75b12b26ade24701963 (patch) | |
tree | c96bca83c8dba442c63c52e8e18bae29a1eb8ce8 /synapse/storage/prepare_database.py | |
parent | Merge pull request #5726 from matrix-org/uhoreg/e2e_cross-signing2-part2 (diff) | |
parent | Merge branch 'develop' of github.com:matrix-org/synapse into erikj/refactor_s... (diff) | |
download | synapse-7b6d99fa5aed2d8e0f71e75b12b26ade24701963.tar.xz |
Merge pull request #6231 from matrix-org/erikj/refactor_stores
Refactor storage layer to support multiple databases
Diffstat (limited to 'synapse/storage/prepare_database.py')
-rw-r--r-- | synapse/storage/prepare_database.py | 176 |
1 files changed, 133 insertions, 43 deletions
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index e96eed8a6d..2e7753820e 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -14,12 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import fnmatch import imp import logging import os import re +import attr + from synapse.storage.engines.postgres import PostgresEngine logger = logging.getLogger(__name__) @@ -54,6 +55,10 @@ def prepare_database(db_conn, database_engine, config): application config, or None if we are connecting to an existing database which we expect to be configured already """ + + # For now we only have the one datastore. + data_stores = ["main"] + try: cur = db_conn.cursor() version_info = _get_or_create_schema_state(cur, database_engine) @@ -68,10 +73,16 @@ def prepare_database(db_conn, database_engine, config): raise UpgradeDatabaseException("Database needs to be upgraded") else: _upgrade_existing_database( - cur, user_version, delta_files, upgraded, database_engine, config + cur, + user_version, + delta_files, + upgraded, + database_engine, + config, + data_stores=data_stores, ) else: - _setup_new_database(cur, database_engine) + _setup_new_database(cur, database_engine, data_stores=data_stores) # check if any of our configured dynamic modules want a database if config is not None: @@ -84,9 +95,10 @@ def prepare_database(db_conn, database_engine, config): raise -def _setup_new_database(cur, database_engine): +def _setup_new_database(cur, database_engine, data_stores): """Sets up the database by finding a base set of "full schemas" and then - applying any necessary deltas. + applying any necessary deltas, including schemas from the given data + stores. The "full_schemas" directory has subdirectories named after versions. This function searches for the highest version less than or equal to @@ -111,52 +123,78 @@ def _setup_new_database(cur, database_engine): In the example foo.sql and bar.sql would be run, and then any delta files for versions strictly greater than 11. + + Note: we apply the full schemas and deltas from the top level `schema/` + folder as well those in the data stores specified. + + Args: + cur (Cursor): a database cursor + database_engine (DatabaseEngine) + data_stores (list[str]): The names of the data stores to instantiate + on the given database. """ current_dir = os.path.join(dir_path, "schema", "full_schemas") directory_entries = os.listdir(current_dir) - valid_dirs = [] - pattern = re.compile(r"^\d+(\.sql)?$") - - if isinstance(database_engine, PostgresEngine): - specific = "postgres" - else: - specific = "sqlite" - - specific_pattern = re.compile(r"^\d+(\.sql." + specific + r")?$") + # First we find the highest full schema version we have + valid_versions = [] for filename in directory_entries: - match = pattern.match(filename) or specific_pattern.match(filename) - abs_path = os.path.join(current_dir, filename) - if match and os.path.isdir(abs_path): - ver = int(match.group(0)) - if ver <= SCHEMA_VERSION: - valid_dirs.append((ver, abs_path)) - else: - logger.debug("Ignoring entry '%s' in 'full_schemas'", filename) + try: + ver = int(filename) + except ValueError: + continue + + if ver <= SCHEMA_VERSION: + valid_versions.append(ver) - if not valid_dirs: + if not valid_versions: raise PrepareDatabaseException( "Could not find a suitable base set of full schemas" ) - max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0]) + max_current_ver = max(valid_versions) logger.debug("Initialising schema v%d", max_current_ver) - directory_entries = os.listdir(sql_dir) + # Now lets find all the full schema files, both in the global schema and + # in data store schemas. + directories = [os.path.join(current_dir, str(max_current_ver))] + directories.extend( + os.path.join( + dir_path, + "data_stores", + data_store, + "schema", + "full_schemas", + str(max_current_ver), + ) + for data_store in data_stores + ) + + directory_entries = [] + for directory in directories: + directory_entries.extend( + _DirectoryListing(file_name, os.path.join(directory, file_name)) + for file_name in os.listdir(directory) + ) + + if isinstance(database_engine, PostgresEngine): + specific = "postgres" + else: + specific = "sqlite" - for filename in sorted( - fnmatch.filter(directory_entries, "*.sql") - + fnmatch.filter(directory_entries, "*.sql." + specific) - ): - sql_loc = os.path.join(sql_dir, filename) - logger.debug("Applying schema %s", sql_loc) - executescript(cur, sql_loc) + directory_entries.sort() + for entry in directory_entries: + if entry.file_name.endswith(".sql") or entry.file_name.endswith( + ".sql." + specific + ): + logger.debug("Applying schema %s", entry.absolute_path) + executescript(cur, entry.absolute_path) cur.execute( database_engine.convert_param_style( - "INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)" + "INSERT INTO schema_version (version, upgraded) VALUES (?,?)" ), (max_current_ver, False), ) @@ -168,6 +206,7 @@ def _setup_new_database(cur, database_engine): upgraded=False, database_engine=database_engine, config=None, + data_stores=data_stores, is_empty=True, ) @@ -179,6 +218,7 @@ def _upgrade_existing_database( upgraded, database_engine, config, + data_stores, is_empty=False, ): """Upgrades an existing database. @@ -215,6 +255,10 @@ def _upgrade_existing_database( only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in some arbitrary order. + Note: we apply the delta files from the specified data stores as well as + those in the top-level schema. We apply all delta files across data stores + for a version before applying those in the next version. + Args: cur (Cursor) current_version (int): The current version of the schema. @@ -224,6 +268,14 @@ def _upgrade_existing_database( applied deltas or from full schema file. If `True` the function will never apply delta files for the given `current_version`, since the current_version wasn't generated by applying those delta files. + database_engine (DatabaseEngine) + config (synapse.config.homeserver.HomeServerConfig|None): + application config, or None if we are connecting to an existing + database which we expect to be configured already + data_stores (list[str]): The names of the data stores to instantiate + on the given database. + is_empty (bool): Is this a blank database? I.e. do we need to run the + upgrade portions of the delta scripts. """ if current_version > SCHEMA_VERSION: @@ -248,24 +300,49 @@ def _upgrade_existing_database( for v in range(start_ver, SCHEMA_VERSION + 1): logger.info("Upgrading schema to v%d", v) - delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) + # We need to search both the global and per data store schema + # directories for schema updates. - try: - directory_entries = os.listdir(delta_dir) - except OSError: - logger.exception("Could not open delta dir for version %d", v) - raise UpgradeDatabaseException( - "Could not open delta dir for version %d" % (v,) + # First we find the directories to search in + delta_dir = os.path.join(dir_path, "schema", "delta", str(v)) + directories = [delta_dir] + for data_store in data_stores: + directories.append( + os.path.join( + dir_path, "data_stores", data_store, "schema", "delta", str(v) + ) ) + # Now find which directories have anything of interest. + directory_entries = [] + for directory in directories: + logger.debug("Looking for schema deltas in %s", directory) + try: + file_names = os.listdir(directory) + directory_entries.extend( + _DirectoryListing(file_name, os.path.join(directory, file_name)) + for file_name in file_names + ) + except FileNotFoundError: + # Data stores can have empty entries for a given version delta. + pass + except OSError: + raise UpgradeDatabaseException( + "Could not open delta dir for version %d: %s" % (v, directory) + ) + + # We sort to ensure that we apply the delta files in a consistent + # order (to avoid bugs caused by inconsistent directory listing order) directory_entries.sort() - for file_name in directory_entries: + for entry in directory_entries: + file_name = entry.file_name relative_path = os.path.join(str(v), file_name) - logger.debug("Found file: %s", relative_path) + absolute_path = entry.absolute_path + + logger.debug("Found file: %s (%s)", relative_path, absolute_path) if relative_path in applied_delta_files: continue - absolute_path = os.path.join(dir_path, "schema", "delta", relative_path) root_name, ext = os.path.splitext(file_name) if ext == ".py": # This is a python upgrade module. We need to import into some @@ -448,3 +525,16 @@ def _get_or_create_schema_state(txn, database_engine): return current_version, applied_deltas, upgraded return None + + +@attr.s() +class _DirectoryListing(object): + """Helper class to store schema file name and the + absolute path to it. + + These entries get sorted, so for consistency we want to ensure that + `file_name` attr is kept first. + """ + + file_name = attr.ib() + absolute_path = attr.ib() |