diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 75951d0173..2771f7c3c1 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -88,10 +88,12 @@ class BackgroundUpdateStore(SQLBaseStore):
@defer.inlineCallbacks
def start_doing_background_updates(self):
- while True:
- if self._background_update_timer is not None:
- return
+ assert(self._background_update_timer is not None,
+ "background updates already running")
+
+ logger.info("Starting background schema updates")
+ while True:
sleep = defer.Deferred()
self._background_update_timer = self._clock.call_later(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000., sleep.callback, None
@@ -102,7 +104,7 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_timer = None
try:
- result = yield self.do_background_update(
+ result = yield self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
except:
@@ -113,11 +115,12 @@ class BackgroundUpdateStore(SQLBaseStore):
"No more background updates to do."
" Unscheduling background update task."
)
- return
+ defer.returnValue()
@defer.inlineCallbacks
- def do_background_update(self, desired_duration_ms):
- """Does some amount of work on a background update
+ def do_next_background_update(self, desired_duration_ms):
+ """Does some amount of work on the next queued background update
+
Args:
desired_duration_ms(float): How long we want to spend
updating.
@@ -136,11 +139,21 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_queue.append(update['update_name'])
if not self._background_update_queue:
+ # no work left to do
defer.returnValue(None)
+ # pop from the front, and add back to the back
update_name = self._background_update_queue.pop(0)
self._background_update_queue.append(update_name)
+ res = yield self._do_background_update(update_name, desired_duration_ms)
+ defer.returnValue(res)
+
+ @defer.inlineCallbacks
+ def _do_background_update(self, update_name, desired_duration_ms):
+ logger.info("Starting update batch on background update '%s'",
+ update_name)
+
update_handler = self._background_update_handlers[update_name]
performance = self._background_update_performance.get(update_name)
diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py
index 6e4d9b1373..4944cb0d2e 100644
--- a/tests/storage/test_background_update.py
+++ b/tests/storage/test_background_update.py
@@ -10,7 +10,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
@defer.inlineCallbacks
def setUp(self):
- hs = yield setup_test_homeserver()
+ hs = yield setup_test_homeserver() # type: synapse.server.HomeServer
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@@ -20,11 +20,20 @@ class BackgroundUpdateTestCase(unittest.TestCase):
"test_update", self.update_handler
)
+ # run the real background updates, to get them out the way
+ # (perhaps we should run them as part of the test HS setup, since we
+ # run all of the other schema setup stuff there?)
+ while True:
+ res = yield self.store.do_next_background_update(1000)
+ if res is None:
+ break
+
@defer.inlineCallbacks
def test_do_background_update(self):
desired_count = 1000
duration_ms = 42
+ # first step: make a bit of progress
@defer.inlineCallbacks
def update(progress, count):
self.clock.advance_time_msec(count * duration_ms)
@@ -42,7 +51,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
yield self.store.start_background_update("test_update", {"my_key": 1})
self.update_handler.reset_mock()
- result = yield self.store.do_background_update(
+ result = yield self.store.do_next_background_update(
duration_ms * desired_count
)
self.assertIsNotNone(result)
@@ -50,24 +59,25 @@ class BackgroundUpdateTestCase(unittest.TestCase):
{"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE
)
+ # second step: complete the update
@defer.inlineCallbacks
def update(progress, count):
yield self.store._end_background_update("test_update")
defer.returnValue(count)
self.update_handler.side_effect = update
-
self.update_handler.reset_mock()
- result = yield self.store.do_background_update(
- duration_ms * desired_count
+ result = yield self.store.do_next_background_update(
+ duration_ms * desired_count
)
self.assertIsNotNone(result)
self.update_handler.assert_called_once_with(
{"my_key": 2}, desired_count
)
+ # third step: we don't expect to be called any more
self.update_handler.reset_mock()
- result = yield self.store.do_background_update(
+ result = yield self.store.do_next_background_update(
duration_ms * desired_count
)
self.assertIsNone(result)
|