diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index f473294070..d170bbddaa 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -40,20 +40,15 @@ from typing import (
import attr
-from synapse._pydantic_compat import HAS_PYDANTIC_V2
+from synapse._pydantic_compat import BaseModel
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Connection, Cursor
-from synapse.types import JsonDict
+from synapse.types import JsonDict, StrCollection
from synapse.util import Clock, json_encoder
from . import engines
-if TYPE_CHECKING or HAS_PYDANTIC_V2:
- from pydantic.v1 import BaseModel
-else:
- from pydantic import BaseModel
-
if TYPE_CHECKING:
from synapse.server import HomeServer
from synapse.storage.database import (
@@ -487,6 +482,31 @@ class BackgroundUpdater:
return not update_exists
+ async def have_completed_background_updates(
+ self, update_names: StrCollection
+ ) -> bool:
+ """Return the name of background updates that have not yet been
+ completed"""
+ if self._all_done:
+ return True
+
+ # We now check if we have completed all pending background updates. We
+ # do this as once this returns True then it will set `self._all_done`
+ # and we can skip checking the database in future.
+ if await self.has_completed_background_updates():
+ return True
+
+ rows = await self.db_pool.simple_select_many_batch(
+ table="background_updates",
+ column="update_name",
+ iterable=update_names,
+ retcols=("update_name",),
+ desc="get_uncompleted_background_updates",
+ )
+
+ # If we find any rows then we've not completed the update.
+ return not bool(rows)
+
async def do_next_background_update(self, sleep: bool = True) -> bool:
"""Does some amount of work on the next queued background update
@@ -719,9 +739,9 @@ class BackgroundUpdater:
c.execute(sql)
async def updater(progress: JsonDict, batch_size: int) -> int:
- assert isinstance(
- self.db_pool.engine, engines.PostgresEngine
- ), "validate constraint background update registered for non-Postres database"
+ assert isinstance(self.db_pool.engine, engines.PostgresEngine), (
+ "validate constraint background update registered for non-Postres database"
+ )
logger.info("Validating constraint %s to %s", constraint_name, table)
await self.db_pool.runWithConnection(runner)
@@ -769,7 +789,7 @@ class BackgroundUpdater:
# we may already have a half-built index. Let's just drop it
# before trying to create it again.
- sql = "DROP INDEX IF EXISTS %s" % (index_name,)
+ sql = "DROP INDEX CONCURRENTLY IF EXISTS %s" % (index_name,)
logger.debug("[SQL] %s", sql)
c.execute(sql)
@@ -794,7 +814,7 @@ class BackgroundUpdater:
if replaces_index is not None:
# We drop the old index as the new index has now been created.
- sql = f"DROP INDEX IF EXISTS {replaces_index}"
+ sql = f"DROP INDEX CONCURRENTLY IF EXISTS {replaces_index}"
logger.debug("[SQL] %s", sql)
c.execute(sql)
finally:
@@ -880,9 +900,9 @@ class BackgroundUpdater:
on the table. Used to iterate over the table.
"""
- assert isinstance(
- self.db_pool.engine, engines.PostgresEngine
- ), "validate constraint background update registered for non-Postres database"
+ assert isinstance(self.db_pool.engine, engines.PostgresEngine), (
+ "validate constraint background update registered for non-Postres database"
+ )
async def updater(progress: JsonDict, batch_size: int) -> int:
return await self.validate_constraint_and_delete_in_background(
|