diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index e5f0668f09..59f3394b0a 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -14,6 +14,7 @@
# limitations under the License.
import logging
+from typing import Optional
from canonicaljson import json
@@ -22,7 +23,6 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from . import engines
-from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -74,7 +74,7 @@ class BackgroundUpdatePerformance(object):
return float(self.total_item_count) / float(self.total_duration_ms)
-class BackgroundUpdateStore(SQLBaseStore):
+class BackgroundUpdater(object):
""" Background updates are updates to the database that run in the
background. Each update processes a batch of data at once. We attempt to
limit the impact of each update by monitoring how long each batch takes to
@@ -86,30 +86,34 @@ class BackgroundUpdateStore(SQLBaseStore):
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
- def __init__(self, db_conn, hs):
- super(BackgroundUpdateStore, self).__init__(db_conn, hs)
+ def __init__(self, hs, database):
+ self._clock = hs.get_clock()
+ self.db = database
+
+ # if a background update is currently running, its name.
+ self._current_background_update = None # type: Optional[str]
+
self._background_update_performance = {}
- self._background_update_queue = []
self._background_update_handlers = {}
self._all_done = False
def start_doing_background_updates(self):
- run_as_background_process("background_updates", self._run_background_updates)
+ run_as_background_process("background_updates", self.run_background_updates)
- @defer.inlineCallbacks
- def _run_background_updates(self):
+ async def run_background_updates(self, sleep=True):
logger.info("Starting background schema updates")
while True:
- yield self.hs.get_clock().sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
+ if sleep:
+ await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try:
- result = yield self.do_next_background_update(
+ result = await self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except Exception:
logger.exception("Error doing update")
else:
- if result is None:
+ if result:
logger.info(
"No more background updates to do."
" Unscheduling background update task."
@@ -117,30 +121,29 @@ class BackgroundUpdateStore(SQLBaseStore):
self._all_done = True
return None
- @defer.inlineCallbacks
- def has_completed_background_updates(self):
+ async def has_completed_background_updates(self) -> bool:
"""Check if all the background updates have completed
Returns:
- Deferred[bool]: True if all background updates have completed
+ True if all background updates have completed
"""
# if we've previously determined that there is nothing left to do, that
# is easy
if self._all_done:
return True
- # obviously, if we have things in our queue, we're not done.
- if self._background_update_queue:
+ # obviously, if we are currently processing an update, we're not done.
+ if self._current_background_update:
return False
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
- updates = yield self._simple_select_onecol(
+ updates = await self.db.simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
- desc="check_background_updates",
+ desc="has_completed_background_updates",
)
if not updates:
self._all_done = True
@@ -148,42 +151,79 @@ class BackgroundUpdateStore(SQLBaseStore):
return False
- @defer.inlineCallbacks
- def do_next_background_update(self, desired_duration_ms):
+ async def has_completed_background_update(self, update_name) -> bool:
+ """Check if the given background update has finished running.
+ """
+ if self._all_done:
+ return True
+
+ if update_name == self._current_background_update:
+ return False
+
+ update_exists = await self.db.simple_select_one_onecol(
+ "background_updates",
+ keyvalues={"update_name": update_name},
+ retcol="1",
+ desc="has_completed_background_update",
+ allow_none=True,
+ )
+
+ return not update_exists
+
+ async def do_next_background_update(self, desired_duration_ms: float) -> bool:
"""Does some amount of work on the next queued background update
+ Returns once some amount of work is done.
+
Args:
desired_duration_ms(float): How long we want to spend
updating.
Returns:
- A deferred that completes once some amount of work is done.
- The deferred will have a value of None if there is currently
- no more work to do.
+ True if we have finished running all the background updates, otherwise False
"""
- if not self._background_update_queue:
- updates = yield self._simple_select_list(
- "background_updates",
- keyvalues=None,
- retcols=("update_name", "depends_on"),
+
+ def get_background_updates_txn(txn):
+ txn.execute(
+ """
+ SELECT update_name, depends_on FROM background_updates
+ ORDER BY ordering, update_name
+ """
)
- in_flight = set(update["update_name"] for update in updates)
- for update in updates:
- if update["depends_on"] not in in_flight:
- self._background_update_queue.append(update["update_name"])
+ return self.db.cursor_to_dict(txn)
- if not self._background_update_queue:
- # no work left to do
- return None
+ if not self._current_background_update:
+ all_pending_updates = await self.db.runInteraction(
+ "background_updates", get_background_updates_txn,
+ )
+ if not all_pending_updates:
+ # no work left to do
+ return True
+
+ # find the first update which isn't dependent on another one in the queue.
+ pending = {update["update_name"] for update in all_pending_updates}
+ for upd in all_pending_updates:
+ depends_on = upd["depends_on"]
+ if not depends_on or depends_on not in pending:
+ break
+ logger.info(
+ "Not starting on bg update %s until %s is done",
+ upd["update_name"],
+ depends_on,
+ )
+ else:
+ # if we get to the end of that for loop, there is a problem
+ raise Exception(
+ "Unable to find a background update which doesn't depend on "
+ "another: dependency cycle?"
+ )
- # pop from the front, and add back to the back
- update_name = self._background_update_queue.pop(0)
- self._background_update_queue.append(update_name)
+ self._current_background_update = upd["update_name"]
- res = yield self._do_background_update(update_name, desired_duration_ms)
- return res
+ await self._do_background_update(desired_duration_ms)
+ return False
- @defer.inlineCallbacks
- def _do_background_update(self, update_name, desired_duration_ms):
+ async def _do_background_update(self, desired_duration_ms: float) -> int:
+ update_name = self._current_background_update
logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name]
@@ -203,7 +243,7 @@ class BackgroundUpdateStore(SQLBaseStore):
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
- progress_json = yield self._simple_select_one_onecol(
+ progress_json = await self.db.simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="progress_json",
@@ -212,13 +252,13 @@ class BackgroundUpdateStore(SQLBaseStore):
progress = json.loads(progress_json)
time_start = self._clock.time_msec()
- items_updated = yield update_handler(progress, batch_size)
+ items_updated = await update_handler(progress, batch_size)
time_stop = self._clock.time_msec()
duration_ms = time_stop - time_start
logger.info(
- "Updating %r. Updated %r items in %rms."
+ "Running background update %r. Processed %r items in %rms."
" (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)",
update_name,
items_updated,
@@ -241,7 +281,9 @@ class BackgroundUpdateStore(SQLBaseStore):
* A dict of the current progress
* An integer count of the number of items to update in this batch.
- The handler should return a deferred integer count of items updated.
+ The handler should return a deferred or coroutine which returns an integer count
+ of items updated.
+
The handler is responsible for updating the progress of the update.
Args:
@@ -357,7 +399,7 @@ class BackgroundUpdateStore(SQLBaseStore):
logger.debug("[SQL] %s", sql)
c.execute(sql)
- if isinstance(self.database_engine, engines.PostgresEngine):
+ if isinstance(self.db.engine, engines.PostgresEngine):
runner = create_index_psql
elif psql_only:
runner = None
@@ -368,33 +410,12 @@ class BackgroundUpdateStore(SQLBaseStore):
def updater(progress, batch_size):
if runner is not None:
logger.info("Adding index %s to %s", index_name, table)
- yield self.runWithConnection(runner)
+ yield self.db.runWithConnection(runner)
yield self._end_background_update(update_name)
return 1
self.register_background_update_handler(update_name, updater)
- def start_background_update(self, update_name, progress):
- """Starts a background update running.
-
- Args:
- update_name: The update to set running.
- progress: The initial state of the progress of the update.
-
- Returns:
- A deferred that completes once the task has been added to the
- queue.
- """
- # Clear the background update queue so that we will pick up the new
- # task on the next iteration of do_background_update.
- self._background_update_queue = []
- progress_json = json.dumps(progress)
-
- return self._simple_insert(
- "background_updates",
- {"update_name": update_name, "progress_json": progress_json},
- )
-
def _end_background_update(self, update_name):
"""Removes a completed background update task from the queue.
@@ -403,13 +424,31 @@ class BackgroundUpdateStore(SQLBaseStore):
Returns:
A deferred that completes once the task is removed.
"""
- self._background_update_queue = [
- name for name in self._background_update_queue if name != update_name
- ]
- return self._simple_delete_one(
+ if update_name != self._current_background_update:
+ raise Exception(
+ "Cannot end background update %s which isn't currently running"
+ % update_name
+ )
+ self._current_background_update = None
+ return self.db.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)
+ def _background_update_progress(self, update_name: str, progress: dict):
+ """Update the progress of a background update
+
+ Args:
+ update_name: The name of the background update task
+ progress: The progress of the update.
+ """
+
+ return self.db.runInteraction(
+ "background_update_progress",
+ self._background_update_progress_txn,
+ update_name,
+ progress,
+ )
+
def _background_update_progress_txn(self, txn, update_name, progress):
"""Update the progress of a background update
@@ -421,7 +460,7 @@ class BackgroundUpdateStore(SQLBaseStore):
progress_json = json.dumps(progress)
- self._simple_update_one_txn(
+ self.db.simple_update_one_txn(
txn,
"background_updates",
keyvalues={"update_name": update_name},
|