diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 9cc3b51fe6..ee60e2a718 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -47,8 +47,24 @@ class UpgradeDatabaseException(PrepareDatabaseException):
pass
-def prepare_database(db_conn, database_engine, config, data_stores=["main", "state"]):
- """Prepares a database for usage. Will either create all necessary tables
+OUTDATED_SCHEMA_ON_WORKER_ERROR = (
+ "Expected database schema version %i but got %i: run the main synapse process to "
+ "upgrade the database schema before starting worker processes."
+)
+
+EMPTY_DATABASE_ON_WORKER_ERROR = (
+ "Uninitialised database: run the main synapse process to prepare the database "
+ "schema before starting worker processes."
+)
+
+UNAPPLIED_DELTA_ON_WORKER_ERROR = (
+ "Database schema delta %s has not been applied: run the main synapse process to "
+ "upgrade the database schema before starting worker processes."
+)
+
+
+def prepare_database(db_conn, database_engine, config, databases=["main", "state"]):
+ """Prepares a physical database for usage. Will either create all necessary tables
or upgrade from an older schema version.
If `config` is None then prepare_database will assert that no upgrade is
@@ -60,37 +76,57 @@ def prepare_database(db_conn, database_engine, config, data_stores=["main", "sta
config (synapse.config.homeserver.HomeServerConfig|None):
application config, or None if we are connecting to an existing
database which we expect to be configured already
- data_stores (list[str]): The name of the data stores that will be used
- with this database. Defaults to all data stores.
+ databases (list[str]): The name of the databases that will be used
+ with this physical database. Defaults to all databases.
"""
try:
cur = db_conn.cursor()
+
+ logger.info("%r: Checking existing schema version", databases)
version_info = _get_or_create_schema_state(cur, database_engine)
if version_info:
user_version, delta_files, upgraded = version_info
+ logger.info(
+ "%r: Existing schema is %i (+%i deltas)",
+ databases,
+ user_version,
+ len(delta_files),
+ )
+ # config should only be None when we are preparing an in-memory SQLite db,
+ # which should be empty.
if config is None:
- if user_version != SCHEMA_VERSION:
- # If we don't pass in a config file then we are expecting to
- # have already upgraded the DB.
- raise UpgradeDatabaseException(
- "Expected database schema version %i but got %i"
- % (SCHEMA_VERSION, user_version)
- )
- else:
- _upgrade_existing_database(
- cur,
- user_version,
- delta_files,
- upgraded,
- database_engine,
- config,
- data_stores=data_stores,
+ raise ValueError(
+ "config==None in prepare_database, but databse is not empty"
+ )
+
+ # if it's a worker app, refuse to upgrade the database, to avoid multiple
+ # workers doing it at once.
+ if config.worker_app is not None and user_version != SCHEMA_VERSION:
+ raise UpgradeDatabaseException(
+ OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
)
+
+ _upgrade_existing_database(
+ cur,
+ user_version,
+ delta_files,
+ upgraded,
+ database_engine,
+ config,
+ databases=databases,
+ )
else:
- _setup_new_database(cur, database_engine, data_stores=data_stores)
+ logger.info("%r: Initialising new database", databases)
+
+ # if it's a worker app, refuse to upgrade the database, to avoid multiple
+ # workers doing it at once.
+ if config and config.worker_app is not None:
+ raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR)
+
+ _setup_new_database(cur, database_engine, databases=databases)
# check if any of our configured dynamic modules want a database
if config is not None:
@@ -103,9 +139,9 @@ def prepare_database(db_conn, database_engine, config, data_stores=["main", "sta
raise
-def _setup_new_database(cur, database_engine, data_stores):
- """Sets up the database by finding a base set of "full schemas" and then
- applying any necessary deltas, including schemas from the given data
+def _setup_new_database(cur, database_engine, databases):
+ """Sets up the physical database by finding a base set of "full schemas" and
+ then applying any necessary deltas, including schemas from the given data
stores.
The "full_schemas" directory has subdirectories named after versions. This
@@ -138,8 +174,8 @@ def _setup_new_database(cur, database_engine, data_stores):
Args:
cur (Cursor): a database cursor
database_engine (DatabaseEngine)
- data_stores (list[str]): The names of the data stores to instantiate
- on the given database.
+ databases (list[str]): The names of the databases to instantiate
+ on the given physical database.
"""
# We're about to set up a brand new database so we check that its
@@ -176,13 +212,13 @@ def _setup_new_database(cur, database_engine, data_stores):
directories.extend(
os.path.join(
dir_path,
- "data_stores",
- data_store,
+ "databases",
+ database,
"schema",
"full_schemas",
str(max_current_ver),
)
- for data_store in data_stores
+ for database in databases
)
directory_entries = []
@@ -219,7 +255,7 @@ def _setup_new_database(cur, database_engine, data_stores):
upgraded=False,
database_engine=database_engine,
config=None,
- data_stores=data_stores,
+ databases=databases,
is_empty=True,
)
@@ -231,10 +267,10 @@ def _upgrade_existing_database(
upgraded,
database_engine,
config,
- data_stores,
+ databases,
is_empty=False,
):
- """Upgrades an existing database.
+ """Upgrades an existing physical database.
Delta files can either be SQL stored in *.sql files, or python modules
in *.py.
@@ -285,8 +321,8 @@ def _upgrade_existing_database(
config (synapse.config.homeserver.HomeServerConfig|None):
None if we are initialising a blank database, otherwise the application
config
- data_stores (list[str]): The names of the data stores to instantiate
- on the given database.
+ databases (list[str]): The names of the databases to instantiate
+ on the given physical database.
is_empty (bool): Is this a blank database? I.e. do we need to run the
upgrade portions of the delta scripts.
"""
@@ -295,6 +331,8 @@ def _upgrade_existing_database(
else:
assert config
+ is_worker = config and config.worker_app is not None
+
if current_version > SCHEMA_VERSION:
raise ValueError(
"Cannot use this database as it is too "
@@ -303,8 +341,8 @@ def _upgrade_existing_database(
# some of the deltas assume that config.server_name is set correctly, so now
# is a good time to run the sanity check.
- if not is_empty and "main" in data_stores:
- from synapse.storage.data_stores.main import check_database_before_upgrade
+ if not is_empty and "main" in databases:
+ from synapse.storage.databases.main import check_database_before_upgrade
check_database_before_upgrade(cur, database_engine, config)
@@ -322,7 +360,7 @@ def _upgrade_existing_database(
specific_engine_extensions = (".sqlite", ".postgres")
for v in range(start_ver, SCHEMA_VERSION + 1):
- logger.info("Upgrading schema to v%d", v)
+ logger.info("Applying schema deltas for v%d", v)
# We need to search both the global and per data store schema
# directories for schema updates.
@@ -330,11 +368,9 @@ def _upgrade_existing_database(
# First we find the directories to search in
delta_dir = os.path.join(dir_path, "schema", "delta", str(v))
directories = [delta_dir]
- for data_store in data_stores:
+ for database in databases:
directories.append(
- os.path.join(
- dir_path, "data_stores", data_store, "schema", "delta", str(v)
- )
+ os.path.join(dir_path, "databases", database, "schema", "delta", str(v))
)
# Used to check if we have any duplicate file names
@@ -384,9 +420,15 @@ def _upgrade_existing_database(
continue
root_name, ext = os.path.splitext(file_name)
+
if ext == ".py":
# This is a python upgrade module. We need to import into some
# package and then execute its `run_upgrade` function.
+ if is_worker:
+ raise PrepareDatabaseException(
+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+ )
+
module_name = "synapse.storage.v%d_%s" % (v, root_name)
with open(absolute_path) as python_file:
module = imp.load_source(module_name, absolute_path, python_file)
@@ -401,10 +443,18 @@ def _upgrade_existing_database(
continue
elif ext == ".sql":
# A plain old .sql file, just read and execute it
+ if is_worker:
+ raise PrepareDatabaseException(
+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+ )
logger.info("Applying schema %s", relative_path)
executescript(cur, absolute_path)
elif ext == specific_engine_extension and root_name.endswith(".sql"):
# A .sql file specific to our engine; just read and execute it
+ if is_worker:
+ raise PrepareDatabaseException(
+ UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+ )
logger.info("Applying engine-specific schema %s", relative_path)
executescript(cur, absolute_path)
elif ext in specific_engine_extensions and root_name.endswith(".sql"):
@@ -434,6 +484,8 @@ def _upgrade_existing_database(
(v, True),
)
+ logger.info("Schema now up to date")
+
def _apply_module_schemas(txn, database_engine, config):
"""Apply the module schemas for the dynamic modules, if any
@@ -571,7 +623,7 @@ def _get_or_create_schema_state(txn, database_engine):
@attr.s()
-class _DirectoryListing(object):
+class _DirectoryListing:
"""Helper class to store schema file name and the
absolute path to it.
|