diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 7157fb1dfb..5fe1ca2de7 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -12,15 +12,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse.util.async
-from ._base import SQLBaseStore
-from . import engines
+import logging
+
+from canonicaljson import json
from twisted.internet import defer
-import ujson as json
-import logging
+from synapse.metrics.background_process_metrics import run_as_background_process
+
+from . import engines
+from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -80,25 +82,30 @@ class BackgroundUpdateStore(SQLBaseStore):
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
- def __init__(self, hs):
- super(BackgroundUpdateStore, self).__init__(hs)
+ def __init__(self, db_conn, hs):
+ super(BackgroundUpdateStore, self).__init__(db_conn, hs)
self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
+ self._all_done = False
- @defer.inlineCallbacks
def start_doing_background_updates(self):
- logger.info("Starting background schema updates")
+ run_as_background_process(
+ "background_updates", self._run_background_updates,
+ )
+ @defer.inlineCallbacks
+ def _run_background_updates(self):
+ logger.info("Starting background schema updates")
while True:
- yield synapse.util.async.sleep(
+ yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
try:
result = yield self.do_next_background_update(
self.BACKGROUND_UPDATE_DURATION_MS
)
- except:
+ except Exception:
logger.exception("Error doing update")
else:
if result is None:
@@ -106,9 +113,41 @@ class BackgroundUpdateStore(SQLBaseStore):
"No more background updates to do."
" Unscheduling background update task."
)
+ self._all_done = True
defer.returnValue(None)
@defer.inlineCallbacks
+ def has_completed_background_updates(self):
+ """Check if all the background updates have completed
+
+ Returns:
+ Deferred[bool]: True if all background updates have completed
+ """
+ # if we've previously determined that there is nothing left to do, that
+ # is easy
+ if self._all_done:
+ defer.returnValue(True)
+
+ # obviously, if we have things in our queue, we're not done.
+ if self._background_update_queue:
+ defer.returnValue(False)
+
+ # otherwise, check if there are updates to be run. This is important,
+ # as we may be running on a worker which doesn't perform the bg updates
+ # itself, but still wants to wait for them to happen.
+ updates = yield self._simple_select_onecol(
+ "background_updates",
+ keyvalues=None,
+ retcol="1",
+ desc="check_background_updates",
+ )
+ if not updates:
+ self._all_done = True
+ defer.returnValue(True)
+
+ defer.returnValue(False)
+
+ @defer.inlineCallbacks
def do_next_background_update(self, desired_duration_ms):
"""Does some amount of work on the next queued background update
@@ -209,6 +248,25 @@ class BackgroundUpdateStore(SQLBaseStore):
"""
self._background_update_handlers[update_name] = update_handler
+ def register_noop_background_update(self, update_name):
+ """Register a noop handler for a background update.
+
+ This is useful when we previously did a background update, but no
+ longer wish to do the update. In this case the background update should
+ be removed from the schema delta files, but there may still be some
+ users who have the background update queued, so this method should
+ also be called to clear the update.
+
+ Args:
+ update_name (str): Name of update
+ """
+ @defer.inlineCallbacks
+ def noop_update(progress, batch_size):
+ yield self._end_background_update(update_name)
+ defer.returnValue(1)
+
+ self.register_background_update_handler(update_name, noop_update)
+
def register_background_index_update(self, update_name, index_name,
table, columns, where_clause=None,
unique=False,
@@ -269,7 +327,7 @@ class BackgroundUpdateStore(SQLBaseStore):
# Sqlite doesn't support concurrent creation of indexes.
#
# We don't use partial indices on SQLite as it wasn't introduced
- # until 3.8, and wheezy has 3.7
+ # until 3.8, and wheezy and CentOS 7 have 3.7
#
# We assume that sqlite doesn't give us invalid indices; however
# we may still end up with the index existing but the
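A minimal sketch (not part of the patch) of how a caller might wait on the new has_completed_background_updates() method, e.g. a worker or a test that needs the schema fully migrated before proceeding; the helper name and the store/clock arguments are illustrative assumptions:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def wait_for_background_updates(store, clock, poll_interval=0.1):
        # `store` is assumed to be a BackgroundUpdateStore (or a subclass) and
        # `clock` the homeserver clock, e.g. hs.get_clock().
        # Poll until the background_updates table has been drained.
        while not (yield store.has_completed_background_updates()):
            yield clock.sleep(poll_interval)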
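Likewise, a hedged sketch of how a store could use register_noop_background_update() to retire an update whose schema delta file has been removed; the store class and update name below are hypothetical:

    from synapse.storage.background_updates import BackgroundUpdateStore

    class ExampleStore(BackgroundUpdateStore):
        def __init__(self, db_conn, hs):
            super(ExampleStore, self).__init__(db_conn, hs)
            # Some deployments may still have this obsolete update queued in
            # the background_updates table; the noop handler simply marks it
            # as done the next time the updater reaches it.
            self.register_noop_background_update("example_obsolete_index_update")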