diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 5f84982543..d1dbd62b5b 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -28,7 +28,6 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.errors import StoreError
from synapse.storage._base import SQLBaseStore
-from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.data_stores.main.search import SearchStore
from synapse.types import ThirdPartyInstanceID
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
@@ -54,7 +53,7 @@ class RoomWorkerStore(SQLBaseStore):
Returns:
A dict containing the room information, or None if the room is unknown.
"""
- return self.simple_select_one(
+ return self.db.simple_select_one(
table="rooms",
keyvalues={"room_id": room_id},
retcols=("room_id", "is_public", "creator"),
@@ -63,7 +62,7 @@ class RoomWorkerStore(SQLBaseStore):
)
def get_public_room_ids(self):
- return self.simple_select_onecol(
+ return self.db.simple_select_onecol(
table="rooms",
keyvalues={"is_public": True},
retcol="room_id",
@@ -120,7 +119,7 @@ class RoomWorkerStore(SQLBaseStore):
txn.execute(sql, query_args)
return txn.fetchone()[0]
- return self.runInteraction("count_public_rooms", _count_public_rooms_txn)
+ return self.db.runInteraction("count_public_rooms", _count_public_rooms_txn)
@defer.inlineCallbacks
def get_largest_public_rooms(
@@ -253,21 +252,21 @@ class RoomWorkerStore(SQLBaseStore):
def _get_largest_public_rooms_txn(txn):
txn.execute(sql, query_args)
- results = self.cursor_to_dict(txn)
+ results = self.db.cursor_to_dict(txn)
if not forwards:
results.reverse()
return results
- ret_val = yield self.runInteraction(
+ ret_val = yield self.db.runInteraction(
"get_largest_public_rooms", _get_largest_public_rooms_txn
)
defer.returnValue(ret_val)
@cached(max_entries=10000)
def is_room_blocked(self, room_id):
- return self.simple_select_one_onecol(
+ return self.db.simple_select_one_onecol(
table="blocked_rooms",
keyvalues={"room_id": room_id},
retcol="1",
@@ -306,7 +305,7 @@ class RoomWorkerStore(SQLBaseStore):
of RatelimitOverride are None or 0 then ratelimitng has been
disabled for that user entirely.
"""
- row = yield self.simple_select_one(
+ row = yield self.db.simple_select_one(
table="ratelimit_override",
keyvalues={"user_id": user_id},
retcols=("messages_per_second", "burst_count"),
@@ -353,10 +352,10 @@ class RoomWorkerStore(SQLBaseStore):
(room_id,),
)
- return self.cursor_to_dict(txn)
+ return self.db.cursor_to_dict(txn)
- ret = yield self.runInteraction(
- "get_retention_policy_for_room", get_retention_policy_for_room_txn
+ ret = yield self.db.runInteraction(
+ "get_retention_policy_for_room", get_retention_policy_for_room_txn,
)
# If we don't know this room ID, ret will be None, in this case return the default
@@ -384,13 +383,13 @@ class RoomWorkerStore(SQLBaseStore):
defer.returnValue(row)
-class RoomBackgroundUpdateStore(BackgroundUpdateStore):
+class RoomBackgroundUpdateStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(RoomBackgroundUpdateStore, self).__init__(db_conn, hs)
self.config = hs.config
- self.register_background_update_handler(
+ self.db.updates.register_background_update_handler(
"insert_room_retention", self._background_insert_retention,
)
@@ -419,7 +418,7 @@ class RoomBackgroundUpdateStore(BackgroundUpdateStore):
(last_room, batch_size),
)
- rows = self.cursor_to_dict(txn)
+ rows = self.db.cursor_to_dict(txn)
if not rows:
return True
@@ -431,7 +430,7 @@ class RoomBackgroundUpdateStore(BackgroundUpdateStore):
ev = json.loads(row["json"])
retention_policy = json.dumps(ev["content"])
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn=txn,
table="room_retention",
values={
@@ -444,7 +443,7 @@ class RoomBackgroundUpdateStore(BackgroundUpdateStore):
logger.info("Inserted %d rows into room_retention", len(rows))
- self._background_update_progress_txn(
+ self.db.updates._background_update_progress_txn(
txn, "insert_room_retention", {"room_id": rows[-1]["room_id"]}
)
@@ -453,12 +452,12 @@ class RoomBackgroundUpdateStore(BackgroundUpdateStore):
else:
return False
- end = yield self.runInteraction(
+ end = yield self.db.runInteraction(
"insert_room_retention", _background_insert_retention_txn,
)
if end:
- yield self._end_background_update("insert_room_retention")
+ yield self.db.updates._end_background_update("insert_room_retention")
defer.returnValue(batch_size)
@@ -484,7 +483,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
try:
def store_room_txn(txn, next_id):
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn,
"rooms",
{
@@ -494,7 +493,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
},
)
if is_public:
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn,
table="public_room_list_stream",
values={
@@ -505,7 +504,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
)
with self._public_room_id_gen.get_next() as next_id:
- yield self.runInteraction("store_room_txn", store_room_txn, next_id)
+ yield self.db.runInteraction("store_room_txn", store_room_txn, next_id)
except Exception as e:
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
@@ -513,14 +512,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
@defer.inlineCallbacks
def set_room_is_public(self, room_id, is_public):
def set_room_is_public_txn(txn, next_id):
- self.simple_update_one_txn(
+ self.db.simple_update_one_txn(
txn,
table="rooms",
keyvalues={"room_id": room_id},
updatevalues={"is_public": is_public},
)
- entries = self.simple_select_list_txn(
+ entries = self.db.simple_select_list_txn(
txn,
table="public_room_list_stream",
keyvalues={
@@ -538,7 +537,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
add_to_stream = bool(entries[-1]["visibility"]) != is_public
if add_to_stream:
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn,
table="public_room_list_stream",
values={
@@ -551,7 +550,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
)
with self._public_room_id_gen.get_next() as next_id:
- yield self.runInteraction(
+ yield self.db.runInteraction(
"set_room_is_public", set_room_is_public_txn, next_id
)
self.hs.get_notifier().on_new_replication_data()
@@ -578,7 +577,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
def set_room_is_public_appservice_txn(txn, next_id):
if is_public:
try:
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn,
table="appservice_room_list",
values={
@@ -591,7 +590,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
# We've already inserted, nothing to do.
return
else:
- self.simple_delete_txn(
+ self.db.simple_delete_txn(
txn,
table="appservice_room_list",
keyvalues={
@@ -601,7 +600,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
},
)
- entries = self.simple_select_list_txn(
+ entries = self.db.simple_select_list_txn(
txn,
table="public_room_list_stream",
keyvalues={
@@ -619,7 +618,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
add_to_stream = bool(entries[-1]["visibility"]) != is_public
if add_to_stream:
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn,
table="public_room_list_stream",
values={
@@ -632,7 +631,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
)
with self._public_room_id_gen.get_next() as next_id:
- yield self.runInteraction(
+ yield self.db.runInteraction(
"set_room_is_public_appservice",
set_room_is_public_appservice_txn,
next_id,
@@ -649,7 +648,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
row = txn.fetchone()
return row[0] or 0
- return self.runInteraction("get_rooms", f)
+ return self.db.runInteraction("get_rooms", f)
def _store_room_topic_txn(self, txn, event):
if hasattr(event, "content") and "topic" in event.content:
@@ -683,7 +682,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
# Ignore the event if one of the value isn't an integer.
return
- self.simple_insert_txn(
+ self.db.simple_insert_txn(
txn=txn,
table="room_retention",
values={
@@ -702,7 +701,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
self, room_id, event_id, user_id, reason, content, received_ts
):
next_id = self._event_reports_id_gen.get_next()
- return self.simple_insert(
+ return self.db.simple_insert(
table="event_reports",
values={
"id": next_id,
@@ -735,7 +734,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
if prev_id == current_id:
return defer.succeed([])
- return self.runInteraction("get_all_new_public_rooms", get_all_new_public_rooms)
+ return self.db.runInteraction(
+ "get_all_new_public_rooms", get_all_new_public_rooms
+ )
@defer.inlineCallbacks
def block_room(self, room_id, user_id):
@@ -748,14 +749,14 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
Returns:
Deferred
"""
- yield self.simple_upsert(
+ yield self.db.simple_upsert(
table="blocked_rooms",
keyvalues={"room_id": room_id},
values={},
insertion_values={"user_id": user_id},
desc="block_room",
)
- yield self.runInteraction(
+ yield self.db.runInteraction(
"block_room_invalidation",
self._invalidate_cache_and_stream,
self.is_room_blocked,
@@ -786,7 +787,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
return local_media_mxcs, remote_media_mxcs
- return self.runInteraction("get_media_ids_in_room", _get_media_mxcs_in_room_txn)
+ return self.db.runInteraction(
+ "get_media_ids_in_room", _get_media_mxcs_in_room_txn
+ )
def quarantine_media_ids_in_room(self, room_id, quarantined_by):
"""For a room loops through all events with media and quarantines
@@ -825,7 +828,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
return total_media_quarantined
- return self.runInteraction(
+ return self.db.runInteraction(
"quarantine_media_in_room", _quarantine_media_in_room_txn
)
@@ -930,7 +933,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
txn.execute(sql, args)
- rows = self.cursor_to_dict(txn)
+ rows = self.db.cursor_to_dict(txn)
rooms_dict = {}
for row in rows:
@@ -946,7 +949,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
txn.execute(sql)
- rows = self.cursor_to_dict(txn)
+ rows = self.db.cursor_to_dict(txn)
# If a room isn't already in the dict (i.e. it doesn't have a retention
# policy in its state), add it with a null policy.
@@ -959,7 +962,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
return rooms_dict
- rooms = yield self.runInteraction(
+ rooms = yield self.db.runInteraction(
"get_rooms_for_retention_period_in_range",
get_rooms_for_retention_period_in_range_txn,
)
|