diff options
Diffstat (limited to 'synapse/storage/background_updates.py')
-rw-r--r-- | synapse/storage/background_updates.py | 82 |
1 files changed, 70 insertions, 12 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 7157fb1dfb..5fe1ca2de7 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -12,15 +12,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import synapse.util.async -from ._base import SQLBaseStore -from . import engines +import logging + +from canonicaljson import json from twisted.internet import defer -import ujson as json -import logging +from synapse.metrics.background_process_metrics import run_as_background_process + +from . import engines +from ._base import SQLBaseStore logger = logging.getLogger(__name__) @@ -80,25 +82,30 @@ class BackgroundUpdateStore(SQLBaseStore): BACKGROUND_UPDATE_INTERVAL_MS = 1000 BACKGROUND_UPDATE_DURATION_MS = 100 - def __init__(self, hs): - super(BackgroundUpdateStore, self).__init__(hs) + def __init__(self, db_conn, hs): + super(BackgroundUpdateStore, self).__init__(db_conn, hs) self._background_update_performance = {} self._background_update_queue = [] self._background_update_handlers = {} + self._all_done = False - @defer.inlineCallbacks def start_doing_background_updates(self): - logger.info("Starting background schema updates") + run_as_background_process( + "background_updates", self._run_background_updates, + ) + @defer.inlineCallbacks + def _run_background_updates(self): + logger.info("Starting background schema updates") while True: - yield synapse.util.async.sleep( + yield self.hs.get_clock().sleep( self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.) try: result = yield self.do_next_background_update( self.BACKGROUND_UPDATE_DURATION_MS ) - except: + except Exception: logger.exception("Error doing update") else: if result is None: @@ -106,9 +113,41 @@ class BackgroundUpdateStore(SQLBaseStore): "No more background updates to do." " Unscheduling background update task." ) + self._all_done = True defer.returnValue(None) @defer.inlineCallbacks + def has_completed_background_updates(self): + """Check if all the background updates have completed + + Returns: + Deferred[bool]: True if all background updates have completed + """ + # if we've previously determined that there is nothing left to do, that + # is easy + if self._all_done: + defer.returnValue(True) + + # obviously, if we have things in our queue, we're not done. + if self._background_update_queue: + defer.returnValue(False) + + # otherwise, check if there are updates to be run. This is important, + # as we may be running on a worker which doesn't perform the bg updates + # itself, but still wants to wait for them to happen. + updates = yield self._simple_select_onecol( + "background_updates", + keyvalues=None, + retcol="1", + desc="check_background_updates", + ) + if not updates: + self._all_done = True + defer.returnValue(True) + + defer.returnValue(False) + + @defer.inlineCallbacks def do_next_background_update(self, desired_duration_ms): """Does some amount of work on the next queued background update @@ -209,6 +248,25 @@ class BackgroundUpdateStore(SQLBaseStore): """ self._background_update_handlers[update_name] = update_handler + def register_noop_background_update(self, update_name): + """Register a noop handler for a background update. + + This is useful when we previously did a background update, but no + longer wish to do the update. In this case the background update should + be removed from the schema delta files, but there may still be some + users who have the background update queued, so this method should + also be called to clear the update. + + Args: + update_name (str): Name of update + """ + @defer.inlineCallbacks + def noop_update(progress, batch_size): + yield self._end_background_update(update_name) + defer.returnValue(1) + + self.register_background_update_handler(update_name, noop_update) + def register_background_index_update(self, update_name, index_name, table, columns, where_clause=None, unique=False, @@ -269,7 +327,7 @@ class BackgroundUpdateStore(SQLBaseStore): # Sqlite doesn't support concurrent creation of indexes. # # We don't use partial indices on SQLite as it wasn't introduced - # until 3.8, and wheezy has 3.7 + # until 3.8, and wheezy and CentOS 7 have 3.7 # # We assume that sqlite doesn't give us invalid indices; however # we may still end up with the index existing but the |