summary refs log tree commit diff
path: root/synapse/storage/background_updates.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/storage/background_updates.py50
1 files changed, 35 insertions, 15 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py

index f473294070..d170bbddaa 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py
@@ -40,20 +40,15 @@ from typing import ( import attr -from synapse._pydantic_compat import HAS_PYDANTIC_V2 +from synapse._pydantic_compat import BaseModel from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine from synapse.storage.types import Connection, Cursor -from synapse.types import JsonDict +from synapse.types import JsonDict, StrCollection from synapse.util import Clock, json_encoder from . import engines -if TYPE_CHECKING or HAS_PYDANTIC_V2: - from pydantic.v1 import BaseModel -else: - from pydantic import BaseModel - if TYPE_CHECKING: from synapse.server import HomeServer from synapse.storage.database import ( @@ -487,6 +482,31 @@ class BackgroundUpdater: return not update_exists + async def have_completed_background_updates( + self, update_names: StrCollection + ) -> bool: + """Return the name of background updates that have not yet been + completed""" + if self._all_done: + return True + + # We now check if we have completed all pending background updates. We + # do this as once this returns True then it will set `self._all_done` + # and we can skip checking the database in future. + if await self.has_completed_background_updates(): + return True + + rows = await self.db_pool.simple_select_many_batch( + table="background_updates", + column="update_name", + iterable=update_names, + retcols=("update_name",), + desc="get_uncompleted_background_updates", + ) + + # If we find any rows then we've not completed the update. + return not bool(rows) + async def do_next_background_update(self, sleep: bool = True) -> bool: """Does some amount of work on the next queued background update @@ -719,9 +739,9 @@ class BackgroundUpdater: c.execute(sql) async def updater(progress: JsonDict, batch_size: int) -> int: - assert isinstance( - self.db_pool.engine, engines.PostgresEngine - ), "validate constraint background update registered for non-Postres database" + assert isinstance(self.db_pool.engine, engines.PostgresEngine), ( + "validate constraint background update registered for non-Postres database" + ) logger.info("Validating constraint %s to %s", constraint_name, table) await self.db_pool.runWithConnection(runner) @@ -769,7 +789,7 @@ class BackgroundUpdater: # we may already have a half-built index. Let's just drop it # before trying to create it again. - sql = "DROP INDEX IF EXISTS %s" % (index_name,) + sql = "DROP INDEX CONCURRENTLY IF EXISTS %s" % (index_name,) logger.debug("[SQL] %s", sql) c.execute(sql) @@ -794,7 +814,7 @@ class BackgroundUpdater: if replaces_index is not None: # We drop the old index as the new index has now been created. - sql = f"DROP INDEX IF EXISTS {replaces_index}" + sql = f"DROP INDEX CONCURRENTLY IF EXISTS {replaces_index}" logger.debug("[SQL] %s", sql) c.execute(sql) finally: @@ -880,9 +900,9 @@ class BackgroundUpdater: on the table. Used to iterate over the table. """ - assert isinstance( - self.db_pool.engine, engines.PostgresEngine - ), "validate constraint background update registered for non-Postres database" + assert isinstance(self.db_pool.engine, engines.PostgresEngine), ( + "validate constraint background update registered for non-Postres database" + ) async def updater(progress: JsonDict, batch_size: int) -> int: return await self.validate_constraint_and_delete_in_background(