diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 1629d2a53c..e45adfcb55 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -131,17 +131,9 @@ def prepare_database(
"config==None in prepare_database, but database is not empty"
)
- # if it's a worker app, refuse to upgrade the database, to avoid multiple
- # workers doing it at once.
- if (
- config.worker.worker_app is not None
- and version_info.current_version != SCHEMA_VERSION
- ):
- raise UpgradeDatabaseException(
- OUTDATED_SCHEMA_ON_WORKER_ERROR
- % (SCHEMA_VERSION, version_info.current_version)
- )
-
+ # This should be run on all processes, master or worker. The master will
+ # apply the deltas, while workers will check if any outstanding deltas
+ # exist and raise an PrepareDatabaseException if they do.
_upgrade_existing_database(
cur,
version_info,
@@ -149,6 +141,7 @@ def prepare_database(
config,
databases=databases,
)
+
else:
logger.info("%r: Initialising new database", databases)
@@ -357,6 +350,18 @@ def _upgrade_existing_database(
is_worker = config and config.worker.worker_app is not None
+ # If the schema version needs to be updated, and we are on a worker, we immediately
+ # know to bail out as workers cannot update the database schema. Only one process
+ # must update the database at the time, therefore we delegate this task to the master.
+ if is_worker and current_schema_state.current_version < SCHEMA_VERSION:
+ # If the DB is on an older version than we expect then we refuse
+ # to start the worker (as the main process needs to run first to
+ # update the schema).
+ raise UpgradeDatabaseException(
+ OUTDATED_SCHEMA_ON_WORKER_ERROR
+ % (SCHEMA_VERSION, current_schema_state.current_version)
+ )
+
if (
current_schema_state.compat_version is not None
and current_schema_state.compat_version > SCHEMA_VERSION
|