summary refs log tree commit diff
path: root/synapse/storage/background_updates.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/background_updates.py')
-rw-r--r--synapse/storage/background_updates.py44
1 files changed, 21 insertions, 23 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 59f3394b0a..56818f4df8 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -16,11 +16,8 @@
 import logging
 from typing import Optional
 
-from canonicaljson import json
-
-from twisted.internet import defer
-
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import json_encoder
 
 from . import engines
 
@@ -88,7 +85,7 @@ class BackgroundUpdater(object):
 
     def __init__(self, hs, database):
         self._clock = hs.get_clock()
-        self.db = database
+        self.db_pool = database
 
         # if a background update is currently running, its name.
         self._current_background_update = None  # type: Optional[str]
@@ -139,7 +136,7 @@ class BackgroundUpdater(object):
         # otherwise, check if there are updates to be run. This is important,
         # as we may be running on a worker which doesn't perform the bg updates
         # itself, but still wants to wait for them to happen.
-        updates = await self.db.simple_select_onecol(
+        updates = await self.db_pool.simple_select_onecol(
             "background_updates",
             keyvalues=None,
             retcol="1",
@@ -160,7 +157,7 @@ class BackgroundUpdater(object):
         if update_name == self._current_background_update:
             return False
 
-        update_exists = await self.db.simple_select_one_onecol(
+        update_exists = await self.db_pool.simple_select_one_onecol(
             "background_updates",
             keyvalues={"update_name": update_name},
             retcol="1",
@@ -189,10 +186,10 @@ class BackgroundUpdater(object):
                 ORDER BY ordering, update_name
                 """
             )
-            return self.db.cursor_to_dict(txn)
+            return self.db_pool.cursor_to_dict(txn)
 
         if not self._current_background_update:
-            all_pending_updates = await self.db.runInteraction(
+            all_pending_updates = await self.db_pool.runInteraction(
                 "background_updates", get_background_updates_txn,
             )
             if not all_pending_updates:
@@ -243,13 +240,16 @@ class BackgroundUpdater(object):
         else:
             batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
 
-        progress_json = await self.db.simple_select_one_onecol(
+        progress_json = await self.db_pool.simple_select_one_onecol(
             "background_updates",
             keyvalues={"update_name": update_name},
             retcol="progress_json",
         )
 
-        progress = json.loads(progress_json)
+        # Avoid a circular import.
+        from synapse.storage._base import db_to_json
+
+        progress = db_to_json(progress_json)
 
         time_start = self._clock.time_msec()
         items_updated = await update_handler(progress, batch_size)
@@ -305,9 +305,8 @@ class BackgroundUpdater(object):
             update_name (str): Name of update
         """
 
-        @defer.inlineCallbacks
-        def noop_update(progress, batch_size):
-            yield self._end_background_update(update_name)
+        async def noop_update(progress, batch_size):
+            await self._end_background_update(update_name)
             return 1
 
         self.register_background_update_handler(update_name, noop_update)
@@ -399,19 +398,18 @@ class BackgroundUpdater(object):
             logger.debug("[SQL] %s", sql)
             c.execute(sql)
 
-        if isinstance(self.db.engine, engines.PostgresEngine):
+        if isinstance(self.db_pool.engine, engines.PostgresEngine):
             runner = create_index_psql
         elif psql_only:
             runner = None
         else:
             runner = create_index_sqlite
 
-        @defer.inlineCallbacks
-        def updater(progress, batch_size):
+        async def updater(progress, batch_size):
             if runner is not None:
                 logger.info("Adding index %s to %s", index_name, table)
-                yield self.db.runWithConnection(runner)
-            yield self._end_background_update(update_name)
+                await self.db_pool.runWithConnection(runner)
+            await self._end_background_update(update_name)
             return 1
 
         self.register_background_update_handler(update_name, updater)
@@ -430,7 +428,7 @@ class BackgroundUpdater(object):
                 % update_name
             )
         self._current_background_update = None
-        return self.db.simple_delete_one(
+        return self.db_pool.simple_delete_one(
             "background_updates", keyvalues={"update_name": update_name}
         )
 
@@ -442,7 +440,7 @@ class BackgroundUpdater(object):
             progress: The progress of the update.
         """
 
-        return self.db.runInteraction(
+        return self.db_pool.runInteraction(
             "background_update_progress",
             self._background_update_progress_txn,
             update_name,
@@ -458,9 +456,9 @@ class BackgroundUpdater(object):
             progress(dict): The progress of the update.
         """
 
-        progress_json = json.dumps(progress)
+        progress_json = json_encoder.encode(progress)
 
-        self.db.simple_update_one_txn(
+        self.db_pool.simple_update_one_txn(
             txn,
             "background_updates",
             keyvalues={"update_name": update_name},