summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2019-12-04 15:09:36 +0000
committerErik Johnston <erik@matrix.org>2019-12-05 11:11:26 +0000
commit4a33a6dd19590b8e6626a5af5a69507dc11236f8 (patch)
tree4d4f1f826a93afbd47e50562949a0e2f51ddb827
parentComments (diff)
downloadsynapse-4a33a6dd19590b8e6626a5af5a69507dc11236f8.tar.xz
Move background update handling out of store
-rw-r--r--synapse/app/homeserver.py2
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py2
-rw-r--r--synapse/storage/background_updates.py15
-rw-r--r--synapse/storage/data_stores/main/client_ips.py36
-rw-r--r--synapse/storage/data_stores/main/deviceinbox.py9
-rw-r--r--synapse/storage/data_stores/main/devices.py15
-rw-r--r--synapse/storage/data_stores/main/event_federation.py6
-rw-r--r--synapse/storage/data_stores/main/event_push_actions.py4
-rw-r--r--synapse/storage/data_stores/main/events.py6
-rw-r--r--synapse/storage/data_stores/main/events_bg_updates.py49
-rw-r--r--synapse/storage/data_stores/main/media_repository.py6
-rw-r--r--synapse/storage/data_stores/main/registration.py21
-rw-r--r--synapse/storage/data_stores/main/room.py9
-rw-r--r--synapse/storage/data_stores/main/roommember.py27
-rw-r--r--synapse/storage/data_stores/main/search.py31
-rw-r--r--synapse/storage/data_stores/main/state.py19
-rw-r--r--synapse/storage/data_stores/main/stats.py20
-rw-r--r--synapse/storage/data_stores/main/user_directory.py33
-rw-r--r--synapse/storage/database.py3
-rw-r--r--synmark/__init__.py6
-rw-r--r--tests/handlers/test_stats.py50
-rw-r--r--tests/handlers/test_user_directory.py18
-rw-r--r--tests/storage/test_background_update.py26
-rw-r--r--tests/storage/test_cleanup_extrems.py14
-rw-r--r--tests/storage/test_client_ips.py26
-rw-r--r--tests/storage/test_roommember.py18
-rw-r--r--tests/unittest.py10
27 files changed, 281 insertions, 200 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 267aebaae9..9f81a857ab 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -436,7 +436,7 @@ def setup(config_options):
             _base.start(hs, config.listeners)
 
             hs.get_pusherpool().start()
-            hs.get_datastore().start_doing_background_updates()
+            hs.get_datastore().db.updates.start_doing_background_updates()
         except Exception:
             # Print the exception and bail out.
             print("Error during startup:", file=sys.stderr)
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index fb0d02aa83..6b978be876 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -402,7 +402,7 @@ class PreviewUrlResource(DirectServeResource):
 
         logger.info("Running url preview cache expiry")
 
-        if not (yield self.store.has_completed_background_updates()):
+        if not (yield self.store.db.updates.has_completed_background_updates()):
             logger.info("Still running DB updates; skipping expiry")
             return
 
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index dfca94b0e0..a9a13a2658 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -22,7 +22,6 @@ from twisted.internet import defer
 from synapse.metrics.background_process_metrics import run_as_background_process
 
 from . import engines
-from ._base import SQLBaseStore
 
 logger = logging.getLogger(__name__)
 
@@ -74,7 +73,7 @@ class BackgroundUpdatePerformance(object):
             return float(self.total_item_count) / float(self.total_duration_ms)
 
 
-class BackgroundUpdateStore(SQLBaseStore):
+class BackgroundUpdater(object):
     """ Background updates are updates to the database that run in the
     background. Each update processes a batch of data at once. We attempt to
     limit the impact of each update by monitoring how long each batch takes to
@@ -86,8 +85,10 @@ class BackgroundUpdateStore(SQLBaseStore):
     BACKGROUND_UPDATE_INTERVAL_MS = 1000
     BACKGROUND_UPDATE_DURATION_MS = 100
 
-    def __init__(self, db_conn, hs):
-        super(BackgroundUpdateStore, self).__init__(db_conn, hs)
+    def __init__(self, hs, database):
+        self._clock = hs.get_clock()
+        self.db = database
+
         self._background_update_performance = {}
         self._background_update_queue = []
         self._background_update_handlers = {}
@@ -101,9 +102,7 @@ class BackgroundUpdateStore(SQLBaseStore):
         logger.info("Starting background schema updates")
         while True:
             if sleep:
-                yield self.hs.get_clock().sleep(
-                    self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0
-                )
+                yield self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.0)
 
             try:
                 result = yield self.do_next_background_update(
@@ -380,7 +379,7 @@ class BackgroundUpdateStore(SQLBaseStore):
             logger.debug("[SQL] %s", sql)
             c.execute(sql)
 
-        if isinstance(self.database_engine, engines.PostgresEngine):
+        if isinstance(self.db.database_engine, engines.PostgresEngine):
             runner = create_index_psql
         elif psql_only:
             runner = None
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 6f2a720b97..7b470a58f1 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -20,7 +20,7 @@ from six import iteritems
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage import background_updates
+from synapse.storage._base import SQLBaseStore
 from synapse.util.caches import CACHE_SIZE_FACTOR
 from synapse.util.caches.descriptors import Cache
 
@@ -32,41 +32,41 @@ logger = logging.getLogger(__name__)
 LAST_SEEN_GRANULARITY = 120 * 1000
 
 
-class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
+class ClientIpBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(ClientIpBackgroundUpdateStore, self).__init__(db_conn, hs)
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "user_ips_device_index",
             index_name="user_ips_device_id",
             table="user_ips",
             columns=["user_id", "device_id", "last_seen"],
         )
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "user_ips_last_seen_index",
             index_name="user_ips_last_seen",
             table="user_ips",
             columns=["user_id", "last_seen"],
         )
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "user_ips_last_seen_only_index",
             index_name="user_ips_last_seen_only",
             table="user_ips",
             columns=["last_seen"],
         )
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "user_ips_analyze", self._analyze_user_ip
         )
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "user_ips_remove_dupes", self._remove_user_ip_dupes
         )
 
         # Register a unique index
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "user_ips_device_unique_index",
             index_name="user_ips_user_token_ip_unique_index",
             table="user_ips",
@@ -75,12 +75,12 @@ class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
         )
 
         # Drop the old non-unique index
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "user_ips_drop_nonunique_index", self._remove_user_ip_nonunique
         )
 
         # Update the last seen info in devices.
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "devices_last_seen", self._devices_last_seen_update
         )
 
@@ -92,7 +92,7 @@ class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
             txn.close()
 
         yield self.db.runWithConnection(f)
-        yield self._end_background_update("user_ips_drop_nonunique_index")
+        yield self.db.updates._end_background_update("user_ips_drop_nonunique_index")
         return 1
 
     @defer.inlineCallbacks
@@ -108,7 +108,7 @@ class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
 
         yield self.db.runInteraction("user_ips_analyze", user_ips_analyze)
 
-        yield self._end_background_update("user_ips_analyze")
+        yield self.db.updates._end_background_update("user_ips_analyze")
 
         return 1
 
@@ -271,14 +271,14 @@ class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
                     (user_id, access_token, ip, device_id, user_agent, last_seen),
                 )
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, "user_ips_remove_dupes", {"last_seen": end_last_seen}
             )
 
         yield self.db.runInteraction("user_ips_dups_remove", remove)
 
         if last:
-            yield self._end_background_update("user_ips_remove_dupes")
+            yield self.db.updates._end_background_update("user_ips_remove_dupes")
 
         return batch_size
 
@@ -344,7 +344,7 @@ class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
             txn.execute_batch(sql, rows)
 
             _, _, _, user_id, device_id = rows[-1]
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn,
                 "devices_last_seen",
                 {"last_user_id": user_id, "last_device_id": device_id},
@@ -357,7 +357,7 @@ class ClientIpBackgroundUpdateStore(background_updates.BackgroundUpdateStore):
         )
 
         if not updated:
-            yield self._end_background_update("devices_last_seen")
+            yield self.db.updates._end_background_update("devices_last_seen")
 
         return updated
 
@@ -546,7 +546,9 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
             # Nothing to do
             return
 
-        if not await self.has_completed_background_update("devices_last_seen"):
+        if not await self.db.updates.has_completed_background_update(
+            "devices_last_seen"
+        ):
             # Only start pruning if we have finished populating the devices
             # last seen info.
             return
diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index 440793ad49..3c9f09301a 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -21,7 +21,6 @@ from twisted.internet import defer
 
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
-from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.util.caches.expiringcache import ExpiringCache
 
 logger = logging.getLogger(__name__)
@@ -208,20 +207,20 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         )
 
 
-class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore):
+class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
     DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
 
     def __init__(self, db_conn, hs):
         super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs)
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "device_inbox_stream_index",
             index_name="device_inbox_stream_id_user_id",
             table="device_inbox",
             columns=["stream_id", "user_id"],
         )
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
         )
 
@@ -234,7 +233,7 @@ class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore):
 
         yield self.db.runWithConnection(reindex_txn)
 
-        yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
+        yield self.db.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)
 
         return 1
 
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index d98511ddd4..91ddaf137e 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -31,7 +31,6 @@ from synapse.logging.opentracing import (
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
-from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.types import get_verify_key_from_cross_signing_key
 from synapse.util import batch_iter
 from synapse.util.caches.descriptors import (
@@ -642,11 +641,11 @@ class DeviceWorkerStore(SQLBaseStore):
         return results
 
 
-class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
+class DeviceBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(DeviceBackgroundUpdateStore, self).__init__(db_conn, hs)
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "device_lists_stream_idx",
             index_name="device_lists_stream_user_id",
             table="device_lists_stream",
@@ -654,7 +653,7 @@ class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
         )
 
         # create a unique index on device_lists_remote_cache
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "device_lists_remote_cache_unique_idx",
             index_name="device_lists_remote_cache_unique_id",
             table="device_lists_remote_cache",
@@ -663,7 +662,7 @@ class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
         )
 
         # And one on device_lists_remote_extremeties
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "device_lists_remote_extremeties_unique_idx",
             index_name="device_lists_remote_extremeties_unique_idx",
             table="device_lists_remote_extremeties",
@@ -672,7 +671,7 @@ class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
         )
 
         # once they complete, we can remove the old non-unique indexes.
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES,
             self._drop_device_list_streams_non_unique_indexes,
         )
@@ -686,7 +685,9 @@ class DeviceBackgroundUpdateStore(BackgroundUpdateStore):
             txn.close()
 
         yield self.db.runWithConnection(f)
-        yield self._end_background_update(DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES)
+        yield self.db.updates._end_background_update(
+            DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES
+        )
         return 1
 
 
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 77e4353b59..31d2e8eb28 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -494,7 +494,7 @@ class EventFederationStore(EventFederationWorkerStore):
     def __init__(self, db_conn, hs):
         super(EventFederationStore, self).__init__(db_conn, hs)
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.EVENT_AUTH_STATE_ONLY, self._background_delete_non_state_event_auth
         )
 
@@ -654,7 +654,7 @@ class EventFederationStore(EventFederationWorkerStore):
                 "max_stream_id_exclusive": min_stream_id,
             }
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, self.EVENT_AUTH_STATE_ONLY, new_progress
             )
 
@@ -665,6 +665,6 @@ class EventFederationStore(EventFederationWorkerStore):
         )
 
         if not result:
-            yield self._end_background_update(self.EVENT_AUTH_STATE_ONLY)
+            yield self.db.updates._end_background_update(self.EVENT_AUTH_STATE_ONLY)
 
         return batch_size
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 725d0881dc..eec054cd48 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -614,14 +614,14 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
     def __init__(self, db_conn, hs):
         super(EventPushActionsStore, self).__init__(db_conn, hs)
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             self.EPA_HIGHLIGHT_INDEX,
             index_name="event_push_actions_u_highlight",
             table="event_push_actions",
             columns=["user_id", "stream_ordering"],
         )
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "event_push_actions_highlights_index",
             index_name="event_push_actions_highlights_index",
             table="event_push_actions",
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 01ec9ec397..d644c82784 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -38,7 +38,6 @@ from synapse.logging.utils import log_function
 from synapse.metrics import BucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import make_in_list_sql_clause
-from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.data_stores.main.event_federation import EventFederationStore
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.data_stores.main.state import StateGroupWorkerStore
@@ -94,10 +93,7 @@ def _retry_on_integrity_error(func):
 # inherits from EventFederationStore so that we can call _update_backward_extremities
 # and _handle_mult_prev_events (though arguably those could both be moved in here)
 class EventsStore(
-    StateGroupWorkerStore,
-    EventFederationStore,
-    EventsWorkerStore,
-    BackgroundUpdateStore,
+    StateGroupWorkerStore, EventFederationStore, EventsWorkerStore,
 ):
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index 365e966956..cb1fc30c31 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -22,13 +22,12 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.constants import EventContentFields
-from synapse.storage._base import make_in_list_sql_clause
-from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 
 logger = logging.getLogger(__name__)
 
 
-class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
+class EventsBackgroundUpdatesStore(SQLBaseStore):
 
     EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
     EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
@@ -37,15 +36,15 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
     def __init__(self, db_conn, hs):
         super(EventsBackgroundUpdatesStore, self).__init__(db_conn, hs)
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.EVENT_ORIGIN_SERVER_TS_NAME, self._background_reindex_origin_server_ts
         )
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME,
             self._background_reindex_fields_sender,
         )
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "event_contains_url_index",
             index_name="event_contains_url_index",
             table="events",
@@ -56,7 +55,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         # an event_id index on event_search is useful for the purge_history
         # api. Plus it means we get to enforce some integrity with a UNIQUE
         # clause
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "event_search_event_id_idx",
             index_name="event_search_event_id_idx",
             table="event_search",
@@ -65,16 +64,16 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             psql_only=True,
         )
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.DELETE_SOFT_FAILED_EXTREMITIES, self._cleanup_extremities_bg_update
         )
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "redactions_received_ts", self._redactions_received_ts
         )
 
         # This index gets deleted in `event_fix_redactions_bytes` update
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "event_fix_redactions_bytes_create_index",
             index_name="redactions_censored_redacts",
             table="redactions",
@@ -82,11 +81,11 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             where_clause="have_censored",
         )
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "event_fix_redactions_bytes", self._event_fix_redactions_bytes
         )
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "event_store_labels", self._event_store_labels
         )
 
@@ -145,7 +144,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                 "rows_inserted": rows_inserted + len(rows),
             }
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME, progress
             )
 
@@ -156,7 +155,9 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         )
 
         if not result:
-            yield self._end_background_update(self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME)
+            yield self.db.updates._end_background_update(
+                self.EVENT_FIELDS_SENDER_URL_UPDATE_NAME
+            )
 
         return result
 
@@ -222,7 +223,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                 "rows_inserted": rows_inserted + len(rows_to_update),
             }
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, self.EVENT_ORIGIN_SERVER_TS_NAME, progress
             )
 
@@ -233,7 +234,9 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         )
 
         if not result:
-            yield self._end_background_update(self.EVENT_ORIGIN_SERVER_TS_NAME)
+            yield self.db.updates._end_background_update(
+                self.EVENT_ORIGIN_SERVER_TS_NAME
+            )
 
         return result
 
@@ -411,7 +414,9 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         )
 
         if not num_handled:
-            yield self._end_background_update(self.DELETE_SOFT_FAILED_EXTREMITIES)
+            yield self.db.updates._end_background_update(
+                self.DELETE_SOFT_FAILED_EXTREMITIES
+            )
 
             def _drop_table_txn(txn):
                 txn.execute("DROP TABLE _extremities_to_check")
@@ -464,7 +469,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
 
             txn.execute(sql, (self._clock.time_msec(), last_event_id, upper_event_id))
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, "redactions_received_ts", {"last_event_id": upper_event_id}
             )
 
@@ -475,7 +480,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         )
 
         if not count:
-            yield self._end_background_update("redactions_received_ts")
+            yield self.db.updates._end_background_update("redactions_received_ts")
 
         return count
 
@@ -505,7 +510,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
             "_event_fix_redactions_bytes", _event_fix_redactions_bytes_txn
         )
 
-        yield self._end_background_update("event_fix_redactions_bytes")
+        yield self.db.updates._end_background_update("event_fix_redactions_bytes")
 
         return 1
 
@@ -559,7 +564,7 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
                 nbrows += 1
                 last_row_event_id = event_id
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, "event_store_labels", {"last_event_id": last_row_event_id}
             )
 
@@ -570,6 +575,6 @@ class EventsBackgroundUpdatesStore(BackgroundUpdateStore):
         )
 
         if not num_rows:
-            yield self._end_background_update("event_store_labels")
+            yield self.db.updates._end_background_update("event_store_labels")
 
         return num_rows
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index ea02497784..03c9c6f8ae 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -12,14 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage._base import SQLBaseStore
 
 
-class MediaRepositoryBackgroundUpdateStore(BackgroundUpdateStore):
+class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(MediaRepositoryBackgroundUpdateStore, self).__init__(db_conn, hs)
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             update_name="local_media_repository_url_idx",
             index_name="local_media_repository_url_idx",
             table="local_media_repository",
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 8f9aa87ceb..1ef143c6d8 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -26,7 +26,6 @@ from twisted.internet.defer import Deferred
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage import background_updates
 from synapse.storage._base import SQLBaseStore
 from synapse.types import UserID
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -794,23 +793,21 @@ class RegistrationWorkerStore(SQLBaseStore):
         )
 
 
-class RegistrationBackgroundUpdateStore(
-    RegistrationWorkerStore, background_updates.BackgroundUpdateStore
-):
+class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
     def __init__(self, db_conn, hs):
         super(RegistrationBackgroundUpdateStore, self).__init__(db_conn, hs)
 
         self.clock = hs.get_clock()
         self.config = hs.config
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "access_tokens_device_index",
             index_name="access_tokens_device_id",
             table="access_tokens",
             columns=["user_id", "device_id"],
         )
 
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "users_creation_ts",
             index_name="users_creation_ts",
             table="users",
@@ -820,13 +817,13 @@ class RegistrationBackgroundUpdateStore(
         # we no longer use refresh tokens, but it's possible that some people
         # might have a background update queued to build this index. Just
         # clear the background update.
-        self.register_noop_background_update("refresh_tokens_device_index")
+        self.db.updates.register_noop_background_update("refresh_tokens_device_index")
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "user_threepids_grandfather", self._bg_user_threepids_grandfather
         )
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "users_set_deactivated_flag", self._background_update_set_deactivated_flag
         )
 
@@ -873,7 +870,7 @@ class RegistrationBackgroundUpdateStore(
 
             logger.info("Marked %d rows as deactivated", rows_processed_nb)
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, "users_set_deactivated_flag", {"user_id": rows[-1]["name"]}
             )
 
@@ -887,7 +884,7 @@ class RegistrationBackgroundUpdateStore(
         )
 
         if end:
-            yield self._end_background_update("users_set_deactivated_flag")
+            yield self.db.updates._end_background_update("users_set_deactivated_flag")
 
         return nb_processed
 
@@ -917,7 +914,7 @@ class RegistrationBackgroundUpdateStore(
                 "_bg_user_threepids_grandfather", _bg_user_threepids_grandfather_txn
             )
 
-        yield self._end_background_update("user_threepids_grandfather")
+        yield self.db.updates._end_background_update("user_threepids_grandfather")
 
         return 1
 
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index a26ed47afc..da42dae243 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -28,7 +28,6 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.data_stores.main.search import SearchStore
 from synapse.types import ThirdPartyInstanceID
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -361,13 +360,13 @@ class RoomWorkerStore(SQLBaseStore):
         defer.returnValue(row)
 
 
-class RoomBackgroundUpdateStore(BackgroundUpdateStore):
+class RoomBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(RoomBackgroundUpdateStore, self).__init__(db_conn, hs)
 
         self.config = hs.config
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "insert_room_retention", self._background_insert_retention,
         )
 
@@ -421,7 +420,7 @@ class RoomBackgroundUpdateStore(BackgroundUpdateStore):
 
             logger.info("Inserted %d rows into room_retention", len(rows))
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
             )
 
@@ -435,7 +434,7 @@ class RoomBackgroundUpdateStore(BackgroundUpdateStore):
         )
 
         if end:
-            yield self._end_background_update("insert_room_retention")
+            yield self.db.updates._end_background_update("insert_room_retention")
 
         defer.returnValue(batch_size)
 
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 7f4d02b25b..929f6b0d39 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -26,8 +26,11 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import LoggingTransaction, make_in_list_sql_clause
-from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage._base import (
+    LoggingTransaction,
+    SQLBaseStore,
+    make_in_list_sql_clause,
+)
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import Sqlite3Engine
 from synapse.storage.roommember import (
@@ -831,17 +834,17 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         )
 
 
-class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
+class RoomMemberBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(RoomMemberBackgroundUpdateStore, self).__init__(db_conn, hs)
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
         )
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
             self._background_current_state_membership,
         )
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             "room_membership_forgotten_idx",
             index_name="room_memberships_user_room_forgotten",
             table="room_memberships",
@@ -909,7 +912,7 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
                 "max_stream_id_exclusive": min_stream_id,
             }
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress
             )
 
@@ -920,7 +923,9 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
         )
 
         if not result:
-            yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
+            yield self.db.updates._end_background_update(
+                _MEMBERSHIP_PROFILE_UPDATE_NAME
+            )
 
         return result
 
@@ -959,7 +964,7 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
 
                 last_processed_room = next_room
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn,
                 _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
                 {"last_processed_room": last_processed_room},
@@ -978,7 +983,9 @@ class RoomMemberBackgroundUpdateStore(BackgroundUpdateStore):
         )
 
         if finished:
-            yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
+            yield self.db.updates._end_background_update(
+                _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME
+            )
 
         return row_count
 
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 55a604850e..ffa1817e64 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -24,8 +24,7 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
-from synapse.storage._base import make_in_list_sql_clause
-from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 
 logger = logging.getLogger(__name__)
@@ -36,7 +35,7 @@ SearchEntry = namedtuple(
 )
 
 
-class SearchBackgroundUpdateStore(BackgroundUpdateStore):
+class SearchBackgroundUpdateStore(SQLBaseStore):
 
     EVENT_SEARCH_UPDATE_NAME = "event_search"
     EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
@@ -49,10 +48,10 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
         if not hs.config.enable_search:
             return
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
         )
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order
         )
 
@@ -61,9 +60,11 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
         # a GIN index. However, it's possible that some people might still have
         # the background update queued, so we register a handler to clear the
         # background update.
-        self.register_noop_background_update(self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME)
+        self.db.updates.register_noop_background_update(
+            self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME
+        )
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
         )
 
@@ -153,7 +154,7 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
                 "rows_inserted": rows_inserted + len(event_search_rows),
             }
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, self.EVENT_SEARCH_UPDATE_NAME, progress
             )
 
@@ -164,7 +165,7 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
         )
 
         if not result:
-            yield self._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
+            yield self.db.updates._end_background_update(self.EVENT_SEARCH_UPDATE_NAME)
 
         return result
 
@@ -208,7 +209,9 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
         if isinstance(self.database_engine, PostgresEngine):
             yield self.db.runWithConnection(create_index)
 
-        yield self._end_background_update(self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME)
+        yield self.db.updates._end_background_update(
+            self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME
+        )
         return 1
 
     @defer.inlineCallbacks
@@ -244,7 +247,7 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
 
             yield self.db.runInteraction(
                 self.EVENT_SEARCH_ORDER_UPDATE_NAME,
-                self._background_update_progress_txn,
+                self.db.updates._background_update_progress_txn,
                 self.EVENT_SEARCH_ORDER_UPDATE_NAME,
                 pg,
             )
@@ -274,7 +277,7 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
                 "have_added_indexes": True,
             }
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, self.EVENT_SEARCH_ORDER_UPDATE_NAME, progress
             )
 
@@ -285,7 +288,9 @@ class SearchBackgroundUpdateStore(BackgroundUpdateStore):
         )
 
         if not finished:
-            yield self._end_background_update(self.EVENT_SEARCH_ORDER_UPDATE_NAME)
+            yield self.db.updates._end_background_update(
+                self.EVENT_SEARCH_ORDER_UPDATE_NAME
+            )
 
         return num_rows
 
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 851e81d6b3..7d5a9f8128 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -27,7 +27,6 @@ from synapse.api.errors import NotFoundError
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.storage._base import SQLBaseStore
-from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.state import StateFilter
@@ -1023,9 +1022,7 @@ class StateGroupWorkerStore(
         return set(row["state_group"] for row in rows)
 
 
-class StateBackgroundUpdateStore(
-    StateGroupBackgroundUpdateStore, BackgroundUpdateStore
-):
+class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
 
     STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
     STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
@@ -1034,21 +1031,21 @@ class StateBackgroundUpdateStore(
 
     def __init__(self, db_conn, hs):
         super(StateBackgroundUpdateStore, self).__init__(db_conn, hs)
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME,
             self._background_deduplicate_state,
         )
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             self.STATE_GROUP_INDEX_UPDATE_NAME, self._background_index_state
         )
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             self.CURRENT_STATE_INDEX_UPDATE_NAME,
             index_name="current_state_events_member_index",
             table="current_state_events",
             columns=["state_key"],
             where_clause="type='m.room.member'",
         )
-        self.register_background_index_update(
+        self.db.updates.register_background_index_update(
             self.EVENT_STATE_GROUP_INDEX_UPDATE_NAME,
             index_name="event_to_state_groups_sg_index",
             table="event_to_state_groups",
@@ -1181,7 +1178,7 @@ class StateBackgroundUpdateStore(
                 "max_group": max_group,
             }
 
-            self._background_update_progress_txn(
+            self.db.updates._background_update_progress_txn(
                 txn, self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME, progress
             )
 
@@ -1192,7 +1189,7 @@ class StateBackgroundUpdateStore(
         )
 
         if finished:
-            yield self._end_background_update(
+            yield self.db.updates._end_background_update(
                 self.STATE_GROUP_DEDUPLICATION_UPDATE_NAME
             )
 
@@ -1224,7 +1221,7 @@ class StateBackgroundUpdateStore(
 
         yield self.db.runWithConnection(reindex_txn)
 
-        yield self._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME)
+        yield self.db.updates._end_background_update(self.STATE_GROUP_INDEX_UPDATE_NAME)
 
         return 1
 
diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py
index 974ffc15bd..6b91988c2a 100644
--- a/synapse/storage/data_stores/main/stats.py
+++ b/synapse/storage/data_stores/main/stats.py
@@ -68,17 +68,17 @@ class StatsStore(StateDeltasStore):
 
         self.stats_delta_processing_lock = DeferredLock()
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "populate_stats_process_rooms", self._populate_stats_process_rooms
         )
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "populate_stats_process_users", self._populate_stats_process_users
         )
         # we no longer need to perform clean-up, but we will give ourselves
         # the potential to reintroduce it in the future – so documentation
         # will still encourage the use of this no-op handler.
-        self.register_noop_background_update("populate_stats_cleanup")
-        self.register_noop_background_update("populate_stats_prepare")
+        self.db.updates.register_noop_background_update("populate_stats_cleanup")
+        self.db.updates.register_noop_background_update("populate_stats_prepare")
 
     def quantise_stats_time(self, ts):
         """
