diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 59f3394b0a..56818f4df8 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -16,11 +16,8 @@
import logging
from typing import Optional
-from canonicaljson import json
-
-from twisted.internet import defer
-
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import json_encoder
from . import engines
@@ -88,7 +85,7 @@ class BackgroundUpdater(object):
def __init__(self, hs, database):
self._clock = hs.get_clock()
- self.db = database
+ self.db_pool = database
# if a background update is currently running, its name.
self._current_background_update = None # type: Optional[str]
@@ -139,7 +136,7 @@ class BackgroundUpdater(object):
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
- updates = await self.db.simple_select_onecol(
+ updates = await self.db_pool.simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
@@ -160,7 +157,7 @@ class BackgroundUpdater(object):
if update_name == self._current_background_update:
return False
- update_exists = await self.db.simple_select_one_onecol(
+ update_exists = await self.db_pool.simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="1",
@@ -189,10 +186,10 @@ class BackgroundUpdater(object):
ORDER BY ordering, update_name
"""
)
- return self.db.cursor_to_dict(txn)
+ return self.db_pool.cursor_to_dict(txn)
if not self._current_background_update:
- all_pending_updates = await self.db.runInteraction(
+ all_pending_updates = await self.db_pool.runInteraction(
"background_updates", get_background_updates_txn,
)
if not all_pending_updates:
@@ -243,13 +240,16 @@ class BackgroundUpdater(object):
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
- progress_json = await self.db.simple_select_one_onecol(
+ progress_json = await self.db_pool.simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="progress_json",
)
- progress = json.loads(progress_json)
+ # Avoid a circular import.
+ from synapse.storage._base import db_to_json
+
+ progress = db_to_json(progress_json)
time_start = self._clock.time_msec()
items_updated = await update_handler(progress, batch_size)
@@ -305,9 +305,8 @@ class BackgroundUpdater(object):
update_name (str): Name of update
"""
- @defer.inlineCallbacks
- def noop_update(progress, batch_size):
- yield self._end_background_update(update_name)
+ async def noop_update(progress, batch_size):
+ await self._end_background_update(update_name)
return 1
self.register_background_update_handler(update_name, noop_update)
@@ -399,19 +398,18 @@ class BackgroundUpdater(object):
logger.debug("[SQL] %s", sql)
c.execute(sql)
- if isinstance(self.db.engine, engines.PostgresEngine):
+ if isinstance(self.db_pool.engine, engines.PostgresEngine):
runner = create_index_psql
elif psql_only:
runner = None
else:
runner = create_index_sqlite
- @defer.inlineCallbacks
- def updater(progress, batch_size):
+ async def updater(progress, batch_size):
if runner is not None:
logger.info("Adding index %s to %s", index_name, table)
- yield self.db.runWithConnection(runner)
- yield self._end_background_update(update_name)
+ await self.db_pool.runWithConnection(runner)
+ await self._end_background_update(update_name)
return 1
self.register_background_update_handler(update_name, updater)
@@ -430,7 +428,7 @@ class BackgroundUpdater(object):
% update_name
)
self._current_background_update = None
- return self.db.simple_delete_one(
+ return self.db_pool.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)
@@ -442,7 +440,7 @@ class BackgroundUpdater(object):
progress: The progress of the update.
"""
- return self.db.runInteraction(
+ return self.db_pool.runInteraction(
"background_update_progress",
self._background_update_progress_txn,
update_name,
@@ -458,9 +456,9 @@ class BackgroundUpdater(object):
progress(dict): The progress of the update.
"""
- progress_json = json.dumps(progress)
+ progress_json = json_encoder.encode(progress)
- self.db.simple_update_one_txn(
+ self.db_pool.simple_update_one_txn(
txn,
"background_updates",
keyvalues={"update_name": update_name},
|