diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 06955a0537..a9a13a2658 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -22,7 +22,6 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from . import engines
-from ._base import SQLBaseStore
logger = logging.getLogger(__name__)
@@ -74,7 +73,7 @@ class BackgroundUpdatePerformance(object):
return float(self.total_item_count) / float(self.total_duration_ms)
-class BackgroundUpdateStore(SQLBaseStore):
+class BackgroundUpdater(object):
""" Background updates are updates to the database that run in the
background. Each update processes a batch of data at once. We attempt to
limit the impact of each update by monitoring how long each batch takes to
@@ -86,8 +85,10 @@ class BackgroundUpdateStore(SQLBaseStore):
BACKGROUND_UPDATE_INTERVAL_MS = 1000
BACKGROUND_UPDATE_DURATION_MS = 100
- def __init__(self, db_conn, hs):
- super(BackgroundUpdateStore, self).__init__(db_conn, hs)
+ def __init__(self, hs, database):
+ self._clock = hs.get_clock()
+ self.db = database
+
self._background_update_performance = {}
self._background_update_queue = []
self._background_update_handlers = {}
@@ -101,9 +102,7 @@ class BackgroundUpdateStore(SQLBaseStore):
logger.info("Starting background schema updates")
while True:
if sleep:
- yield self.hs.get_clock().sleep(
- self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0
- )
+ yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
try:
result = yield self.do_next_background_update(
@@ -139,7 +138,7 @@ class BackgroundUpdateStore(SQLBaseStore):
# otherwise, check if there are updates to be run. This is important,
# as we may be running on a worker which doesn't perform the bg updates
# itself, but still wants to wait for them to happen.
- updates = yield self.simple_select_onecol(
+ updates = yield self.db.simple_select_onecol(
"background_updates",
keyvalues=None,
retcol="1",
@@ -161,7 +160,7 @@ class BackgroundUpdateStore(SQLBaseStore):
if update_name in self._background_update_queue:
return False
- update_exists = await self.simple_select_one_onecol(
+ update_exists = await self.db.simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="1",
@@ -184,7 +183,7 @@ class BackgroundUpdateStore(SQLBaseStore):
no more work to do.
"""
if not self._background_update_queue:
- updates = yield self.simple_select_list(
+ updates = yield self.db.simple_select_list(
"background_updates",
keyvalues=None,
retcols=("update_name", "depends_on"),
@@ -226,7 +225,7 @@ class BackgroundUpdateStore(SQLBaseStore):
else:
batch_size = self.DEFAULT_BACKGROUND_BATCH_SIZE
- progress_json = yield self.simple_select_one_onecol(
+ progress_json = yield self.db.simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="progress_json",
@@ -380,7 +379,7 @@ class BackgroundUpdateStore(SQLBaseStore):
logger.debug("[SQL] %s", sql)
c.execute(sql)
- if isinstance(self.database_engine, engines.PostgresEngine):
+ if isinstance(self.db.database_engine, engines.PostgresEngine):
runner = create_index_psql
elif psql_only:
runner = None
@@ -391,7 +390,7 @@ class BackgroundUpdateStore(SQLBaseStore):
def updater(progress, batch_size):
if runner is not None:
logger.info("Adding index %s to %s", index_name, table)
- yield self.runWithConnection(runner)
+ yield self.db.runWithConnection(runner)
yield self._end_background_update(update_name)
return 1
@@ -413,7 +412,7 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_queue = []
progress_json = json.dumps(progress)
- return self.simple_insert(
+ return self.db.simple_insert(
"background_updates",
{"update_name": update_name, "progress_json": progress_json},
)
@@ -429,7 +428,7 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_queue = [
name for name in self._background_update_queue if name != update_name
]
- return self.simple_delete_one(
+ return self.db.simple_delete_one(
"background_updates", keyvalues={"update_name": update_name}
)
@@ -444,7 +443,7 @@ class BackgroundUpdateStore(SQLBaseStore):
progress_json = json.dumps(progress)
- self.simple_update_one_txn(
+ self.db.simple_update_one_txn(
txn,
"background_updates",
keyvalues={"update_name": update_name},
|