summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-01-07 14:12:42 +0000
committerGitHub <noreply@github.com>2020-01-07 14:12:42 +0000
commit9824a39d807d2d13424095743761930b853fb08f (patch)
treeb043c47b3593a7a3717cf4677e9a731ea256d5dd
parentport BackgroundUpdateTestCase to HomeserverTestCase (#6653) (diff)
downloadsynapse-9824a39d807d2d13424095743761930b853fb08f.tar.xz
Async/await for background updates (#6647)
so that bg update routines can be async
-rw-r--r--changelog.d/6647.misc1
-rw-r--r--synapse/storage/background_updates.py36
2 files changed, 21 insertions, 16 deletions
diff --git a/changelog.d/6647.misc b/changelog.d/6647.misc
new file mode 100644
index 0000000000..fbe7c0e7db
--- /dev/null
+++ b/changelog.d/6647.misc
@@ -0,0 +1 @@
+Port core background update routines to async/await.
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 4f97fd5ab6..b4825acc7b 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import logging
+from typing import Optional
 
 from canonicaljson import json
 
@@ -97,15 +98,14 @@ class BackgroundUpdater(object):
     def start_doing_background_updates(self):
         run_as_background_process("background_updates", self.run_background_updates)
 
-    @defer.inlineCallbacks
-    def run_background_updates(self, sleep=True):
+    async def run_background_updates(self, sleep=True):
         logger.info("Starting background schema updates")
         while True:
             if sleep:
-                yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
+                await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
 
             try:
-                result = yield self.do_next_background_update(
+                result = await self.do_next_background_update(
                     self.BACKGROUND_UPDATE_DURATION_MS
                 )
             except Exception:
@@ -170,20 +170,21 @@ class BackgroundUpdater(object):
 
         return not update_exists
 
-    @defer.inlineCallbacks
-    def do_next_background_update(self, desired_duration_ms):
+    async def do_next_background_update(
+        self, desired_duration_ms: float
+    ) -> Optional[int]:
         """Does some amount of work on the next queued background update
 
+        Returns once some amount of work is done.
+
         Args:
             desired_duration_ms(float): How long we want to spend
                 updating.
         Returns:
-            A deferred that completes once some amount of work is done.
-            The deferred will have a value of None if there is currently
-            no more work to do.
+            None if there is no more work to do, otherwise an int
         """
         if not self._background_update_queue:
-            updates = yield self.db.simple_select_list(
+            updates = await self.db.simple_select_list(
                 "background_updates",
                 keyvalues=None,
                 retcols=("update_name", "depends_on"),
@@ -201,11 +202,12 @@ class BackgroundUpdater(object):
         update_name = self._background_update_queue.pop(0)
         self._background_update_queue.append(update_name)
 
-        res = yield self._do_background_update(update_name, desired_duration_ms)
+        res = await self._do_background_update(update_name, desired_duration_ms)
         return res
 
-    @defer.inlineCallbacks
-    def _do_background_update(self, update_name, desired_duration_ms):
+    async def _do_background_update(
+        self, update_name: str, desired_duration_ms: float
+    ) -> int:
         logger.info("Starting update batch on background update '%s'", update_name)
 
         update_handler = self._background_update_handlers[update_name]
@@ -225,7 +227,7 @@ class BackgroundUpdater(object):
         else:
             batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
 
-        progress_json = yield self.db.simple_select_one_onecol(
+        progress_json = await self.db.simple_select_one_onecol(
             "background_updates",
             keyvalues={"update_name": update_name},
             retcol="progress_json",
@@ -234,7 +236,7 @@ class BackgroundUpdater(object):
         progress = json.loads(progress_json)
 
         time_start = self._clock.time_msec()
-        items_updated = yield update_handler(progress, batch_size)
+        items_updated = await update_handler(progress, batch_size)
         time_stop = self._clock.time_msec()
 
         duration_ms = time_stop - time_start
@@ -263,7 +265,9 @@ class BackgroundUpdater(object):
         * A dict of the current progress
         * An integer count of the number of items to update in this batch.
 
-        The handler should return a deferred integer count of items updated.
+        The handler should return a deferred or coroutine which returns an integer count
+        of items updated.
+
         The handler is responsible for updating the progress of the update.
 
         Args: