summary refs log tree commit diff
path: root/synapse/storage/prepare_database.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/prepare_database.py')
-rw-r--r--synapse/storage/prepare_database.py78
1 files changed, 61 insertions, 17 deletions
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 964d8d9eb8..93dca96476 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -47,6 +47,22 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
+OUTDATED_SCHEMA_ON_WORKER_ERROR = (
+    "Expected database schema version %i but got %i: run the main synapse process to "
+    "upgrade the database schema before starting worker processes."
+)
+
+EMPTY_DATABASE_ON_WORKER_ERROR = (
+    "Uninitialised database: run the main synapse process to prepare the database "
+    "schema before starting worker processes."
+)
+
+UNAPPLIED_DELTA_ON_WORKER_ERROR = (
+    "Database schema delta %s has not been applied: run the main synapse process to "
+    "upgrade the database schema before starting worker processes."
+)
+
+
 def prepare_database(db_conn, database_engine, config, databases=["main", "state"]):
     """Prepares a physical database for usage. Will either create all necessary tables
     or upgrade from an older schema version.
@@ -71,25 +87,35 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
         if version_info:
             user_version, delta_files, upgraded = version_info
 
+            # config should only be None when we are preparing an in-memory SQLite db,
+            # which should be empty.
             if config is None:
-                if user_version != SCHEMA_VERSION:
-                    # If we don't pass in a config file then we are expecting to
-                    # have already upgraded the DB.
-                    raise UpgradeDatabaseException(
-                        "Expected database schema version %i but got %i"
-                        % (SCHEMA_VERSION, user_version)
-                    )
-            else:
-                _upgrade_existing_database(
-                    cur,
-                    user_version,
-                    delta_files,
-                    upgraded,
-                    database_engine,
-                    config,
-                    databases=databases,
+                raise ValueError(
+                    "config==None in prepare_database, but databse is not empty"
                 )
+
+            # if it's a worker app, refuse to upgrade the database, to avoid multiple
+            # workers doing it at once.
+            if config.worker_app is not None and user_version != SCHEMA_VERSION:
+                raise UpgradeDatabaseException(
+                    OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
+                )
+
+            _upgrade_existing_database(
+                cur,
+                user_version,
+                delta_files,
+                upgraded,
+                database_engine,
+                config,
+                databases=databases,
+            )
         else:
+            # if it's a worker app, refuse to upgrade the database, to avoid multiple
+            # workers doing it at once.
+            if config and config.worker_app is not None:
+                raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR)
+
             _setup_new_database(cur, database_engine, databases=databases)
 
         # check if any of our configured dynamic modules want a database
@@ -295,6 +321,8 @@ def _upgrade_existing_database(
     else:
         assert config
 
+    is_worker = config and config.worker_app is not None
+
     if current_version > SCHEMA_VERSION:
         raise ValueError(
             "Cannot use this database as it is too "
@@ -322,7 +350,7 @@ def _upgrade_existing_database(
     specific_engine_extensions = (".sqlite", ".postgres")
 
     for v in range(start_ver, SCHEMA_VERSION + 1):
-        logger.info("Upgrading schema to v%d", v)
+        logger.info("Applying schema deltas for v%d", v)
 
         # We need to search both the global and per data store schema
         # directories for schema updates.
@@ -382,9 +410,15 @@ def _upgrade_existing_database(
                 continue
 
             root_name, ext = os.path.splitext(file_name)
+
             if ext == ".py":
                 # This is a python upgrade module. We need to import into some
                 # package and then execute its `run_upgrade` function.
+                if is_worker:
+                    raise PrepareDatabaseException(
+                        UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+                    )
+
                 module_name = "synapse.storage.v%d_%s" % (v, root_name)
                 with open(absolute_path) as python_file:
                     module = imp.load_source(module_name, absolute_path, python_file)
@@ -399,10 +433,18 @@ def _upgrade_existing_database(
                 continue
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
+                if is_worker:
+                    raise PrepareDatabaseException(
+                        UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+                    )
                 logger.info("Applying schema %s", relative_path)
                 executescript(cur, absolute_path)
             elif ext == specific_engine_extension and root_name.endswith(".sql"):
                 # A .sql file specific to our engine; just read and execute it
+                if is_worker:
+                    raise PrepareDatabaseException(
+                        UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+                    )
                 logger.info("Applying engine-specific schema %s", relative_path)
                 executescript(cur, absolute_path)
             elif ext in specific_engine_extensions and root_name.endswith(".sql"):
@@ -432,6 +474,8 @@ def _upgrade_existing_database(
                 (v, True),
             )
 
+    logger.info("Schema now up to date")
+
 
 def _apply_module_schemas(txn, database_engine, config):
     """Apply the module schemas for the dynamic modules, if any