diff --git a/changelog.d/8266.misc b/changelog.d/8266.misc
new file mode 100644
index 0000000000..e7c899bea8
--- /dev/null
+++ b/changelog.d/8266.misc
@@ -0,0 +1 @@
+Do not attempt to upgrade upgrade database schema on worker processes.
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 964d8d9eb8..93dca96476 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -47,6 +47,22 @@ class UpgradeDatabaseException(PrepareDatabaseException):
pass
+OUTDATED_SCHEMA_ON_WORKER_ERROR = (
+ "Expected database schema version %i but got %i: run the main synapse process to "
+ "upgrade the database schema before starting worker processes."
+)
+
+EMPTY_DATABASE_ON_WORKER_ERROR = (
+ "Uninitialised database: run the main synapse process to prepare the database "
+ "schema before starting worker processes."
+)
+
+UNAPPLIED_DELTA_ON_WORKER_ERROR = (
+ "Database schema delta %s has not been applied: run the main synapse process to "
+ "upgrade the database schema before starting worker processes."
+)
+
+
def prepare_database(db_conn, database_engine, config, databases=["main", "state"]):
"""Prepares a physical database for usage. Will either create all necessary tables
or upgrade from an older schema version.
@@ -71,25 +87,35 @@ def prepare_database(db_conn, database_engine, config, databases=["main", "state
if version_info:
user_version, delta_files, upgraded = version_info
+ # config should only be None when we are preparing an in-memory SQLite db,
+ # which should be empty.
if config is None:
- if user_version != SCHEMA_VERSION:
- # If we don't pass in a config file then we are expecting to
- # have already upgraded the DB.
- raise UpgradeDatabaseException(
- "Expected database schema version %i but got %i"
- % (SCHEMA_VERSION, user_version)
- )
- else:
- _upgrade_existing_database(
- cur,
- user_version,
- delta_files,
- upgraded,
- database_engine,
- config,
- databases=databases,
+ raise ValueError(
+ "config==None in prepare_database, but databse is not empty"
)
+
+ # if it's a worker app, refuse to upgrade the database, to avoid multiple
+ # workers doing it at once.
+ if config.worker_app is not None and user_version != SCHEMA_VERSION:
+ raise UpgradeDatabaseException(
+ OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
+ )
+
+ _upgrade_existing_database(
+ cur,
+ user_version,
+ delta_files,
+ upgraded,
+ database_engine,
+ config,
+ databases=databases,
+ )
else:
+ # if it's a worker app, refuse to upgrade the database, to avoid multiple
+ # workers doing it at once.
+ if config and config.worker_app is not None:
+ raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR)
+
_setup_new_database(cur, database_engine, databases=databases)
# check if any of our configured dynamic modules want a database
@@ -295,6 +321,8 @@ def _upgrade_existing_database(
else:
assert config
+ is_worker = config and config.worker_app is not None
+
if current_version > SCHEMA_VERSION:
raise ValueError(
"Cannot use this database as it is too "
@@ -322,7 +350,7 @@ def _upgrade_existing_database(
specific_engine_extensions = (".sqlite", ".postgres")
for v in range(start_ver, SCHEMA_VERSION + 1):
- logger.info("Upgrading schema to v%d", v)
+ logger.info("Applying schema deltas for v%d", v)
# We need to search both the global and per data store schema
# directories for schema updates.
@@ -382,9 +410,15 @@ def _upgrade_existing_database(
continue
root_name, ext = os.path.splitext(file_name)
+
if ext == ".py":
# This is a python upgrade module. We need to import into some
# package and then execute its `run_upgrade` function.
+ if is_worker:
+ raise PrepareDatabaseException(
+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+ )
+
module_name = "synapse.storage.v%d_%s" % (v, root_name)
with open(absolute_path) as python_file:
module = imp.load_source(module_name, absolute_path, python_file)
@@ -399,10 +433,18 @@ def _upgrade_existing_database(
continue
elif ext == ".sql":
# A plain old .sql file, just read and execute it
+ if is_worker:
+ raise PrepareDatabaseException(
+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+ )
logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path)
elif ext == specific_engine_extension and root_name.endswith(".sql"):
# A .sql file specific to our engine; just read and execute it
+ if is_worker:
+ raise PrepareDatabaseException(
+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+ )
logger.info("Applying engine-specific schema %s", relative_path)
executescript(cur, absolute_path)
elif ext in specific_engine_extensions and root_name.endswith(".sql"):
@@ -432,6 +474,8 @@ def _upgrade_existing_database(
(v, True),
)
+ logger.info("Schema now up to date")
+
def _apply_module_schemas(txn, database_engine, config):
"""Apply the module schemas for the dynamic modules, if any
|