summary refs log tree commit diff
path: root/synapse/storage/prepare_database.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/prepare_database.py')
-rw-r--r--synapse/storage/prepare_database.py136
1 files changed, 94 insertions, 42 deletions
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 9cc3b51fe6..ee60e2a718 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -47,8 +47,24 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
-def prepare_database(db_conn, database_engine, config, data_stores=["main", "state"]):
-    """Prepares a database for usage. Will either create all necessary tables
+OUTDATED_SCHEMA_ON_WORKER_ERROR = (
+    "Expected database schema version %i but got %i: run the main synapse process to "
+    "upgrade the database schema before starting worker processes."
+)
+
+EMPTY_DATABASE_ON_WORKER_ERROR = (
+    "Uninitialised database: run the main synapse process to prepare the database "
+    "schema before starting worker processes."
+)
+
+UNAPPLIED_DELTA_ON_WORKER_ERROR = (
+    "Database schema delta %s has not been applied: run the main synapse process to "
+    "upgrade the database schema before starting worker processes."
+)
+
+
+def prepare_database(db_conn, database_engine, config, databases=["main", "state"]):
+    """Prepares a physical database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
 
     If `config` is None then prepare_database will assert that no upgrade is
@@ -60,37 +76,57 @@ def prepare_database(db_conn, database_engine, config, data_stores=["main", "sta
         config (synapse.config.homeserver.HomeServerConfig|None):
             application config, or None if we are connecting to an existing
             database which we expect to be configured already
-        data_stores (list[str]): The name of the data stores that will be used
-            with this database. Defaults to all data stores.
+        databases (list[str]): The name of the databases that will be used
+            with this physical database. Defaults to all databases.
     """
 
     try:
         cur = db_conn.cursor()
+
+        logger.info("%r: Checking existing schema version", databases)
         version_info = _get_or_create_schema_state(cur, database_engine)
 
         if version_info:
             user_version, delta_files, upgraded = version_info
+            logger.info(
+                "%r: Existing schema is %i (+%i deltas)",
+                databases,
+                user_version,
+                len(delta_files),
+            )
 
+            # config should only be None when we are preparing an in-memory SQLite db,
+            # which should be empty.
             if config is None:
-                if user_version != SCHEMA_VERSION:
-                    # If we don't pass in a config file then we are expecting to
-                    # have already upgraded the DB.
-                    raise UpgradeDatabaseException(
-                        "Expected database schema version %i but got %i"
-                        % (SCHEMA_VERSION, user_version)
-                    )
-            else:
-                _upgrade_existing_database(
-                    cur,
-                    user_version,
-                    delta_files,
-                    upgraded,
-                    database_engine,
-                    config,
-                    data_stores=data_stores,
+                raise ValueError(
+                    "config==None in prepare_database, but databse is not empty"
+                )
+
+            # if it's a worker app, refuse to upgrade the database, to avoid multiple
+            # workers doing it at once.
+            if config.worker_app is not None and user_version != SCHEMA_VERSION:
+                raise UpgradeDatabaseException(
+                    OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
                 )
+
+            _upgrade_existing_database(
+                cur,
+                user_version,
+                delta_files,
+                upgraded,
+                database_engine,
+                config,
+                databases=databases,
+            )
         else:
-            _setup_new_database(cur, database_engine, data_stores=data_stores)
+            logger.info("%r: Initialising new database", databases)
+
+            # if it's a worker app, refuse to upgrade the database, to avoid multiple
+            # workers doing it at once.
+            if config and config.worker_app is not None:
+                raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR)
+
+            _setup_new_database(cur, database_engine, databases=databases)
 
         # check if any of our configured dynamic modules want a database
         if config is not None:
@@ -103,9 +139,9 @@ def prepare_database(db_conn, database_engine, config, data_stores=["main", "sta
         raise
 
 
-def _setup_new_database(cur, database_engine, data_stores):
-    """Sets up the database by finding a base set of "full schemas" and then
-    applying any necessary deltas, including schemas from the given data
+def _setup_new_database(cur, database_engine, databases):
+    """Sets up the physical database by finding a base set of "full schemas" and
+    then applying any necessary deltas, including schemas from the given data
     stores.
 
     The "full_schemas" directory has subdirectories named after versions. This
@@ -138,8 +174,8 @@ def _setup_new_database(cur, database_engine, data_stores):
     Args:
         cur (Cursor): a database cursor
         database_engine (DatabaseEngine)
-        data_stores (list[str]): The names of the data stores to instantiate
-            on the given database.
+        databases (list[str]): The names of the databases to instantiate
+            on the given physical database.
     """
 
     # We're about to set up a brand new database so we check that its
@@ -176,13 +212,13 @@ def _setup_new_database(cur, database_engine, data_stores):
     directories.extend(
         os.path.join(
             dir_path,
-            "data_stores",
-            data_store,
+            "databases",
+            database,
             "schema",
             "full_schemas",
             str(max_current_ver),
         )
-        for data_store in data_stores
+        for database in databases
     )
 
     directory_entries = []
@@ -219,7 +255,7 @@ def _setup_new_database(cur, database_engine, data_stores):
         upgraded=False,
         database_engine=database_engine,
         config=None,
-        data_stores=data_stores,
+        databases=databases,
         is_empty=True,
     )
 
@@ -231,10 +267,10 @@ def _upgrade_existing_database(
     upgraded,
     database_engine,
     config,
-    data_stores,
+    databases,
     is_empty=False,
 ):
-    """Upgrades an existing database.
+    """Upgrades an existing physical database.
 
     Delta files can either be SQL stored in *.sql files, or python modules
     in *.py.
@@ -285,8 +321,8 @@ def _upgrade_existing_database(
         config (synapse.config.homeserver.HomeServerConfig|None):
             None if we are initialising a blank database, otherwise the application
             config
-        data_stores (list[str]): The names of the data stores to instantiate
-            on the given database.
+        databases (list[str]): The names of the databases to instantiate
+            on the given physical database.
         is_empty (bool): Is this a blank database? I.e. do we need to run the
             upgrade portions of the delta scripts.
     """
@@ -295,6 +331,8 @@ def _upgrade_existing_database(
     else:
         assert config
 
+    is_worker = config and config.worker_app is not None
+
     if current_version > SCHEMA_VERSION:
         raise ValueError(
             "Cannot use this database as it is too "
@@ -303,8 +341,8 @@ def _upgrade_existing_database(
 
     # some of the deltas assume that config.server_name is set correctly, so now
     # is a good time to run the sanity check.
-    if not is_empty and "main" in data_stores:
-        from synapse.storage.data_stores.main import check_database_before_upgrade
+    if not is_empty and "main" in databases:
+        from synapse.storage.databases.main import check_database_before_upgrade
 
         check_database_before_upgrade(cur, database_engine, config)
 
@@ -322,7 +360,7 @@ def _upgrade_existing_database(
     specific_engine_extensions = (".sqlite", ".postgres")
 
     for v in range(start_ver, SCHEMA_VERSION + 1):
-        logger.info("Upgrading schema to v%d", v)
+        logger.info("Applying schema deltas for v%d", v)
 
         # We need to search both the global and per data store schema
         # directories for schema updates.
@@ -330,11 +368,9 @@ def _upgrade_existing_database(
         # First we find the directories to search in
         delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
         directories = [delta_dir]
-        for data_store in data_stores:
+        for database in databases:
             directories.append(
-                os.path.join(
-                    dir_path, "data_stores", data_store, "schema", "delta", str(v)
-                )
+                os.path.join(dir_path, "databases", database, "schema", "delta", str(v))
             )
 
         # Used to check if we have any duplicate file names
@@ -384,9 +420,15 @@ def _upgrade_existing_database(
                 continue
 
             root_name, ext = os.path.splitext(file_name)
+
             if ext == ".py":
                 # This is a python upgrade module. We need to import into some
                 # package and then execute its `run_upgrade` function.
+                if is_worker:
+                    raise PrepareDatabaseException(
+                        UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+                    )
+
                 module_name = "synapse.storage.v%d_%s" % (v, root_name)
                 with open(absolute_path) as python_file:
                     module = imp.load_source(module_name, absolute_path, python_file)
@@ -401,10 +443,18 @@ def _upgrade_existing_database(
                 continue
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
+                if is_worker:
+                    raise PrepareDatabaseException(
+                        UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+                    )
                 logger.info("Applying schema %s", relative_path)
                 executescript(cur, absolute_path)
             elif ext == specific_engine_extension and root_name.endswith(".sql"):
                 # A .sql file specific to our engine; just read and execute it
+                if is_worker:
+                    raise PrepareDatabaseException(
+                        UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+                    )
                 logger.info("Applying engine-specific schema %s", relative_path)
                 executescript(cur, absolute_path)
             elif ext in specific_engine_extensions and root_name.endswith(".sql"):
@@ -434,6 +484,8 @@ def _upgrade_existing_database(
                 (v, True),
             )
 
+    logger.info("Schema now up to date")
+
 
 def _apply_module_schemas(txn, database_engine, config):
     """Apply the module schemas for the dynamic modules, if any
@@ -571,7 +623,7 @@ def _get_or_create_schema_state(txn, database_engine):
 
 
 @attr.s()
-class _DirectoryListing(object):
+class _DirectoryListing:
     """Helper class to store schema file name and the
     absolute path to it.