diff options
Diffstat (limited to 'synapse/storage/background_updates.py')
-rw-r--r-- | synapse/storage/background_updates.py | 44 |
1 files changed, 21 insertions, 23 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 59f3394b0a..56818f4df8 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -16,11 +16,8 @@ import logging from typing import Optional -from canonicaljson import json - -from twisted.internet import defer - from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.util import json_encoder from . import engines @@ -88,7 +85,7 @@ class BackgroundUpdater(object): def __init__(self, hs, database): self._clock = hs.get_clock() - self.db = database + self.db_pool = database # if a background update is currently running, its name. self._current_background_update = None # type: Optional[str] @@ -139,7 +136,7 @@ class BackgroundUpdater(object): # otherwise, check if there are updates to be run. This is important, # as we may be running on a worker which doesn't perform the bg updates # itself, but still wants to wait for them to happen. - updates = await self.db.simple_select_onecol( + updates = await self.db_pool.simple_select_onecol( "background_updates", keyvalues=None, retcol="1", @@ -160,7 +157,7 @@ class BackgroundUpdater(object): if update_name == self._current_background_update: return False - update_exists = await self.db.simple_select_one_onecol( + update_exists = await self.db_pool.simple_select_one_onecol( "background_updates", keyvalues={"update_name": update_name}, retcol="1", @@ -189,10 +186,10 @@ class BackgroundUpdater(object): ORDER BY ordering, update_name """ ) - return self.db.cursor_to_dict(txn) + return self.db_pool.cursor_to_dict(txn) if not self._current_background_update: - all_pending_updates = await self.db.runInteraction( + all_pending_updates = await self.db_pool.runInteraction( "background_updates", get_background_updates_txn, ) if not all_pending_updates: @@ -243,13 +240,16 @@ class BackgroundUpdater(object): else: batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE - progress_json = await self.db.simple_select_one_onecol( + progress_json = await self.db_pool.simple_select_one_onecol( "background_updates", keyvalues={"update_name": update_name}, retcol="progress_json", ) - progress = json.loads(progress_json) + # Avoid a circular import. + from synapse.storage._base import db_to_json + + progress = db_to_json(progress_json) time_start = self._clock.time_msec() items_updated = await update_handler(progress, batch_size) @@ -305,9 +305,8 @@ class BackgroundUpdater(object): update_name (str): Name of update """ - @defer.inlineCallbacks - def noop_update(progress, batch_size): - yield self._end_background_update(update_name) + async def noop_update(progress, batch_size): + await self._end_background_update(update_name) return 1 self.register_background_update_handler(update_name, noop_update) @@ -399,19 +398,18 @@ class BackgroundUpdater(object): logger.debug("[SQL] %s", sql) c.execute(sql) - if isinstance(self.db.engine, engines.PostgresEngine): + if isinstance(self.db_pool.engine, engines.PostgresEngine): runner = create_index_psql elif psql_only: runner = None else: runner = create_index_sqlite - @defer.inlineCallbacks - def updater(progress, batch_size): + async def updater(progress, batch_size): if runner is not None: logger.info("Adding index %s to %s", index_name, table) - yield self.db.runWithConnection(runner) - yield self._end_background_update(update_name) + await self.db_pool.runWithConnection(runner) + await self._end_background_update(update_name) return 1 self.register_background_update_handler(update_name, updater) @@ -430,7 +428,7 @@ class BackgroundUpdater(object): % update_name ) self._current_background_update = None - return self.db.simple_delete_one( + return self.db_pool.simple_delete_one( "background_updates", keyvalues={"update_name": update_name} ) @@ -442,7 +440,7 @@ class BackgroundUpdater(object): progress: The progress of the update. """ - return self.db.runInteraction( + return self.db_pool.runInteraction( "background_update_progress", self._background_update_progress_txn, update_name, @@ -458,9 +456,9 @@ class BackgroundUpdater(object): progress(dict): The progress of the update. """ - progress_json = json.dumps(progress) + progress_json = json_encoder.encode(progress) - self.db.simple_update_one_txn( + self.db_pool.simple_update_one_txn( txn, "background_updates", keyvalues={"update_name": update_name}, |