diff --git a/synapse/storage/data_stores/main/deviceinbox.py b/synapse/storage/data_stores/main/deviceinbox.py
index 206d39134d..3c9f09301a 100644
--- a/synapse/storage/data_stores/main/deviceinbox.py
+++ b/synapse/storage/data_stores/main/deviceinbox.py
@@ -21,7 +21,6 @@ from twisted.internet import defer
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
-from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.util.caches.expiringcache import ExpiringCache
logger = logging.getLogger(__name__)
@@ -69,7 +68,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
stream_pos = current_stream_id
return messages, stream_pos
- return self.runInteraction(
+ return self.db.runInteraction(
"get_new_messages_for_device", get_new_messages_for_device_txn
)
@@ -109,7 +108,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
txn.execute(sql, (user_id, device_id, up_to_stream_id))
return txn.rowcount
- count = yield self.runInteraction(
+ count = yield self.db.runInteraction(
"delete_messages_for_device", delete_messages_for_device_txn
)
@@ -178,7 +177,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
stream_pos = current_stream_id
return messages, stream_pos
- return self.runInteraction(
+ return self.db.runInteraction(
"get_new_device_msgs_for_remote",
get_new_messages_for_remote_destination_txn,
)
@@ -203,25 +202,25 @@ class DeviceInboxWorkerStore(SQLBaseStore):
)
txn.execute(sql, (destination, up_to_stream_id))
- return self.runInteraction(
+ return self.db.runInteraction(
"delete_device_msgs_for_remote", delete_messages_for_remote_destination_txn
)
-class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore):
+class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
def __init__(self, db_conn, hs):
super(DeviceInboxBackgroundUpdateStore, self).__init__(db_conn, hs)
- self.register_background_index_update(
+ self.db.updates.register_background_index_update(
"device_inbox_stream_index",
index_name="device_inbox_stream_id_user_id",
table="device_inbox",
columns=["stream_id", "user_id"],
)
- self.register_background_update_handler(
+ self.db.updates.register_background_update_handler(
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
)
@@ -232,9 +231,9 @@ class DeviceInboxBackgroundUpdateStore(BackgroundUpdateStore):
txn.execute("DROP INDEX IF EXISTS device_inbox_stream_id")
txn.close()
- yield self.runWithConnection(reindex_txn)
+ yield self.db.runWithConnection(reindex_txn)
- yield self._end_background_update(self.DEVICE_INBOX_STREAM_ID)
+ yield self.db.updates._end_background_update(self.DEVICE_INBOX_STREAM_ID)
return 1
@@ -294,7 +293,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
- yield self.runInteraction(
+ yield self.db.runInteraction(
"add_messages_to_device_inbox", add_messages_txn, now_ms, stream_id
)
for user_id in local_messages_by_user_then_device.keys():
@@ -314,7 +313,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
# Check if we've already inserted a matching message_id for that
# origin. This can happen if the origin doesn't receive our
# acknowledgement from the first time we received the message.
- already_inserted = self.simple_select_one_txn(
+ already_inserted = self.db.simple_select_one_txn(
txn,
table="device_federation_inbox",
keyvalues={"origin": origin, "message_id": message_id},
@@ -326,7 +325,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
# Add an entry for this message_id so that we know we've processed
# it.
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn,
table="device_federation_inbox",
values={
@@ -344,7 +343,7 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
with self._device_inbox_id_gen.get_next() as stream_id:
now_ms = self.clock.time_msec()
- yield self.runInteraction(
+ yield self.db.runInteraction(
"add_messages_from_remote_to_device_inbox",
add_messages_txn,
now_ms,
@@ -465,6 +464,6 @@ class DeviceInboxStore(DeviceInboxWorkerStore, DeviceInboxBackgroundUpdateStore)
return rows
- return self.runInteraction(
+ return self.db.runInteraction(
"get_all_new_device_messages", get_all_new_device_messages_txn
)
|