author     Erik Johnston <erik@matrix.org>  2019-10-23 12:03:03 +0100
committer  GitHub <noreply@github.com>  2019-10-23 12:03:03 +0100
commit     7b6d99fa5aed2d8e0f71e75b12b26ade24701963 (patch)
tree       c96bca83c8dba442c63c52e8e18bae29a1eb8ce8 /synapse/storage/prepare_database.py
parent     Merge pull request #5726 from matrix-org/uhoreg/e2e_cross-signing2-part2 (diff)
parent     Merge branch 'develop' of github.com:matrix-org/synapse into erikj/refactor_s... (diff)
Merge pull request #6231 from matrix-org/erikj/refactor_stores
Refactor storage layer to support multiple databases
Diffstat (limited to 'synapse/storage/prepare_database.py')
 synapse/storage/prepare_database.py | 176 ++++++++++++++++++++++++----------
 1 file changed, 133 insertions(+), 43 deletions(-)
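
For orientation before the diff: the refactor assumes a per-data-store schema tree alongside the existing global one. The layout below is inferred from the paths the new code constructs, not stated in the commit itself (only the `main` store is wired up here; `dir_path` is the directory containing prepare_database.py, i.e. synapse/storage):

    synapse/storage/
        schema/
            full_schemas/<version>/*.sql
            delta/<version>/*.sql, *.py
        data_stores/
            main/
                schema/
                    full_schemas/<version>/*.sql
                    delta/<version>/*.sql, *.py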
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index e96eed8a6d..2e7753820e 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -14,12 +14,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import fnmatch
 import imp
 import logging
 import os
 import re
 
+import attr
+
 from synapse.storage.engines.postgres import PostgresEngine
 
 logger = logging.getLogger(__name__)
@@ -54,6 +55,10 @@ def prepare_database(db_conn, database_engine, config):
             application config, or None if we are connecting to an existing
             database which we expect to be configured already
     """
+
+    # For now we only have the one datastore.
+    data_stores = ["main"]
+
     try:
         cur = db_conn.cursor()
         version_info = _get_or_create_schema_state(cur, database_engine)
@@ -68,10 +73,16 @@ def prepare_database(db_conn, database_engine, config):
                     raise UpgradeDatabaseException("Database needs to be upgraded")
             else:
                 _upgrade_existing_database(
-                    cur, user_version, delta_files, upgraded, database_engine, config
+                    cur,
+                    user_version,
+                    delta_files,
+                    upgraded,
+                    database_engine,
+                    config,
+                    data_stores=data_stores,
                 )
         else:
-            _setup_new_database(cur, database_engine)
+            _setup_new_database(cur, database_engine, data_stores=data_stores)
 
         # check if any of our configured dynamic modules want a database
         if config is not None:
@@ -84,9 +95,10 @@ def prepare_database(db_conn, database_engine, config):
         raise
 
 
-def _setup_new_database(cur, database_engine):
+def _setup_new_database(cur, database_engine, data_stores):
     """Sets up the database by finding a base set of "full schemas" and then
-    applying any necessary deltas.
+    applying any necessary deltas, including schemas from the given data
+    stores.
 
     The "full_schemas" directory has subdirectories named after versions. This
     function searches for the highest version less than or equal to
@@ -111,52 +123,78 @@ def _setup_new_database(cur, database_engine):
 
     In the example foo.sql and bar.sql would be run, and then any delta files
     for versions strictly greater than 11.
+
+    Note: we apply the full schemas and deltas from the top level `schema/`
+    folder as well as those in the specified data stores.
+
+    Args:
+        cur (Cursor): a database cursor
+        database_engine (DatabaseEngine)
+        data_stores (list[str]): The names of the data stores to instantiate
+            on the given database.
     """
     current_dir = os.path.join(dir_path, "schema", "full_schemas")
     directory_entries = os.listdir(current_dir)
 
-    valid_dirs = []
-    pattern = re.compile(r"^\d+(\.sql)?$")
-
-    if isinstance(database_engine, PostgresEngine):
-        specific = "postgres"
-    else:
-        specific = "sqlite"
-
-    specific_pattern = re.compile(r"^\d+(\.sql." + specific + r")?$")
+    # First we find the highest full schema version we have
+    valid_versions = []
 
     for filename in directory_entries:
-        match = pattern.match(filename) or specific_pattern.match(filename)
-        abs_path = os.path.join(current_dir, filename)
-        if match and os.path.isdir(abs_path):
-            ver = int(match.group(0))
-            if ver <= SCHEMA_VERSION:
-                valid_dirs.append((ver, abs_path))
-        else:
-            logger.debug("Ignoring entry '%s' in 'full_schemas'", filename)
+        try:
+            ver = int(filename)
+        except ValueError:
+            continue
+
+        if ver <= SCHEMA_VERSION:
+            valid_versions.append(ver)
 
-    if not valid_dirs:
+    if not valid_versions:
         raise PrepareDatabaseException(
             "Could not find a suitable base set of full schemas"
         )
 
-    max_current_ver, sql_dir = max(valid_dirs, key=lambda x: x[0])
+    max_current_ver = max(valid_versions)
 
     logger.debug("Initialising schema v%d", max_current_ver)
 
-    directory_entries = os.listdir(sql_dir)
+    # Now let's find all the full schema files, both in the global schema and
+    # in data store schemas.
+    directories = [os.path.join(current_dir, str(max_current_ver))]
+    directories.extend(
+        os.path.join(
+            dir_path,
+            "data_stores",
+            data_store,
+            "schema",
+            "full_schemas",
+            str(max_current_ver),
+        )
+        for data_store in data_stores
+    )
+
+    directory_entries = []
+    for directory in directories:
+        directory_entries.extend(
+            _DirectoryListing(file_name, os.path.join(directory, file_name))
+            for file_name in os.listdir(directory)
+        )
+
+    if isinstance(database_engine, PostgresEngine):
+        specific = "postgres"
+    else:
+        specific = "sqlite"
 
-    for filename in sorted(
-        fnmatch.filter(directory_entries, "*.sql")
-        + fnmatch.filter(directory_entries, "*.sql." + specific)
-    ):
-        sql_loc = os.path.join(sql_dir, filename)
-        logger.debug("Applying schema %s", sql_loc)
-        executescript(cur, sql_loc)
+    directory_entries.sort()
+    for entry in directory_entries:
+        if entry.file_name.endswith(".sql") or entry.file_name.endswith(
+            ".sql." + specific
+        ):
+            logger.debug("Applying schema %s", entry.absolute_path)
+            executescript(cur, entry.absolute_path)
 
     cur.execute(
         database_engine.convert_param_style(
-            "INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)"
+            "INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
         ),
         (max_current_ver, False),
     )
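
As an aside (not part of the commit): the hunk above replaces the regex-based directory matching with two simpler rules, which a small standalone sketch can make concrete. `SCHEMA_VERSION` and the engine names ("postgres"/"sqlite") come from the diff; the helper names and the version value are ours.

    import os

    SCHEMA_VERSION = 56  # illustrative; the real value lives elsewhere in this module

    def pick_base_version(full_schemas_dir):
        # Subdirectories are named after schema versions; take the highest
        # one that is not newer than the code's SCHEMA_VERSION.
        versions = []
        for name in os.listdir(full_schemas_dir):
            try:
                versions.append(int(name))
            except ValueError:
                continue  # skip non-numeric entries
        versions = [v for v in versions if v <= SCHEMA_VERSION]
        if not versions:
            raise RuntimeError("Could not find a suitable base set of full schemas")
        return max(versions)

    def is_applicable(file_name, specific):
        # `specific` is "postgres" or "sqlite"; plain .sql files apply to both
        # engines, while e.g. foo.sql.postgres applies to Postgres only.
        return file_name.endswith(".sql") or file_name.endswith(".sql." + specific)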
@@ -168,6 +206,7 @@ def _setup_new_database(cur, database_engine):
         upgraded=False,
         database_engine=database_engine,
         config=None,
+        data_stores=data_stores,
         is_empty=True,
     )
 
@@ -179,6 +218,7 @@ def _upgrade_existing_database(
     upgraded,
     database_engine,
     config,
+    data_stores,
     is_empty=False,
 ):
     """Upgrades an existing database.
@@ -215,6 +255,10 @@ def _upgrade_existing_database(
     only if `upgraded` is True. Then `foo.sql` and `bar.py` would be run in
     some arbitrary order.
 
+    Note: we apply the delta files from the specified data stores as well as
+    those in the top-level schema. We apply all delta files across data stores
+    for a version before applying those in the next version.
+
     Args:
         cur (Cursor)
         current_version (int): The current version of the schema.
@@ -224,6 +268,14 @@ def _upgrade_existing_database(
             applied deltas or from full schema file. If `True` the function
             will never apply delta files for the given `current_version`, since
             the current_version wasn't generated by applying those delta files.
+        database_engine (DatabaseEngine)
+        config (synapse.config.homeserver.HomeServerConfig|None):
+            application config, or None if we are connecting to an existing
+            database which we expect to be configured already
+        data_stores (list[str]): The names of the data stores to instantiate
+            on the given database.
+        is_empty (bool): Is this a blank database, i.e. do we need to run the
+            upgrade portions of the delta scripts?
     """
 
     if current_version > SCHEMA_VERSION:
@@ -248,24 +300,49 @@ def _upgrade_existing_database(
     for v in range(start_ver, SCHEMA_VERSION + 1):
         logger.info("Upgrading schema to v%d", v)
 
-        delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
+        # We need to search both the global and per data store schema
+        # directories for schema updates.
 
-        try:
-            directory_entries = os.listdir(delta_dir)
-        except OSError:
-            logger.exception("Could not open delta dir for version %d", v)
-            raise UpgradeDatabaseException(
-                "Could not open delta dir for version %d" % (v,)
+        # First we find the directories to search in
+        delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
+        directories = [delta_dir]
+        for data_store in data_stores:
+            directories.append(
+                os.path.join(
+                    dir_path, "data_stores", data_store, "schema", "delta", str(v)
+                )
             )
 
+        # Now find which directories have anything of interest.
+        directory_entries = []
+        for directory in directories:
+            logger.debug("Looking for schema deltas in %s", directory)
+            try:
+                file_names = os.listdir(directory)
+                directory_entries.extend(
+                    _DirectoryListing(file_name, os.path.join(directory, file_name))
+                    for file_name in file_names
+                )
+            except FileNotFoundError:
+                # Data stores may have no delta directory for a given version.
+                pass
+            except OSError:
+                raise UpgradeDatabaseException(
+                    "Could not open delta dir for version %d: %s" % (v, directory)
+                )
+
+        # We sort to ensure that we apply the delta files in a consistent
+        # order (to avoid bugs caused by inconsistent directory listing order)
         directory_entries.sort()
-        for file_name in directory_entries:
+        for entry in directory_entries:
+            file_name = entry.file_name
             relative_path = os.path.join(str(v), file_name)
-            logger.debug("Found file: %s", relative_path)
+            absolute_path = entry.absolute_path
+
+            logger.debug("Found file: %s (%s)", relative_path, absolute_path)
             if relative_path in applied_delta_files:
                 continue
 
-            absolute_path = os.path.join(dir_path, "schema", "delta", relative_path)
             root_name, ext = os.path.splitext(file_name)
             if ext == ".py":
                 # This is a python upgrade module. We need to import into some
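
A minimal sketch (again, commentary rather than commit content) of the gathering pattern this hunk introduces, with a namedtuple standing in for the `_DirectoryListing` class defined at the end of the diff; the real code also wraps other OSErrors in UpgradeDatabaseException, which is omitted here:

    import os
    from collections import namedtuple

    # Stand-in for _DirectoryListing; field order matters because entries are sorted.
    _DirectoryListing = namedtuple("_DirectoryListing", ["file_name", "absolute_path"])

    def gather_version_deltas(directories):
        entries = []
        for directory in directories:
            try:
                names = os.listdir(directory)
            except FileNotFoundError:
                # A data store may simply ship no deltas for this version.
                continue
            entries.extend(
                _DirectoryListing(name, os.path.join(directory, name))
                for name in names
            )
        # Sort across all directories so files apply in a consistent order,
        # independent of OS directory-listing order.
        entries.sort()
        return entries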
@@ -448,3 +525,16 @@ def _get_or_create_schema_state(txn, database_engine):
         return current_version, applied_deltas, upgraded
 
     return None
+
+
+@attr.s()
+class _DirectoryListing(object):
+    """Helper class to store schema file name and the
+    absolute path to it.
+
+    These entries get sorted, so for consistency we want to ensure that
+    `file_name` attr is kept first.
+    """
+
+    file_name = attr.ib()
+    absolute_path = attr.ib()
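
The docstring's note about attribute order is what makes the sorts above deterministic: `attr.s()` generates comparison methods that compare attributes in definition order, so entries sort by `file_name` first and `absolute_path` second. A quick illustration with made-up paths:

    import attr

    @attr.s()
    class _DirectoryListing(object):
        file_name = attr.ib()
        absolute_path = attr.ib()

    entries = [
        _DirectoryListing("02_b.sql", "/stores/main/schema/delta/56/02_b.sql"),
        _DirectoryListing("01_a.sql", "/schema/delta/56/01_a.sql"),
    ]
    entries.sort()
    # Sorted by file_name, so 01_a.sql runs before 02_b.sql even though
    # the two entries came from different directories.
    print([e.file_name for e in entries])  # ['01_a.sql', '02_b.sql']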