diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 31501fd573..598ded33b5 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,6 +25,7 @@ from typing import (
Optional,
TextIO,
Tuple,
+ cast,
)
import attr
@@ -32,7 +33,11 @@ import attr
from synapse.config.homeserver import HomeServerConfig
from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
-from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
+from synapse.storage.schema import (
+ BACKGROUND_UPDATES_COMPAT_VERSION,
+ SCHEMA_COMPAT_VERSION,
+ SCHEMA_VERSION,
+)
from synapse.storage.types import Cursor
logger = logging.getLogger(__name__)
@@ -80,6 +85,9 @@ class _SchemaState:
applied_deltas: Collection[str] = attr.ib(factory=tuple)
"""Any delta files for `current_version` which have already been applied"""
+ background_updates: Collection[Tuple[str, int]] = attr.ib(factory=tuple)
+ """Any (pending) updates in the `background_updates` table."""
+
upgraded: bool = attr.ib(default=False)
"""Whether the current state was reached by applying deltas.
@@ -359,6 +367,7 @@ def _upgrade_existing_database(
"""
if is_empty:
assert not current_schema_state.applied_deltas
+ assert not current_schema_state.background_updates
else:
assert config
@@ -413,6 +422,24 @@ def _upgrade_existing_database(
start_ver += 1
logger.debug("applied_delta_files: %s", current_schema_state.applied_deltas)
+ logger.debug(
+ "pending background_updates: %s",
+ (name for name, ordering in current_schema_state.background_updates),
+ )
+
+ # Bail if there are any pending background updates from before the background schema compat version.
+ for update_name, ordering in sorted(
+ current_schema_state.background_updates, key=lambda b: b[1]
+ ):
+ # ordering is an int based on when the background update was added:
+ #
+ # (schema version when added * 100) + (schema delta when added).
+ update_schema_version = ordering // 100
+ if update_schema_version < BACKGROUND_UPDATES_COMPAT_VERSION:
+ raise UpgradeDatabaseException(
+ "Database has old pending background updates for version %d: %s"
+ % (update_schema_version, update_name)
+ )
if isinstance(database_engine, PostgresEngine):
specific_engine_extension = ".postgres"
@@ -705,10 +732,14 @@ def _get_or_create_schema_state(
)
applied_deltas = tuple(d for d, in txn)
+ txn.execute("SELECT update_name, ordering FROM background_updates")
+ background_Updates = cast(Tuple[Tuple[str, int], ...], tuple(txn))
+
return _SchemaState(
current_version=current_version,
compat_version=compat_version,
applied_deltas=applied_deltas,
+ background_updates=background_Updates,
upgraded=upgraded,
)
|