@@ -102,7 +102,7 @@ class StatsStore(StateDeltasStore):
         This is a background update which regenerates statistics for users.
         """
         if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_process_users")
+            yield self.db.updates._end_background_update("populate_stats_process_users")
             return 1
 
         last_user_id = progress.get("last_user_id", "")
@@ -123,7 +123,7 @@ class StatsStore(StateDeltasStore):
 
         # No more rooms -- complete the transaction.
         if not users_to_work_on:
-            yield self._end_background_update("populate_stats_process_users")
+            yield self.db.updates._end_background_update("populate_stats_process_users")
             return 1
 
         for user_id in users_to_work_on:
@@ -132,7 +132,7 @@ class StatsStore(StateDeltasStore):
 
         yield self.db.runInteraction(
             "populate_stats_process_users",
-            self._background_update_progress_txn,
+            self.db.updates._background_update_progress_txn,
             "populate_stats_process_users",
             progress,
         )
@@ -145,7 +145,7 @@ class StatsStore(StateDeltasStore):
         This is a background update which regenerates statistics for rooms.
         """
         if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_process_rooms")
+            yield self.db.updates._end_background_update("populate_stats_process_rooms")
             return 1
 
         last_room_id = progress.get("last_room_id", "")
@@ -166,7 +166,7 @@ class StatsStore(StateDeltasStore):
 
         # No more rooms -- complete the transaction.
         if not rooms_to_work_on:
