diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index f43463df53..810721ebe9 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -16,18 +16,15 @@
import logging
from typing import Optional
-from canonicaljson import json
-
-from twisted.internet import defer
-
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import json_encoder
from . import engines
logger = logging.getLogger(__name__)
-class BackgroundUpdatePerformance(object):
+class BackgroundUpdatePerformance:
"""Tracks the how long a background update is taking to update its items"""
def __init__(self, name):
@@ -74,7 +71,7 @@ class BackgroundUpdatePerformance(object):
return float(self.total_item_count) / float(self.total_duration_ms)
-class BackgroundUpdater(object):
+class BackgroundUpdater:
""" Background updates are updates to the database that run in the
background. Each update processes a batch of data at once. We attempt to
limit the impact of each update by monitoring how long each batch takes to
@@ -308,9 +305,8 @@ class BackgroundUpdater(object):
update_name (str): Name of update
"""
- @defer.inlineCallbacks
- def noop_update(progress, batch_size):
- yield self._end_background_update(update_name)
+ async def noop_update(progress, batch_size):
+ await self._end_background_update(update_name)
return 1
self.register_background_update_handler(update_name, noop_update)
@@ -409,23 +405,23 @@ class BackgroundUpdater(object):
else:
runner = create_index_sqlite
- @defer.inlineCallbacks
- def updater(progress, batch_size):
+ async def updater(progress, batch_size):
if runner is not None:
logger.info("Adding index %s to %s", index_name, table)
- yield self.db_pool.runWithConnection(runner)
- yield self._end_background_update(update_name)
+ await self.db_pool.runWithConnection(runner)
+ await self._end_background_update(update_name)
return 1
self.register_background_update_handler(update_name, updater)
- def _end_background_update(self, update_name):
+ async def _end_background_update(self, update_name: str) -> None:
"""Removes a completed background update task from the queue.
Args:
- update_name(str): The name of the completed task to remove
+ update_name:: The name of the completed task to remove
+
Returns:
- A deferred that completes once the task is removed.
+ None, completes once the task is removed.
"""
if update_name != self._current_background_update:
raise Exception(
@@ -433,11 +429,11 @@ class BackgroundUpdater(object):
% update_name
)
self._current_background_update = None
- return self.db_pool.simple_delete_one(
+ await self.db_pool.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)
- def _background_update_progress(self, update_name: str, progress: dict):
+ async def _background_update_progress(self, update_name: str, progress: dict):
"""Update the progress of a background update
Args:
@@ -445,7 +441,7 @@ class BackgroundUpdater(object):
progress: The progress of the update.
"""
- return self.db_pool.runInteraction(
+ return await self.db_pool.runInteraction(
"background_update_progress",
self._background_update_progress_txn,
update_name,
@@ -461,7 +457,7 @@ class BackgroundUpdater(object):
progress(dict): The progress of the update.
"""
- progress_json = json.dumps(progress)
+ progress_json = json_encoder.encode(progress)
self.db_pool.simple_update_one_txn(
txn,
|