summary refs log tree commit diff
path: root/synapse/storage/background_updates.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/background_updates.py')
-rw-r--r--synapse/storage/background_updates.py74
1 files changed, 48 insertions, 26 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 4a59132bf3..0e430356cd 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -90,8 +90,10 @@ class BackgroundUpdater(object):
         self._clock = hs.get_clock()
         self.db = database
 
+        # if a background update is currently running, its name.
+        self._current_background_update = None  # type: Optional[str]
+
         self._background_update_performance = {}
-        self._background_update_queue = []
         self._background_update_handlers = {}
         self._all_done = False
 
@@ -131,7 +133,7 @@ class BackgroundUpdater(object):
             return True
 
         # obviously, if we have things in our queue, we're not done.
-        if self._background_update_queue:
+        if self._current_background_update:
             return False
 
         # otherwise, check if there are updates to be run. This is important,
@@ -152,11 +154,10 @@ class BackgroundUpdater(object):
     async def has_completed_background_update(self, update_name) -> bool:
         """Check if the given background update has finished running.
         """
-
         if self._all_done:
             return True
 
-        if update_name in self._background_update_queue:
+        if update_name == self._current_background_update:
             return False
 
         update_exists = await self.db.simple_select_one_onecol(
@@ -180,31 +181,49 @@ class BackgroundUpdater(object):
         Returns:
             True if there is no more work to do, otherwise False
         """
-        if not self._background_update_queue:
-            updates = await self.db.simple_select_list(
-                "background_updates",
-                keyvalues=None,
-                retcols=("update_name", "depends_on"),
+
+        def get_background_updates_txn(txn):
+            txn.execute(
+                """
+                SELECT update_name, depends_on FROM background_updates
+                ORDER BY ordering, update_name
+                """
             )
-            in_flight = {update["update_name"] for update in updates}
-            for update in updates:
-                if update["depends_on"] not in in_flight:
-                    self._background_update_queue.append(update["update_name"])
+            return self.db.cursor_to_dict(txn)
 
-        if not self._background_update_queue:
-            # no work left to do
-            return True
+        if not self._current_background_update:
+            all_pending_updates = await self.db.runInteraction(
+                "background_updates", get_background_updates_txn,
+            )
+            if not all_pending_updates:
+                # no work left to do
+                return True
+
+            # find the first update which isn't dependent on another one in the queue.
+            pending = {update["update_name"] for update in all_pending_updates}
+            for upd in all_pending_updates:
+                depends_on = upd["depends_on"]
+                if not depends_on or depends_on not in pending:
+                    break
+                logger.info(
+                    "Not starting on bg update %s until %s is done",
+                    upd["update_name"],
+                    depends_on,
+                )
+            else:
+                # if we get to the end of that for loop, there is a problem
+                raise Exception(
+                    "Unable to find a background update which doesn't depend on "
+                    "another: dependency cycle?"
+                )
 
-        # pop from the front, and add back to the back
-        update_name = self._background_update_queue.pop(0)
-        self._background_update_queue.append(update_name)
+            self._current_background_update = upd["update_name"]
 
-        res = await self._do_background_update(update_name, desired_duration_ms)
+        await self._do_background_update(desired_duration_ms)
         return False
 
-    async def _do_background_update(
-        self, update_name: str, desired_duration_ms: float
-    ) -> int:
+    async def _do_background_update(self, desired_duration_ms: float) -> int:
+        update_name = self._current_background_update
         logger.info("Starting update batch on background update '%s'", update_name)
 
         update_handler = self._background_update_handlers[update_name]
@@ -405,9 +424,12 @@ class BackgroundUpdater(object):
         Returns:
             A deferred that completes once the task is removed.
         """
-        self._background_update_queue = [
-            name for name in self._background_update_queue if name != update_name
-        ]
+        if update_name != self._current_background_update:
+            raise Exception(
+                "Cannot end background update %s which isn't currently running"
+                % update_name
+            )
+        self._current_background_update = None
         return self.db.simple_delete_one(
             "background_updates", keyvalues={"update_name": update_name}
         )