-            yield self._end_background_update("populate_stats_process_rooms")
+            yield self.db.updates._end_background_update("populate_stats_process_rooms")
             return 1
 
         for room_id in rooms_to_work_on:
@@ -175,7 +175,7 @@ class StatsStore(StateDeltasStore):
 
         yield self.db.runInteraction(
             "_populate_stats_process_rooms",
-            self._background_update_progress_txn,
+            self.db.updates._background_update_progress_txn,
             "populate_stats_process_rooms",
             progress,
         )
diff --git a/synapse/storage/data_stores/main/user_directory.py b/synapse/storage/data_stores/main/user_directory.py
index 7118bd62f3..62ffb34b29 100644
--- a/synapse/storage/data_stores/main/user_directory.py
+++ b/synapse/storage/data_stores/main/user_directory.py
@@ -19,7 +19,6 @@ import re
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules
-from synapse.storage.background_updates import BackgroundUpdateStore
 from synapse.storage.data_stores.main.state import StateFilter
 from synapse.storage.data_stores.main.state_deltas import StateDeltasStore
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
@@ -32,7 +31,7 @@ logger = logging.getLogger(__name__)
 TEMP_TABLE = "_temp_populate_user_directory"
 
 
-class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore):
+class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
 
     # How many records do we calculate before sending it to
     # add_users_who_share_private_rooms?
