summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-04-03 13:17:30 +0100
committerGitHub <noreply@github.com>2020-04-03 13:17:30 +0100
commitfd4c975b5bffb4eacf7fa07a4d6391fa1d3376c4 (patch)
tree758163e4bcdb4a6aa506c9e2ac303e8794a067a5
parentRemove some `run_in_background` calls in replication code (#7203) (diff)
parentUpdate docstring per review comments (diff)
downloadsynapse-fd4c975b5bffb4eacf7fa07a4d6391fa1d3376c4.tar.xz
Merge pull request #7190 from matrix-org/rav/one_bg_update_at_a_time
Only run one background update at a time
-rw-r--r--changelog.d/7190.misc1
-rw-r--r--synapse/storage/background_updates.py114
-rw-r--r--synapse/storage/prepare_database.py2
-rw-r--r--synapse/storage/schema/delta/58/00background_update_ordering.sql19
-rw-r--r--tests/storage/test_background_update.py24
-rw-r--r--tests/unittest.py11
6 files changed, 99 insertions, 72 deletions
diff --git a/changelog.d/7190.misc b/changelog.d/7190.misc
new file mode 100644
index 0000000000..34348873f1
--- /dev/null
+++ b/changelog.d/7190.misc
@@ -0,0 +1 @@
+Only run one background database update at a time.
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index eb1a7e5002..59f3394b0a 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -90,8 +90,10 @@ class BackgroundUpdater(object):
         self._clock = hs.get_clock()
         self.db = database
 
+        # if a background update is currently running, its name.
+        self._current_background_update = None  # type: Optional[str]
+
         self._background_update_performance = {}
-        self._background_update_queue = []
         self._background_update_handlers = {}
         self._all_done = False
 
@@ -111,7 +113,7 @@ class BackgroundUpdater(object):
             except Exception:
                 logger.exception("Error doing update")
             else:
-                if result is None:
+                if result:
                     logger.info(
                         "No more background updates to do."
                         " Unscheduling background update task."
@@ -119,26 +121,25 @@ class BackgroundUpdater(object):
                     self._all_done = True
                     return None
 
-    @defer.inlineCallbacks
-    def has_completed_background_updates(self):
+    async def has_completed_background_updates(self) -> bool:
         """Check if all the background updates have completed
 
         Returns:
-            Deferred[bool]: True if all background updates have completed
+            True if all background updates have completed
         """
         # if we've previously determined that there is nothing left to do, that
         # is easy
         if self._all_done:
             return True
 
-        # obviously, if we have things in our queue, we're not done.
-        if self._background_update_queue:
+        # obviously, if we are currently processing an update, we're not done.
+        if self._current_background_update:
             return False
 
         # otherwise, check if there are updates to be run. This is important,
         # as we may be running on a worker which doesn't perform the bg updates
         # itself, but still wants to wait for them to happen.
-        updates = yield self.db.simple_select_onecol(
+        updates = await self.db.simple_select_onecol(
             "background_updates",
             keyvalues=None,
             retcol="1",
@@ -153,11 +154,10 @@ class BackgroundUpdater(object):
     async def has_completed_background_update(self, update_name) -> bool:
         """Check if the given background update has finished running.
         """
-
         if self._all_done:
             return True
 
-        if update_name in self._background_update_queue:
+        if update_name == self._current_background_update:
             return False
 
         update_exists = await self.db.simple_select_one_onecol(
@@ -170,9 +170,7 @@ class BackgroundUpdater(object):
 
         return not update_exists
 
-    async def do_next_background_update(
-        self, desired_duration_ms: float
-    ) -> Optional[int]:
+    async def do_next_background_update(self, desired_duration_ms: float) -> bool:
         """Does some amount of work on the next queued background update
 
         Returns once some amount of work is done.
@@ -181,33 +179,51 @@ class BackgroundUpdater(object):
             desired_duration_ms(float): How long we want to spend
                 updating.
         Returns:
-            None if there is no more work to do, otherwise an int
+            True if we have finished running all the background updates, otherwise False
         """
-        if not self._background_update_queue:
-            updates = await self.db.simple_select_list(
-                "background_updates",
-                keyvalues=None,
-                retcols=("update_name", "depends_on"),
+
+        def get_background_updates_txn(txn):
+            txn.execute(
+                """
+                SELECT update_name, depends_on FROM background_updates
+                ORDER BY ordering, update_name
+                """
             )
-            in_flight = {update["update_name"] for update in updates}
-            for update in updates:
-                if update["depends_on"] not in in_flight:
-                    self._background_update_queue.append(update["update_name"])
+            return self.db.cursor_to_dict(txn)
 
-        if not self._background_update_queue:
-            # no work left to do
-            return None
+        if not self._current_background_update:
+            all_pending_updates = await self.db.runInteraction(
+                "background_updates", get_background_updates_txn,
+            )
+            if not all_pending_updates:
+                # no work left to do
+                return True
+
+            # find the first update which isn't dependent on another one in the queue.
+            pending = {update["update_name"] for update in all_pending_updates}
+            for upd in all_pending_updates:
+                depends_on = upd["depends_on"]
+                if not depends_on or depends_on not in pending:
+                    break
+                logger.info(
+                    "Not starting on bg update %s until %s is done",
+                    upd["update_name"],
+                    depends_on,
+                )
+            else:
+                # if we get to the end of that for loop, there is a problem
+                raise Exception(
+                    "Unable to find a background update which doesn't depend on "
+                    "another: dependency cycle?"
+                )
 
-        # pop from the front, and add back to the back
-        update_name = self._background_update_queue.pop(0)
-        self._background_update_queue.append(update_name)
+            self._current_background_update = upd["update_name"]
 
-        res = await self._do_background_update(update_name, desired_duration_ms)
-        return res
+        await self._do_background_update(desired_duration_ms)
+        return False
 
-    async def _do_background_update(
-        self, update_name: str, desired_duration_ms: float
-    ) -> int:
+    async def _do_background_update(self, desired_duration_ms: float) -> int:
+        update_name = self._current_background_update
         logger.info("Starting update batch on background update '%s'", update_name)
 
         update_handler = self._background_update_handlers[update_name]
@@ -400,27 +416,6 @@ class BackgroundUpdater(object):
 
         self.register_background_update_handler(update_name, updater)
 
-    def start_background_update(self, update_name, progress):
-        """Starts a background update running.
-
-        Args:
-            update_name: The update to set running.
-            progress: The initial state of the progress of the update.
-
-        Returns:
-            A deferred that completes once the task has been added to the
-            queue.
-        """
-        # Clear the background update queue so that we will pick up the new
-        # task on the next iteration of do_background_update.
-        self._background_update_queue = []
-        progress_json = json.dumps(progress)
-
-        return self.db.simple_insert(
-            "background_updates",
-            {"update_name": update_name, "progress_json": progress_json},
-        )
-
     def _end_background_update(self, update_name):
         """Removes a completed background update task from the queue.
 
@@ -429,9 +424,12 @@ class BackgroundUpdater(object):
         Returns:
             A deferred that completes once the task is removed.
         """
-        self._background_update_queue = [
-            name for name in self._background_update_queue if name != update_name
-        ]
+        if update_name != self._current_background_update:
+            raise Exception(
+                "Cannot end background update %s which isn't currently running"
+                % update_name
+            )
+        self._current_background_update = None
         return self.db.simple_delete_one(
             "background_updates", keyvalues={"update_name": update_name}
         )
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6cb7d4b922..1712932f31 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
 
 # Remember to update this number every time a change is made to database
 # schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 57
+SCHEMA_VERSION = 58
 
 dir_path = os.path.abspath(os.path.dirname(__file__))
 
diff --git a/synapse/storage/schema/delta/58/00background_update_ordering.sql b/synapse/storage/schema/delta/58/00background_update_ordering.sql
new file mode 100644
index 0000000000..02dae587cc
--- /dev/null
+++ b/synapse/storage/schema/delta/58/00background_update_ordering.sql
@@ -0,0 +1,19 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* add an "ordering" column to background_updates, which can be used to sort them
+   to achieve some level of consistency. */
+
+ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;
diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py
index ae14fb407d..940b166129 100644
--- a/tests/storage/test_background_update.py
+++ b/tests/storage/test_background_update.py
@@ -11,7 +11,9 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
     def prepare(self, reactor, clock, homeserver):
         self.updates = self.hs.get_datastore().db.updates  # type: BackgroundUpdater
         # the base test class should have run the real bg updates for us
-        self.assertTrue(self.updates.has_completed_background_updates())
+        self.assertTrue(
+            self.get_success(self.updates.has_completed_background_updates())
+        )
 
         self.update_handler = Mock()
         self.updates.register_background_update_handler(
@@ -25,12 +27,20 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
         # the target runtime for each bg update
         target_background_update_duration_ms = 50000
 
+        store = self.hs.get_datastore()
+        self.get_success(
+            store.db.simple_insert(
+                "background_updates",
+                values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
+            )
+        )
+
         # first step: make a bit of progress
         @defer.inlineCallbacks
         def update(progress, count):
             yield self.clock.sleep((count * duration_ms) / 1000)
             progress = {"my_key": progress["my_key"] + 1}
-            yield self.hs.get_datastore().db.runInteraction(
+            yield store.db.runInteraction(
                 "update_progress",
                 self.updates._background_update_progress_txn,
                 "test_update",
@@ -39,10 +49,6 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
             return count
 
         self.update_handler.side_effect = update
-
-        self.get_success(
-            self.updates.start_background_update("test_update", {"my_key": 1})
-        )
         self.update_handler.reset_mock()
         res = self.get_success(
             self.updates.do_next_background_update(
@@ -50,7 +56,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
             ),
             by=0.1,
         )
-        self.assertIsNotNone(res)
+        self.assertFalse(res)
 
         # on the first call, we should get run with the default background update size
         self.update_handler.assert_called_once_with(
@@ -73,7 +79,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
         result = self.get_success(
             self.updates.do_next_background_update(target_background_update_duration_ms)
         )
-        self.assertIsNotNone(result)
+        self.assertFalse(result)
         self.update_handler.assert_called_once()
 
         # third step: we don't expect to be called any more
@@ -81,5 +87,5 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
         result = self.get_success(
             self.updates.do_next_background_update(target_background_update_duration_ms)
         )
-        self.assertIsNone(result)
+        self.assertTrue(result)
         self.assertFalse(self.update_handler.called)
diff --git a/tests/unittest.py b/tests/unittest.py
index d0406ca2fd..27af5228fe 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -40,6 +40,7 @@ from synapse.http.server import JsonResource
 from synapse.http.site import SynapseRequest, SynapseSite
 from synapse.logging.context import (
     SENTINEL_CONTEXT,
+    LoggingContext,
     current_context,
     set_current_context,
 )
@@ -419,15 +420,17 @@ class HomeserverTestCase(TestCase):
         config_obj.parse_config_dict(config, "", "")
         kwargs["config"] = config_obj
 
+        async def run_bg_updates():
+            with LoggingContext("run_bg_updates", request="run_bg_updates-1"):
+                while not await stor.db.updates.has_completed_background_updates():
+                    await stor.db.updates.do_next_background_update(1)
+
         hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
         stor = hs.get_datastore()
 
         # Run the database background updates, when running against "master".
         if hs.__class__.__name__ == "TestHomeServer":
-            while not self.get_success(
-                stor.db.updates.has_completed_background_updates()
-            ):
-                self.get_success(stor.db.updates.do_next_background_update(1))
+            self.get_success(run_bg_updates())
 
         return hs