diff options
Diffstat (limited to 'synapse/storage/background_updates.py')
-rw-r--r-- | synapse/storage/background_updates.py | 114 |
1 files changed, 56 insertions, 58 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index eb1a7e5002..59f3394b0a 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -90,8 +90,10 @@ class BackgroundUpdater(object): self._clock = hs.get_clock() self.db = database + # if a background update is currently running, its name. + self._current_background_update = None # type: Optional[str] + self._background_update_performance = {} - self._background_update_queue = [] self._background_update_handlers = {} self._all_done = False @@ -111,7 +113,7 @@ class BackgroundUpdater(object): except Exception: logger.exception("Error doing update") else: - if result is None: + if result: logger.info( "No more background updates to do." " Unscheduling background update task." @@ -119,26 +121,25 @@ class BackgroundUpdater(object): self._all_done = True return None - @defer.inlineCallbacks - def has_completed_background_updates(self): + async def has_completed_background_updates(self) -> bool: """Check if all the background updates have completed Returns: - Deferred[bool]: True if all background updates have completed + True if all background updates have completed """ # if we've previously determined that there is nothing left to do, that # is easy if self._all_done: return True - # obviously, if we have things in our queue, we're not done. - if self._background_update_queue: + # obviously, if we are currently processing an update, we're not done. + if self._current_background_update: return False # otherwise, check if there are updates to be run. This is important, # as we may be running on a worker which doesn't perform the bg updates # itself, but still wants to wait for them to happen. - updates = yield self.db.simple_select_onecol( + updates = await self.db.simple_select_onecol( "background_updates", keyvalues=None, retcol="1", @@ -153,11 +154,10 @@ class BackgroundUpdater(object): async def has_completed_background_update(self, update_name) -> bool: """Check if the given background update has finished running. """ - if self._all_done: return True - if update_name in self._background_update_queue: + if update_name == self._current_background_update: return False update_exists = await self.db.simple_select_one_onecol( @@ -170,9 +170,7 @@ class BackgroundUpdater(object): return not update_exists - async def do_next_background_update( - self, desired_duration_ms: float - ) -> Optional[int]: + async def do_next_background_update(self, desired_duration_ms: float) -> bool: """Does some amount of work on the next queued background update Returns once some amount of work is done. @@ -181,33 +179,51 @@ class BackgroundUpdater(object): desired_duration_ms(float): How long we want to spend updating. Returns: - None if there is no more work to do, otherwise an int + True if we have finished running all the background updates, otherwise False """ - if not self._background_update_queue: - updates = await self.db.simple_select_list( - "background_updates", - keyvalues=None, - retcols=("update_name", "depends_on"), + + def get_background_updates_txn(txn): + txn.execute( + """ + SELECT update_name, depends_on FROM background_updates + ORDER BY ordering, update_name + """ ) - in_flight = {update["update_name"] for update in updates} - for update in updates: - if update["depends_on"] not in in_flight: - self._background_update_queue.append(update["update_name"]) + return self.db.cursor_to_dict(txn) - if not self._background_update_queue: - # no work left to do - return None + if not self._current_background_update: + all_pending_updates = await self.db.runInteraction( + "background_updates", get_background_updates_txn, + ) + if not all_pending_updates: + # no work left to do + return True + + # find the first update which isn't dependent on another one in the queue. + pending = {update["update_name"] for update in all_pending_updates} + for upd in all_pending_updates: + depends_on = upd["depends_on"] + if not depends_on or depends_on not in pending: + break + logger.info( + "Not starting on bg update %s until %s is done", + upd["update_name"], + depends_on, + ) + else: + # if we get to the end of that for loop, there is a problem + raise Exception( + "Unable to find a background update which doesn't depend on " + "another: dependency cycle?" + ) - # pop from the front, and add back to the back - update_name = self._background_update_queue.pop(0) - self._background_update_queue.append(update_name) + self._current_background_update = upd["update_name"] - res = await self._do_background_update(update_name, desired_duration_ms) - return res + await self._do_background_update(desired_duration_ms) + return False - async def _do_background_update( - self, update_name: str, desired_duration_ms: float - ) -> int: + async def _do_background_update(self, desired_duration_ms: float) -> int: + update_name = self._current_background_update logger.info("Starting update batch on background update '%s'", update_name) update_handler = self._background_update_handlers[update_name] @@ -400,27 +416,6 @@ class BackgroundUpdater(object): self.register_background_update_handler(update_name, updater) - def start_background_update(self, update_name, progress): - """Starts a background update running. - - Args: - update_name: The update to set running. - progress: The initial state of the progress of the update. - - Returns: - A deferred that completes once the task has been added to the - queue. - """ - # Clear the background update queue so that we will pick up the new - # task on the next iteration of do_background_update. - self._background_update_queue = [] - progress_json = json.dumps(progress) - - return self.db.simple_insert( - "background_updates", - {"update_name": update_name, "progress_json": progress_json}, - ) - def _end_background_update(self, update_name): """Removes a completed background update task from the queue. @@ -429,9 +424,12 @@ class BackgroundUpdater(object): Returns: A deferred that completes once the task is removed. """ - self._background_update_queue = [ - name for name in self._background_update_queue if name != update_name - ] + if update_name != self._current_background_update: + raise Exception( + "Cannot end background update %s which isn't currently running" + % update_name + ) + self._current_background_update = None return self.db.simple_delete_one( "background_updates", keyvalues={"update_name": update_name} ) |