@@ -43,19 +42,19 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
 
         self.server_name = hs.hostname
 
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "populate_user_directory_createtables",
             self._populate_user_directory_createtables,
         )
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "populate_user_directory_process_rooms",
             self._populate_user_directory_process_rooms,
         )
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "populate_user_directory_process_users",
             self._populate_user_directory_process_users,
         )
-        self.register_background_update_handler(
+        self.db.updates.register_background_update_handler(
             "populate_user_directory_cleanup", self._populate_user_directory_cleanup
         )
 
@@ -108,7 +107,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
         )
         yield self.db.simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
 
-        yield self._end_background_update("populate_user_directory_createtables")
+        yield self.db.updates._end_background_update(
+            "populate_user_directory_createtables"
+        )
         return 1
 
     @defer.inlineCallbacks
@@ -130,7 +131,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
             "populate_user_directory_cleanup", _delete_staging_area
         )
 
-        yield self._end_background_update("populate_user_directory_cleanup")
+        yield self.db.updates._end_background_update("populate_user_directory_cleanup")
         return 1
 
     @defer.inlineCallbacks
@@ -176,7 +177,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
 
         # No more rooms -- complete the transaction.
         if not rooms_to_work_on:
-            yield self._end_background_update("populate_user_directory_process_rooms")
+            yield self.db.updates._end_background_update(
+                "populate_user_directory_process_rooms"
+            )
             return 1
 
         logger.info(
@@ -248,7 +251,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
             progress["remaining"] -= 1
             yield self.db.runInteraction(
                 "populate_user_directory",
-                self._background_update_progress_txn,
+                self.db.updates._background_update_progress_txn,
                 "populate_user_directory_process_rooms",
                 progress,
             )
@@ -267,7 +270,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
         If search_all_users is enabled, add all of the users to the user directory.
         """
         if not self.hs.config.user_directory_search_all_users:
-            yield self._end_background_update("populate_user_directory_process_users")
+            yield self.db.updates._end_background_update(
+                "populate_user_directory_process_users"
+            )
             return 1
 
         def _get_next_batch(txn):
@@ -297,7 +302,9 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
 
         # No more users -- complete the transaction.
         if not users_to_work_on:
-            yield self._end_background_update("populate_user_directory_process_users")
+            yield self.db.updates._end_background_update(
+                "populate_user_directory_process_users"
+            )
             return 1
 
         logger.info(
@@ -317,7 +324,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore, BackgroundUpdateStore
             progress["remaining"] -= 1
             yield self.db.runInteraction(
                 "populate_user_directory",
-                self._background_update_progress_txn,
+                self.db.updates._background_update_progress_txn,
                 "populate_user_directory_process_users",
                 progress,
             )
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index ac64d80806..be36c1b829 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -30,6 +30,7 @@ from twisted.internet import defer
 from synapse.api.errors import StoreError
 from synapse.logging.context import LoggingContext, make_deferred_yieldable
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdater
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.util.stringutils import exception_to_unicode
 
@@ -223,6 +224,8 @@ class Database(object):
         self._clock = hs.get_clock()
         self._db_pool = hs.get_db_pool()
 
+        self.updates = BackgroundUpdater(hs, self)
+
         self._previous_txn_total_time = 0
         self._current_txn_total_time = 0
         self._previous_loop_ts = 0
diff --git a/synmark/__init__.py b/synmark/__init__.py
index 570eb818d9..afe4fad8cb 100644
--- a/synmark/__init__.py
+++ b/synmark/__init__.py
@@ -47,9 +47,9 @@ async def make_homeserver(reactor, config=None):
     stor = hs.get_datastore()
 
     # Run the database background updates.
-    if hasattr(stor, "do_next_background_update"):
-        while not await stor.has_completed_background_updates():
-            await stor.do_next_background_update(1)
+    if hasattr(stor.db.updates, "do_next_background_update"):
+        while not await stor.db.updates.has_completed_background_updates():
+            await stor.db.updates.do_next_background_update(1)
 
     def cleanup():
         for i in cleanup_tasks:
diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py
index 7f7962c3dd..d9d312f0fb 100644
--- a/tests/handlers/test_stats.py
+++ b/tests/handlers/test_stats.py
@@ -42,7 +42,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
         Add the background updates we need to run.
         """
         # Ugh, have to reset this flag
-        self.store._all_done = False
+        self.store.db.updates._all_done = False
 
         self.get_success(
             self.store.db.simple_insert(
@@ -108,8 +108,12 @@ class StatsRoomTests(unittest.HomeserverTestCase):
         # Do the initial population of the stats via the background update
         self._add_background_updates()
 
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
     def test_initial_room(self):
         """
@@ -141,8 +145,12 @@ class StatsRoomTests(unittest.HomeserverTestCase):
         # Do the initial population of the user directory via the background update
         self._add_background_updates()
 
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         r = self.get_success(self.get_all_room_state())
 
@@ -178,7 +186,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
         # the position that the deltas should begin at, once they take over.
         self.hs.config.stats_enabled = True
         self.handler.stats_enabled = True
-        self.store._all_done = False
+        self.store.db.updates._all_done = False
         self.get_success(
             self.store.db.simple_update_one(
                 table="stats_incremental_position",
@@ -194,8 +202,12 @@ class StatsRoomTests(unittest.HomeserverTestCase):
             )
         )
 
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         # Now, before the table is actually ingested, add some more events.
         self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
@@ -221,9 +233,13 @@ class StatsRoomTests(unittest.HomeserverTestCase):
             )
         )
 
-        self.store._all_done = False
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        self.store.db.updates._all_done = False
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         self.reactor.advance(86401)
 
@@ -653,7 +669,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
 
         # preparation stage of the initial background update
         # Ugh, have to reset this flag
-        self.store._all_done = False
+        self.store.db.updates._all_done = False
 
         self.get_success(
             self.store.db.simple_delete(
@@ -673,7 +689,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
 
         # now do the background updates
 
-        self.store._all_done = False
+        self.store.db.updates._all_done = False
         self.get_success(
             self.store.db.simple_insert(
                 "background_updates",
@@ -705,8 +721,12 @@ class StatsRoomTests(unittest.HomeserverTestCase):
             )
         )
 
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         r1stats_complete = self._get_current_stats("room", r1)
         u1stats_complete = self._get_current_stats("user", u1)
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index bc9d441541..26071059d2 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -181,7 +181,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
         Add the background updates we need to run.
         """
         # Ugh, have to reset this flag
-        self.store._all_done = False
+        self.store.db.updates._all_done = False
 
         self.get_success(
             self.store.db.simple_insert(
@@ -255,8 +255,12 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
         # Do the initial population of the user directory via the background update
         self._add_background_updates()
 
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         shares_private = self.get_users_who_share_private_rooms()
         public_users = self.get_users_in_public_rooms()
@@ -290,8 +294,12 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
         # Do the initial population of the user directory via the background update
         self._add_background_updates()
 
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         shares_private = self.get_users_who_share_private_rooms()
         public_users = self.get_users_in_public_rooms()
diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py
index e360297df9..aec76f4ab1 100644
--- a/tests/storage/test_background_update.py
+++ b/tests/storage/test_background_update.py
@@ -15,7 +15,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
 
         self.update_handler = Mock()
 
-        yield self.store.register_background_update_handler(
+        yield self.store.db.updates.register_background_update_handler(
             "test_update", self.update_handler
         )
 
@@ -23,7 +23,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
         # (perhaps we should run them as part of the test HS setup, since we
         # run all of the other schema setup stuff there?)
         while True:
-            res = yield self.store.do_next_background_update(1000)
+            res = yield self.store.db.updates.do_next_background_update(1000)
             if res is None:
                 break
 
@@ -39,7 +39,7 @@ class BackgroundUpdateTestCase(unittest.TestCase):
             progress = {"my_key": progress["my_key"] + 1}
             yield self.store.db.runInteraction(
                 "update_progress",
-                self.store._background_update_progress_txn,
+                self.store.db.updates._background_update_progress_txn,
                 "test_update",
                 progress,
             )
@@ -47,29 +47,37 @@ class BackgroundUpdateTestCase(unittest.TestCase):
 
         self.update_handler.side_effect = update
 
-        yield self.store.start_background_update("test_update", {"my_key": 1})
+        yield self.store.db.updates.start_background_update(
+            "test_update", {"my_key": 1}
+        )
 
         self.update_handler.reset_mock()
-        result = yield self.store.do_next_background_update(duration_ms * desired_count)
+        result = yield self.store.db.updates.do_next_background_update(
+            duration_ms * desired_count
+        )
         self.assertIsNotNone(result)
         self.update_handler.assert_called_once_with(
-            {"my_key": 1}, self.store.DEFAULT_BACKGROUND_BATCH_SIZE
+            {"my_key": 1}, self.store.db.updates.DEFAULT_BACKGROUND_BATCH_SIZE
         )
 
         # second step: complete the update
         @defer.inlineCallbacks
         def update(progress, count):
-            yield self.store._end_background_update("test_update")
+            yield self.store.db.updates._end_background_update("test_update")
             return count
 
         self.update_handler.side_effect = update
         self.update_handler.reset_mock()
-        result = yield self.store.do_next_background_update(duration_ms * desired_count)
+        result = yield self.store.db.updates.do_next_background_update(
+            duration_ms * desired_count
+        )
         self.assertIsNotNone(result)
         self.update_handler.assert_called_once_with({"my_key": 2}, desired_count)
 
         # third step: we don't expect to be called any more
         self.update_handler.reset_mock()
-        result = yield self.store.do_next_background_update(duration_ms * desired_count)
+        result = yield self.store.db.updates.do_next_background_update(
+            duration_ms * desired_count
+        )
         self.assertIsNone(result)
         self.assertFalse(self.update_handler.called)
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py
index e454bbff29..029ac26454 100644
--- a/tests/storage/test_cleanup_extrems.py
+++ b/tests/storage/test_cleanup_extrems.py
@@ -46,7 +46,9 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
         """Re run the background update to clean up the extremities.
         """
         # Make sure we don't clash with in progress updates.
-        self.assertTrue(self.store._all_done, "Background updates are still ongoing")
+        self.assertTrue(
+            self.store.db.updates._all_done, "Background updates are still ongoing"
+        )
 
         schema_path = os.path.join(
             prepare_database.dir_path,
@@ -68,10 +70,14 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
         )
 
         # Ugh, have to reset this flag
-        self.store._all_done = False
+        self.store.db.updates._all_done = False
 
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
     def test_soft_failed_extremities_handled_correctly(self):
         """Test that extremities are correctly calculated in the presence of
diff --git a/tests/storage/test_client_ips.py b/tests/storage/test_client_ips.py
index c4f838907c..fc279340d4 100644
--- a/tests/storage/test_client_ips.py
+++ b/tests/storage/test_client_ips.py
@@ -202,8 +202,12 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
 
     def test_devices_last_seen_bg_update(self):
         # First make sure we have completed all updates.
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         # Insert a user IP
         user_id = "@user:id"
@@ -256,11 +260,15 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
         )
 
         # ... and tell the DataStore that it hasn't finished all updates yet
-        self.store._all_done = False
+        self.store.db.updates._all_done = False
 
         # Now let's actually drive the updates to completion
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         # We should now get the correct result again
         result = self.get_success(
@@ -281,8 +289,12 @@ class ClientIpStoreTestCase(unittest.HomeserverTestCase):
 
     def test_old_user_ips_pruned(self):
         # First make sure we have completed all updates.
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         # Insert a user IP
         user_id = "@user:id"
diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py
index 5f957680a2..7840f63fe3 100644
--- a/tests/storage/test_roommember.py
+++ b/tests/storage/test_roommember.py
@@ -122,8 +122,12 @@ class CurrentStateMembershipUpdateTestCase(unittest.HomeserverTestCase):
 
     def test_can_rerun_update(self):
         # First make sure we have completed all updates.
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
 
         # Now let's create a room, which will insert a membership
         user = UserID("alice", "test")
@@ -143,8 +147,12 @@ class CurrentStateMembershipUpdateTestCase(unittest.HomeserverTestCase):
         )
 
         # ... and tell the DataStore that it hasn't finished all updates yet
-        self.store._all_done = False
+        self.store.db.updates._all_done = False
 
         # Now let's actually drive the updates to completion
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
+        while not self.get_success(
+            self.store.db.updates.has_completed_background_updates()
+        ):
+            self.get_success(
+                self.store.db.updates.do_next_background_update(100), by=0.1
+            )
diff --git a/tests/unittest.py b/tests/unittest.py
index fc856a574a..68d245ec9f 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -401,10 +401,12 @@ class HomeserverTestCase(TestCase):
         hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
         stor = hs.get_datastore()
 
-        # Run the database background updates.
-        if hasattr(stor, "do_next_background_update"):
-            while not self.get_success(stor.has_completed_background_updates()):
-                self.get_success(stor.do_next_background_update(1))
+        # Run the database background updates, when running against "master".
+        if hs.__class__.__name__ == "TestHomeServer":
+            while not self.get_success(
+                stor.db.updates.has_completed_background_updates()
+            ):
+                self.get_success(stor.db.updates.do_next_background_update(1))
 
         return hs