diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py
index b306478824..40579bf965 100644
--- a/synapse/storage/data_stores/main/stats.py
+++ b/synapse/storage/data_stores/main/stats.py
@@ -68,17 +68,17 @@ class StatsStore(StateDeltasStore):
self.stats_delta_processing_lock = DeferredLock()
- self.register_background_update_handler(
+ self.db.updates.register_background_update_handler(
"populate_stats_process_rooms", self._populate_stats_process_rooms
)
- self.register_background_update_handler(
+ self.db.updates.register_background_update_handler(
"populate_stats_process_users", self._populate_stats_process_users
)
# we no longer need to perform clean-up, but we will give ourselves
# the potential to reintroduce it in the future – so documentation
# will still encourage the use of this no-op handler.
- self.register_noop_background_update("populate_stats_cleanup")
- self.register_noop_background_update("populate_stats_prepare")
+ self.db.updates.register_noop_background_update("populate_stats_cleanup")
+ self.db.updates.register_noop_background_update("populate_stats_prepare")
def quantise_stats_time(self, ts):
"""
@@ -102,7 +102,7 @@ class StatsStore(StateDeltasStore):
This is a background update which regenerates statistics for users.
"""
if not self.stats_enabled:
- yield self._end_background_update("populate_stats_process_users")
+ yield self.db.updates._end_background_update("populate_stats_process_users")
return 1
last_user_id = progress.get("last_user_id", "")
@@ -117,22 +117,22 @@ class StatsStore(StateDeltasStore):
txn.execute(sql, (last_user_id, batch_size))
return [r for r, in txn]
- users_to_work_on = yield self.runInteraction(
+ users_to_work_on = yield self.db.runInteraction(
"_populate_stats_process_users", _get_next_batch
)
# No more rooms -- complete the transaction.
if not users_to_work_on:
- yield self._end_background_update("populate_stats_process_users")
+ yield self.db.updates._end_background_update("populate_stats_process_users")
return 1
for user_id in users_to_work_on:
yield self._calculate_and_set_initial_state_for_user(user_id)
progress["last_user_id"] = user_id
- yield self.runInteraction(
+ yield self.db.runInteraction(
"populate_stats_process_users",
- self._background_update_progress_txn,
+ self.db.updates._background_update_progress_txn,
"populate_stats_process_users",
progress,
)
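A condensed view of the batching pattern used in the hunk above, trimmed to its two moving parts: read the resume point out of `progress`, and write it back through `_background_update_progress_txn` via `runInteraction` once the batch is done. The SQL text and the `example_` names are illustrative; only the helpers and arguments visible above are relied on.

    # Resume point comes out of the progress dict the updater persisted last time
    last_user_id = progress.get("last_user_id", "")

    def _get_next_batch(txn):
        # parameterised query (illustrative): pick up where the previous batch stopped
        txn.execute(
            "SELECT DISTINCT name FROM users WHERE name > ? ORDER BY name LIMIT ?",
            (last_user_id, batch_size),
        )
        return [r for r, in txn]

    users_to_work_on = yield self.db.runInteraction(
        "example_get_batch", _get_next_batch
    )

    # ... per-user work updates progress["last_user_id"] as it goes ...

    # Persist the new resume point so a restart carries on from here
    yield self.db.runInteraction(
        "example_update_progress",
        self.db.updates._background_update_progress_txn,
        "populate_stats_process_users",
        progress,
    )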
@@ -145,7 +145,7 @@ class StatsStore(StateDeltasStore):
This is a background update which regenerates statistics for rooms.
"""
if not self.stats_enabled:
- yield self._end_background_update("populate_stats_process_rooms")
+ yield self.db.updates._end_background_update("populate_stats_process_rooms")
return 1
last_room_id = progress.get("last_room_id", "")
@@ -160,22 +160,22 @@ class StatsStore(StateDeltasStore):
txn.execute(sql, (last_room_id, batch_size))
return [r for r, in txn]
- rooms_to_work_on = yield self.runInteraction(
+ rooms_to_work_on = yield self.db.runInteraction(
"populate_stats_rooms_get_batch", _get_next_batch
)
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
- yield self._end_background_update("populate_stats_process_rooms")
+ yield self.db.updates._end_background_update("populate_stats_process_rooms")
return 1
for room_id in rooms_to_work_on:
yield self._calculate_and_set_initial_state_for_room(room_id)
progress["last_room_id"] = room_id
- yield self.runInteraction(
+ yield self.db.runInteraction(
"_populate_stats_process_rooms",
- self._background_update_progress_txn,
+ self.db.updates._background_update_progress_txn,
"populate_stats_process_rooms",
progress,
)
@@ -186,7 +186,7 @@ class StatsStore(StateDeltasStore):
"""
Returns the stats processor positions.
"""
- return self.simple_select_one_onecol(
+ return self.db.simple_select_one_onecol(
table="stats_incremental_position",
keyvalues={},
retcol="stream_id",
@@ -215,7 +215,7 @@ class StatsStore(StateDeltasStore):
if field and "\0" in field:
fields[col] = None
- return self.simple_upsert(
+ return self.db.simple_upsert(
table="room_stats_state",
keyvalues={"room_id": room_id},
values=fields,
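As a usage note, the call above is the Deferred-returning upsert helper on the Database object. The sketch below relies only on the arguments visible in this hunk; the caller name is made up.

    # Hypothetical caller: write a room's current state columns, inserting the
    # row if it does not exist yet and overwriting it if it does.
    def store_room_state_example(store, room_id, fields):
        # as in the hunk above, callers are expected to have scrubbed \0 bytes
        # out of the values first; keyvalues identifies the row to upsert
        return store.db.simple_upsert(
            table="room_stats_state",
            keyvalues={"room_id": room_id},
            values=fields,
        )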
@@ -236,7 +236,7 @@ class StatsStore(StateDeltasStore):
Deferred[list[dict]], where the dict has the keys of
ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
"""
- return self.runInteraction(
+ return self.db.runInteraction(
"get_statistics_for_subject",
self._get_statistics_for_subject_txn,
stats_type,
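The shape repeated throughout this file: the public store method returns `self.db.runInteraction(description, txn_function, *args)`, and the transaction function receives a cursor-like `txn` and returns plain values. A minimal sketch with an invented helper name; the table and column come from the stats_incremental_position hunks in this diff.

    # Invented helper showing the runInteraction shape: the returned Deferred
    # fires with whatever the transaction function returns.
    def get_incremental_position_example(store):
        def _get_position_txn(txn):
            txn.execute("SELECT stream_id FROM stats_incremental_position")
            (stream_id,) = txn.fetchone()
            return stream_id

        return store.db.runInteraction(
            "get_incremental_position_example", _get_position_txn
        )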
@@ -257,7 +257,7 @@ class StatsStore(StateDeltasStore):
ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
)
- slice_list = self.simple_select_list_paginate_txn(
+ slice_list = self.db.simple_select_list_paginate_txn(
txn,
table + "_historical",
"end_ts",
@@ -282,7 +282,7 @@ class StatsStore(StateDeltasStore):
"name", "topic", "canonical_alias", "avatar", "join_rules",
"history_visibility"
"""
- return self.simple_select_one(
+ return self.db.simple_select_one(
"room_stats_state",
{"room_id": room_id},
retcols=(
@@ -308,7 +308,7 @@ class StatsStore(StateDeltasStore):
"""
table, id_col = TYPE_TO_TABLE[stats_type]
- return self.simple_select_one_onecol(
+ return self.db.simple_select_one_onecol(
"%s_current" % (table,),
keyvalues={id_col: id},
retcol="completed_delta_stream_id",
@@ -344,14 +344,14 @@ class StatsStore(StateDeltasStore):
complete_with_stream_id=stream_id,
)
- self.simple_update_one_txn(
+ self.db.simple_update_one_txn(
txn,
table="stats_incremental_position",
keyvalues={},
updatevalues={"stream_id": stream_id},
)
- return self.runInteraction(
+ return self.db.runInteraction(
"bulk_update_stats_delta", _bulk_update_stats_delta_txn
)
@@ -382,7 +382,7 @@ class StatsStore(StateDeltasStore):
Does not work with per-slice fields.
"""
- return self.runInteraction(
+ return self.db.runInteraction(
"update_stats_delta",
self._update_stats_delta_txn,
ts,
@@ -517,17 +517,17 @@ class StatsStore(StateDeltasStore):
else:
self.database_engine.lock_table(txn, table)
retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
- current_row = self.simple_select_one_txn(
+ current_row = self.db.simple_select_one_txn(
txn, table, keyvalues, retcols, allow_none=True
)
if current_row is None:
merged_dict = {**keyvalues, **absolutes, **additive_relatives}
- self.simple_insert_txn(txn, table, merged_dict)
+ self.db.simple_insert_txn(txn, table, merged_dict)
else:
for (key, val) in additive_relatives.items():
current_row[key] += val
current_row.update(absolutes)
- self.simple_update_one_txn(txn, table, keyvalues, current_row)
+ self.db.simple_update_one_txn(txn, table, keyvalues, current_row)
def _upsert_copy_from_table_with_additive_relatives_txn(
self,
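Condensed, the fallback path in the hunk above (taken when the engine has no native upsert) is a read-modify-write inside the transaction: select the current row, then either insert a fresh one or add the additive deltas onto it and update. A sketch with the locking and extra keyvalue plumbing stripped out; it assumes the caller has already locked the table as shown above.

    # Read-modify-write upsert: absolute fields overwrite, additive fields
    # accumulate. Only safe because the surrounding code locked the table (or
    # used the engine's native upsert path instead).
    def upsert_with_additive_relatives(db, txn, table, keyvalues, absolutes, additive_relatives):
        retcols = list(absolutes) + list(additive_relatives)
        current_row = db.simple_select_one_txn(
            txn, table, keyvalues, retcols, allow_none=True
        )

        if current_row is None:
            # no row yet: deltas against nothing are just the starting values
            db.simple_insert_txn(txn, table, {**keyvalues, **absolutes, **additive_relatives})
        else:
            for key, val in additive_relatives.items():
                current_row[key] += val      # accumulate counters
            current_row.update(absolutes)    # overwrite absolute fields
            db.simple_update_one_txn(txn, table, keyvalues, current_row)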
@@ -614,11 +614,11 @@ class StatsStore(StateDeltasStore):
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, into_table)
- src_row = self.simple_select_one_txn(
+ src_row = self.db.simple_select_one_txn(
txn, src_table, keyvalues, copy_columns
)
all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
- dest_current_row = self.simple_select_one_txn(
+ dest_current_row = self.db.simple_select_one_txn(
txn,
into_table,
keyvalues=all_dest_keyvalues,
@@ -634,11 +634,11 @@ class StatsStore(StateDeltasStore):
**src_row,
**additive_relatives,
}
- self.simple_insert_txn(txn, into_table, merged_dict)
+ self.db.simple_insert_txn(txn, into_table, merged_dict)
else:
for (key, val) in additive_relatives.items():
src_row[key] = dest_current_row[key] + val
- self.simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
+ self.db.simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
"""Fetches the counts of events in the given range of stream IDs.
@@ -652,7 +652,7 @@ class StatsStore(StateDeltasStore):
changes.
"""
- return self.runInteraction(
+ return self.db.runInteraction(
"stats_incremental_total_events_and_bytes",
self.get_changes_room_total_events_and_bytes_txn,
min_pos,
@@ -735,7 +735,7 @@ class StatsStore(StateDeltasStore):
def _fetch_current_state_stats(txn):
pos = self.get_room_max_stream_ordering()
- rows = self.simple_select_many_txn(
+ rows = self.db.simple_select_many_txn(
txn,
table="current_state_events",
column="type",
@@ -791,7 +791,7 @@ class StatsStore(StateDeltasStore):
current_state_events_count,
users_in_room,
pos,
- ) = yield self.runInteraction(
+ ) = yield self.db.runInteraction(
"get_initial_state_for_room", _fetch_current_state_stats
)
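One more aspect of the same pattern, worth a small sketch: the transaction function can return several values at once, and the generator simply destructures them after yielding on `runInteraction`. The table and column names below are placeholders, not taken from this diff.

    from twisted.internet import defer

    @defer.inlineCallbacks
    def count_and_position_example(store):
        def _count_txn(txn):
            # placeholder query; any (count, position) pair would do here
            txn.execute("SELECT COUNT(*), MAX(stream_id) FROM example_table")
            count, pos = txn.fetchone()
            return count, pos

        count, pos = yield store.db.runInteraction(
            "count_and_position_example", _count_txn
        )
        return count, pos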
@@ -866,7 +866,7 @@ class StatsStore(StateDeltasStore):
(count,) = txn.fetchone()
return count, pos
- joined_rooms, pos = yield self.runInteraction(
+ joined_rooms, pos = yield self.db.runInteraction(
"calculate_and_set_initial_state_for_user",
_calculate_and_set_initial_state_for_user_txn,
)