diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py
index 380c1ec7da..922400a7c3 100644
--- a/synapse/storage/data_stores/main/stats.py
+++ b/synapse/storage/data_stores/main/stats.py
@@ -16,8 +16,8 @@
import logging
from itertools import chain
+from typing import Tuple
-from twisted.internet import defer
from twisted.internet.defer import DeferredLock
from synapse.api.constants import EventTypes, Membership
@@ -97,13 +97,12 @@ class StatsStore(StateDeltasStore):
"""
return (ts // self.stats_bucket_size) * self.stats_bucket_size
- @defer.inlineCallbacks
- def _populate_stats_process_users(self, progress, batch_size):
+ async def _populate_stats_process_users(self, progress, batch_size):
"""
This is a background update which regenerates statistics for users.
"""
if not self.stats_enabled:
- yield self.db.updates._end_background_update("populate_stats_process_users")
+ await self.db.updates._end_background_update("populate_stats_process_users")
return 1
last_user_id = progress.get("last_user_id", "")
@@ -118,20 +117,20 @@ class StatsStore(StateDeltasStore):
txn.execute(sql, (last_user_id, batch_size))
return [r for r, in txn]
- users_to_work_on = yield self.db.runInteraction(
+ users_to_work_on = await self.db.runInteraction(
"_populate_stats_process_users", _get_next_batch
)
# No more rooms -- complete the transaction.
if not users_to_work_on:
- yield self.db.updates._end_background_update("populate_stats_process_users")
+ await self.db.updates._end_background_update("populate_stats_process_users")
return 1
for user_id in users_to_work_on:
- yield self._calculate_and_set_initial_state_for_user(user_id)
+ await self._calculate_and_set_initial_state_for_user(user_id)
progress["last_user_id"] = user_id
- yield self.db.runInteraction(
+ await self.db.runInteraction(
"populate_stats_process_users",
self.db.updates._background_update_progress_txn,
"populate_stats_process_users",
@@ -140,13 +139,12 @@ class StatsStore(StateDeltasStore):
return len(users_to_work_on)
- @defer.inlineCallbacks
- def _populate_stats_process_rooms(self, progress, batch_size):
+ async def _populate_stats_process_rooms(self, progress, batch_size):
"""
This is a background update which regenerates statistics for rooms.
"""
if not self.stats_enabled:
- yield self.db.updates._end_background_update("populate_stats_process_rooms")
+ await self.db.updates._end_background_update("populate_stats_process_rooms")
return 1
last_room_id = progress.get("last_room_id", "")
@@ -161,20 +159,20 @@ class StatsStore(StateDeltasStore):
txn.execute(sql, (last_room_id, batch_size))
return [r for r, in txn]
- rooms_to_work_on = yield self.db.runInteraction(
+ rooms_to_work_on = await self.db.runInteraction(
"populate_stats_rooms_get_batch", _get_next_batch
)
# No more rooms -- complete the transaction.
if not rooms_to_work_on:
- yield self.db.updates._end_background_update("populate_stats_process_rooms")
+ await self.db.updates._end_background_update("populate_stats_process_rooms")
return 1
for room_id in rooms_to_work_on:
- yield self._calculate_and_set_initial_state_for_room(room_id)
+ await self._calculate_and_set_initial_state_for_room(room_id)
progress["last_room_id"] = room_id
- yield self.db.runInteraction(
+ await self.db.runInteraction(
"_populate_stats_process_rooms",
self.db.updates._background_update_progress_txn,
"populate_stats_process_rooms",
@@ -696,16 +694,16 @@ class StatsStore(StateDeltasStore):
return room_deltas, user_deltas
- @defer.inlineCallbacks
- def _calculate_and_set_initial_state_for_room(self, room_id):
+ async def _calculate_and_set_initial_state_for_room(
+ self, room_id: str
+ ) -> Tuple[dict, dict, int]:
"""Calculate and insert an entry into room_stats_current.
Args:
- room_id (str)
+ room_id: The room ID under calculation.
Returns:
- Deferred[tuple[dict, dict, int]]: A tuple of room state, membership
- counts and stream position.
+ A tuple of room state, membership counts and stream position.
"""
def _fetch_current_state_stats(txn):
@@ -767,11 +765,11 @@ class StatsStore(StateDeltasStore):
current_state_events_count,
users_in_room,
pos,
- ) = yield self.db.runInteraction(
+ ) = await self.db.runInteraction(
"get_initial_state_for_room", _fetch_current_state_stats
)
- state_event_map = yield self.get_events(event_ids, get_prev_content=False)
+ state_event_map = await self.get_events(event_ids, get_prev_content=False)
room_state = {
"join_rules": None,
@@ -806,11 +804,11 @@ class StatsStore(StateDeltasStore):
event.content.get("m.federate", True) is True
)
- yield self.update_room_state(room_id, room_state)
+ await self.update_room_state(room_id, room_state)
local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)]
- yield self.update_stats_delta(
+ await self.update_stats_delta(
ts=self.clock.time_msec(),
stats_type="room",
stats_id=room_id,
@@ -826,8 +824,7 @@ class StatsStore(StateDeltasStore):
},
)
- @defer.inlineCallbacks
- def _calculate_and_set_initial_state_for_user(self, user_id):
+ async def _calculate_and_set_initial_state_for_user(self, user_id):
def _calculate_and_set_initial_state_for_user_txn(txn):
pos = self._get_max_stream_id_in_current_state_deltas_txn(txn)
@@ -842,12 +839,12 @@ class StatsStore(StateDeltasStore):
(count,) = txn.fetchone()
return count, pos
- joined_rooms, pos = yield self.db.runInteraction(
+ joined_rooms, pos = await self.db.runInteraction(
"calculate_and_set_initial_state_for_user",
_calculate_and_set_initial_state_for_user_txn,
)
- yield self.update_stats_delta(
+ await self.update_stats_delta(
ts=self.clock.time_msec(),
stats_type="user",
stats_id=user_id,
|