diff --git a/changelog.d/7190.misc b/changelog.d/7190.misc
new file mode 100644
index 0000000000..34348873f1
--- /dev/null
+++ b/changelog.d/7190.misc
@@ -0,0 +1 @@
+Only run one background database update at a time.
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index eb1a7e5002..59f3394b0a 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -90,8 +90,10 @@ class BackgroundUpdater(object):
self._clock = hs.get_clock()
self.db = database
+ # if a background update is currently running, its name.
+ self._current_background_update = None # type: Optional[str]
+
self._background_update_performance = {}
- self._background_update_queue = []
self._background_update_handlers = {}
self._all_done = False
@@ -111,7 +113,7 @@ class BackgroundUpdater(object):
except Exception:
logger.exception("Error doing update")
else:
- if result is None:
+ if result:
logger.info(
"No more background updates to do."
" Unscheduling background update task."
@@ -119,26 +121,25 @@ class BackgroundUpdater(object):
self._all_done = True
return None
- @defer.inlineCallbacks
- def has_completed_background_updates(self):
+ async def has_completed_background_updates(self) -> bool:
"""Check if all the background updates have completed
Returns:
- Deferred[bool]: True if all background updates have completed
+ True if all background updates have completed
"""
# if we've previously determined that there is nothing left to do, that
# is easy
if self._all_done:
return True
- # obviously, if we have things in our queue, we're not done.
- if self._background_update_queue:
+ # obviously, if we are currently processing an update, we're not done.
+ if self._current_background_update:
return False
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
- updates = yield self.db.simple_select_onecol(
+ updates = await self.db.simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
@@ -153,11 +154,10 @@ class BackgroundUpdater(object):
async def has_completed_background_update(self, update_name) -> bool:
"""Check if the given background update has finished running.
"""
-
if self._all_done:
return True
- if update_name in self._background_update_queue:
+ if update_name == self._current_background_update:
return False
update_exists = await self.db.simple_select_one_onecol(
@@ -170,9 +170,7 @@ class BackgroundUpdater(object):
return not update_exists
- async def do_next_background_update(
- self, desired_duration_ms: float
- ) -> Optional[int]:
+ async def do_next_background_update(self, desired_duration_ms: float) -> bool:
"""Does some amount of work on the next queued background update
Returns once some amount of work is done.
@@ -181,33 +179,51 @@ class BackgroundUpdater(object):
desired_duration_ms(float): How long we want to spend
updating.
Returns:
- None if there is no more work to do, otherwise an int
+ True if we have finished running all the background updates, otherwise False
"""
- if not self._background_update_queue:
- updates = await self.db.simple_select_list(
- "background_updates",
- keyvalues=None,
- retcols=("update_name", "depends_on"),
+
+ def get_background_updates_txn(txn):
+ txn.execute(
+ """
+ SELECT update_name, depends_on FROM background_updates
+ ORDER BY ordering, update_name
+ """
)
- in_flight = {update["update_name"] for update in updates}
- for update in updates:
- if update["depends_on"] not in in_flight:
- self._background_update_queue.append(update["update_name"])
+ return self.db.cursor_to_dict(txn)
- if not self._background_update_queue:
- # no work left to do
- return None
+ if not self._current_background_update:
+ all_pending_updates = await self.db.runInteraction(
+ "background_updates", get_background_updates_txn,
+ )
+ if not all_pending_updates:
+ # no work left to do
+ return True
+
+ # find the first update which isn't dependent on another one in the queue.
+ pending = {update["update_name"] for update in all_pending_updates}
+ for upd in all_pending_updates:
+ depends_on = upd["depends_on"]
+ if not depends_on or depends_on not in pending:
+ break
+ logger.info(
+ "Not starting on bg update %s until %s is done",
+ upd["update_name"],
+ depends_on,
+ )
+ else:
+ # if we get to the end of that for loop, there is a problem
+ raise Exception(
+ "Unable to find a background update which doesn't depend on "
+ "another: dependency cycle?"
+ )
- # pop from the front, and add back to the back
- update_name = self._background_update_queue.pop(0)
- self._background_update_queue.append(update_name)
+ self._current_background_update = upd["update_name"]
- res = await self._do_background_update(update_name, desired_duration_ms)
- return res
+ await self._do_background_update(desired_duration_ms)
+ return False
- async def _do_background_update(
- self, update_name: str, desired_duration_ms: float
- ) -> int:
+ async def _do_background_update(self, desired_duration_ms: float) -> int:
+ update_name = self._current_background_update
logger.info("Starting update batch on background update '%s'", update_name)
update_handler = self._background_update_handlers[update_name]
@@ -400,27 +416,6 @@ class BackgroundUpdater(object):
self.register_background_update_handler(update_name, updater)
- def start_background_update(self, update_name, progress):
- """Starts a background update running.
-
- Args:
- update_name: The update to set running.
- progress: The initial state of the progress of the update.
-
- Returns:
- A deferred that completes once the task has been added to the
- queue.
- """
- # Clear the background update queue so that we will pick up the new
- # task on the next iteration of do_background_update.
- self._background_update_queue = []
- progress_json = json.dumps(progress)
-
- return self.db.simple_insert(
- "background_updates",
- {"update_name": update_name, "progress_json": progress_json},
- )
-
def _end_background_update(self, update_name):
"""Removes a completed background update task from the queue.
@@ -429,9 +424,12 @@ class BackgroundUpdater(object):
Returns:
A deferred that completes once the task is removed.
"""
- self._background_update_queue = [
- name for name in self._background_update_queue if name != update_name
- ]
+ if update_name != self._current_background_update:
+ raise Exception(
+ "Cannot end background update %s which isn't currently running"
+ % update_name
+ )
+ self._current_background_update = None
return self.db.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6cb7d4b922..1712932f31 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -29,7 +29,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 57
+SCHEMA_VERSION = 58
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/schema/delta/58/00background_update_ordering.sql b/synapse/storage/schema/delta/58/00background_update_ordering.sql
new file mode 100644
index 0000000000..02dae587cc
--- /dev/null
+++ b/synapse/storage/schema/delta/58/00background_update_ordering.sql
@@ -0,0 +1,19 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* add an "ordering" column to background_updates, which can be used to sort them
+ to achieve some level of consistency. */
+
+ALTER TABLE background_updates ADD COLUMN ordering INT NOT NULL DEFAULT 0;
diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py
index ae14fb407d..940b166129 100644
--- a/tests/storage/test_background_update.py
+++ b/tests/storage/test_background_update.py
@@ -11,7 +11,9 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
def prepare(self, reactor, clock, homeserver):
self.updates = self.hs.get_datastore().db.updates # type: BackgroundUpdater
# the base test class should have run the real bg updates for us
- self.assertTrue(self.updates.has_completed_background_updates())
+ self.assertTrue(
+ self.get_success(self.updates.has_completed_background_updates())
+ )
self.update_handler = Mock()
self.updates.register_background_update_handler(
@@ -25,12 +27,20 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
# the target runtime for each bg update
target_background_update_duration_ms = 50000
+ store = self.hs.get_datastore()
+ self.get_success(
+ store.db.simple_insert(
+ "background_updates",
+ values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
+ )
+ )
+
# first step: make a bit of progress
@defer.inlineCallbacks
def update(progress, count):
yield self.clock.sleep((count * duration_ms) / 1000)
progress = {"my_key": progress["my_key"] + 1}
- yield self.hs.get_datastore().db.runInteraction(
+ yield store.db.runInteraction(
"update_progress",
self.updates._background_update_progress_txn,
"test_update",
@@ -39,10 +49,6 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
return count
self.update_handler.side_effect = update
-
- self.get_success(
- self.updates.start_background_update("test_update", {"my_key": 1})
- )
self.update_handler.reset_mock()
res = self.get_success(
self.updates.do_next_background_update(
@@ -50,7 +56,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
),
by=0.1,
)
- self.assertIsNotNone(res)
+ self.assertFalse(res)
# on the first call, we should get run with the default background update size
self.update_handler.assert_called_once_with(
@@ -73,7 +79,7 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
- self.assertIsNotNone(result)
+ self.assertFalse(result)
self.update_handler.assert_called_once()
# third step: we don't expect to be called any more
@@ -81,5 +87,5 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
result = self.get_success(
self.updates.do_next_background_update(target_background_update_duration_ms)
)
- self.assertIsNone(result)
+ self.assertTrue(result)
self.assertFalse(self.update_handler.called)
diff --git a/tests/unittest.py b/tests/unittest.py
index d0406ca2fd..27af5228fe 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -40,6 +40,7 @@ from synapse.http.server import JsonResource
from synapse.http.site import SynapseRequest, SynapseSite
from synapse.logging.context import (
SENTINEL_CONTEXT,
+ LoggingContext,
current_context,
set_current_context,
)
@@ -419,15 +420,17 @@ class HomeserverTestCase(TestCase):
config_obj.parse_config_dict(config, "", "")
kwargs["config"] = config_obj
+ async def run_bg_updates():
+ with LoggingContext("run_bg_updates", request="run_bg_updates-1"):
+ while not await stor.db.updates.has_completed_background_updates():
+ await stor.db.updates.do_next_background_update(1)
+
hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
stor = hs.get_datastore()
# Run the database background updates, when running against "master".
if hs.__class__.__name__ == "TestHomeServer":
- while not self.get_success(
- stor.db.updates.has_completed_background_updates()
- ):
- self.get_success(stor.db.updates.do_next_background_update(1))
+ self.get_success(run_bg_updates())
return hs
|