diff options
Diffstat (limited to 'synapse/storage/background_updates.py')
-rw-r--r-- | synapse/storage/background_updates.py | 187 |
1 files changed, 113 insertions, 74 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index e5f0668f09..59f3394b0a 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Optional from canonicaljson import json @@ -22,7 +23,6 @@ from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process from . import engines -from ._base import SQLBaseStore logger = logging.getLogger(__name__) @@ -74,7 +74,7 @@ class BackgroundUpdatePerformance(object): return float(self.total_item_count) / float(self.total_duration_ms) -class BackgroundUpdateStore(SQLBaseStore): +class BackgroundUpdater(object): """ Background updates are updates to the database that run in the background. Each update processes a batch of data at once. We attempt to limit the impact of each update by monitoring how long each batch takes to @@ -86,30 +86,34 @@ class BackgroundUpdateStore(SQLBaseStore): BACKGROUND_UPDATE_INTERVAL_MS = 1000 BACKGROUND_UPDATE_DURATION_MS = 100 - def __init__(self, db_conn, hs): - super(BackgroundUpdateStore, self).__init__(db_conn, hs) + def __init__(self, hs, database): + self._clock = hs.get_clock() + self.db = database + + # if a background update is currently running, its name. + self._current_background_update = None # type: Optional[str] + self._background_update_performance = {} - self._background_update_queue = [] self._background_update_handlers = {} self._all_done = False def start_doing_background_updates(self): - run_as_background_process("background_updates", self._run_background_updates) + run_as_background_process("background_updates", self.run_background_updates) - @defer.inlineCallbacks - def _run_background_updates(self): + async def run_background_updates(self, sleep=True): logger.info("Starting background schema updates") while True: - yield self.hs.get_clock().sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) + if sleep: + await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0) try: - result = yield self.do_next_background_update( + result = await self.do_next_background_update( self.BACKGROUND_UPDATE_DURATION_MS ) except Exception: logger.exception("Error doing update") else: - if result is None: + if result: logger.info( "No more background updates to do." " Unscheduling background update task." @@ -117,30 +121,29 @@ class BackgroundUpdateStore(SQLBaseStore): self._all_done = True return None - @defer.inlineCallbacks - def has_completed_background_updates(self): + async def has_completed_background_updates(self) -> bool: """Check if all the background updates have completed Returns: - Deferred[bool]: True if all background updates have completed + True if all background updates have completed """ # if we've previously determined that there is nothing left to do, that # is easy if self._all_done: return True - # obviously, if we have things in our queue, we're not done. - if self._background_update_queue: + # obviously, if we are currently processing an update, we're not done. + if self._current_background_update: return False # otherwise, check if there are updates to be run. This is important, # as we may be running on a worker which doesn't perform the bg updates # itself, but still wants to wait for them to happen. - updates = yield self._simple_select_onecol( + updates = await self.db.simple_select_onecol( "background_updates", keyvalues=None, retcol="1", - desc="check_background_updates", + desc="has_completed_background_updates", ) if not updates: self._all_done = True @@ -148,42 +151,79 @@ class BackgroundUpdateStore(SQLBaseStore): return False - @defer.inlineCallbacks - def do_next_background_update(self, desired_duration_ms): + async def has_completed_background_update(self, update_name) -> bool: + """Check if the given background update has finished running. + """ + if self._all_done: + return True + + if update_name == self._current_background_update: + return False + + update_exists = await self.db.simple_select_one_onecol( + "background_updates", + keyvalues={"update_name": update_name}, + retcol="1", + desc="has_completed_background_update", + allow_none=True, + ) + + return not update_exists + + async def do_next_background_update(self, desired_duration_ms: float) -> bool: """Does some amount of work on the next queued background update + Returns once some amount of work is done. + Args: desired_duration_ms(float): How long we want to spend updating. Returns: - A deferred that completes once some amount of work is done. - The deferred will have a value of None if there is currently - no more work to do. + True if we have finished running all the background updates, otherwise False """ - if not self._background_update_queue: - updates = yield self._simple_select_list( - "background_updates", - keyvalues=None, - retcols=("update_name", "depends_on"), + + def get_background_updates_txn(txn): + txn.execute( + """ + SELECT update_name, depends_on FROM background_updates + ORDER BY ordering, update_name + """ ) - in_flight = set(update["update_name"] for update in updates) - for update in updates: - if update["depends_on"] not in in_flight: - self._background_update_queue.append(update["update_name"]) + return self.db.cursor_to_dict(txn) - if not self._background_update_queue: - # no work left to do - return None + if not self._current_background_update: + all_pending_updates = await self.db.runInteraction( + "background_updates", get_background_updates_txn, + ) + if not all_pending_updates: + # no work left to do + return True + + # find the first update which isn't dependent on another one in the queue. + pending = {update["update_name"] for update in all_pending_updates} + for upd in all_pending_updates: + depends_on = upd["depends_on"] + if not depends_on or depends_on not in pending: + break + logger.info( + "Not starting on bg update %s until %s is done", + upd["update_name"], + depends_on, + ) + else: + # if we get to the end of that for loop, there is a problem + raise Exception( + "Unable to find a background update which doesn't depend on " + "another: dependency cycle?" + ) - # pop from the front, and add back to the back - update_name = self._background_update_queue.pop(0) - self._background_update_queue.append(update_name) + self._current_background_update = upd["update_name"] - res = yield self._do_background_update(update_name, desired_duration_ms) - return res + await self._do_background_update(desired_duration_ms) + return False - @defer.inlineCallbacks - def _do_background_update(self, update_name, desired_duration_ms): + async def _do_background_update(self, desired_duration_ms: float) -> int: + update_name = self._current_background_update logger.info("Starting update batch on background update '%s'", update_name) update_handler = self._background_update_handlers[update_name] @@ -203,7 +243,7 @@ class BackgroundUpdateStore(SQLBaseStore): else: batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE - progress_json = yield self._simple_select_one_onecol( + progress_json = await self.db.simple_select_one_onecol( "background_updates", keyvalues={"update_name": update_name}, retcol="progress_json", @@ -212,13 +252,13 @@ class BackgroundUpdateStore(SQLBaseStore): progress = json.loads(progress_json) time_start = self._clock.time_msec() - items_updated = yield update_handler(progress, batch_size) + items_updated = await update_handler(progress, batch_size) time_stop = self._clock.time_msec() duration_ms = time_stop - time_start logger.info( - "Updating %r. Updated %r items in %rms." + "Running background update %r. Processed %r items in %rms." " (total_rate=%r/ms, current_rate=%r/ms, total_updated=%r, batch_size=%r)", update_name, items_updated, @@ -241,7 +281,9 @@ class BackgroundUpdateStore(SQLBaseStore): * A dict of the current progress * An integer count of the number of items to update in this batch. - The handler should return a deferred integer count of items updated. + The handler should return a deferred or coroutine which returns an integer count + of items updated. + The handler is responsible for updating the progress of the update. Args: @@ -357,7 +399,7 @@ class BackgroundUpdateStore(SQLBaseStore): logger.debug("[SQL] %s", sql) c.execute(sql) - if isinstance(self.database_engine, engines.PostgresEngine): + if isinstance(self.db.engine, engines.PostgresEngine): runner = create_index_psql elif psql_only: runner = None @@ -368,33 +410,12 @@ class BackgroundUpdateStore(SQLBaseStore): def updater(progress, batch_size): if runner is not None: logger.info("Adding index %s to %s", index_name, table) - yield self.runWithConnection(runner) + yield self.db.runWithConnection(runner) yield self._end_background_update(update_name) return 1 self.register_background_update_handler(update_name, updater) - def start_background_update(self, update_name, progress): - """Starts a background update running. - - Args: - update_name: The update to set running. - progress: The initial state of the progress of the update. - - Returns: - A deferred that completes once the task has been added to the - queue. - """ - # Clear the background update queue so that we will pick up the new - # task on the next iteration of do_background_update. - self._background_update_queue = [] - progress_json = json.dumps(progress) - - return self._simple_insert( - "background_updates", - {"update_name": update_name, "progress_json": progress_json}, - ) - def _end_background_update(self, update_name): """Removes a completed background update task from the queue. @@ -403,13 +424,31 @@ class BackgroundUpdateStore(SQLBaseStore): Returns: A deferred that completes once the task is removed. """ - self._background_update_queue = [ - name for name in self._background_update_queue if name != update_name - ] - return self._simple_delete_one( + if update_name != self._current_background_update: + raise Exception( + "Cannot end background update %s which isn't currently running" + % update_name + ) + self._current_background_update = None + return self.db.simple_delete_one( "background_updates", keyvalues={"update_name": update_name} ) + def _background_update_progress(self, update_name: str, progress: dict): + """Update the progress of a background update + + Args: + update_name: The name of the background update task + progress: The progress of the update. + """ + + return self.db.runInteraction( + "background_update_progress", + self._background_update_progress_txn, + update_name, + progress, + ) + def _background_update_progress_txn(self, txn, update_name, progress): """Update the progress of a background update @@ -421,7 +460,7 @@ class BackgroundUpdateStore(SQLBaseStore): progress_json = json.dumps(progress) - self._simple_update_one_txn( + self.db.simple_update_one_txn( txn, "background_updates", keyvalues={"update_name": update_name}, |