diff options
author | Patrick Cloke <clokep@users.noreply.github.com> | 2020-08-27 17:24:37 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-08-27 17:24:37 -0400 |
commit | b49a5b9307fbbc9032e28e532e9036db07555d3d (patch) | |
tree | daf087f5653a075a5f66aed337d5c1ad4eb8eec4 /synapse/storage/databases/main/stats.py | |
parent | Convert simple_delete to async/await. (#8191) (diff) | |
download | synapse-b49a5b9307fbbc9032e28e532e9036db07555d3d.tar.xz |
Convert stats and related calls to async/await (#8192)
Diffstat (limited to 'synapse/storage/databases/main/stats.py')
-rw-r--r-- | synapse/storage/databases/main/stats.py | 82 |
1 files changed, 42 insertions, 40 deletions
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index 7af2608ca4..9b9bc304a8 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -15,8 +15,9 @@ # limitations under the License. import logging +from collections import Counter from itertools import chain -from typing import Any, Dict, Tuple +from typing import Any, Dict, List, Optional, Tuple from twisted.internet.defer import DeferredLock @@ -251,21 +252,23 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - def get_statistics_for_subject(self, stats_type, stats_id, start, size=100): + async def get_statistics_for_subject( + self, stats_type: str, stats_id: str, start: str, size: int = 100 + ) -> List[dict]: """ Get statistics for a given subject. Args: - stats_type (str): The type of subject - stats_id (str): The ID of the subject (e.g. room_id or user_id) - start (int): Pagination start. Number of entries, not timestamp. - size (int): How many entries to return. + stats_type: The type of subject + stats_id: The ID of the subject (e.g. room_id or user_id) + start: Pagination start. Number of entries, not timestamp. + size: How many entries to return. Returns: - Deferred[list[dict]], where the dict has the keys of + A list of dicts, where the dict has the keys of ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts". """ - return self.db_pool.runInteraction( + return await self.db_pool.runInteraction( "get_statistics_for_subject", self._get_statistics_for_subject_txn, stats_type, @@ -319,18 +322,17 @@ class StatsStore(StateDeltasStore): allow_none=True, ) - def bulk_update_stats_delta(self, ts, updates, stream_id): + async def bulk_update_stats_delta( + self, ts: int, updates: Dict[str, Dict[str, Dict[str, Counter]]], stream_id: int + ) -> None: """Bulk update stats tables for a given stream_id and updates the stats incremental position. Args: - ts (int): Current timestamp in ms - updates(dict[str, dict[str, dict[str, Counter]]]): The updates to - commit as a mapping stats_type -> stats_id -> field -> delta. - stream_id (int): Current position. - - Returns: - Deferred + ts: Current timestamp in ms + updates: The updates to commit as a mapping of + stats_type -> stats_id -> field -> delta. + stream_id: Current position. """ def _bulk_update_stats_delta_txn(txn): @@ -355,38 +357,37 @@ class StatsStore(StateDeltasStore): updatevalues={"stream_id": stream_id}, ) - return self.db_pool.runInteraction( + await self.db_pool.runInteraction( "bulk_update_stats_delta", _bulk_update_stats_delta_txn ) - def update_stats_delta( + async def update_stats_delta( self, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id, - absolute_field_overrides=None, - ): + ts: int, + stats_type: str, + stats_id: str, + fields: Dict[str, int], + complete_with_stream_id: Optional[int], + absolute_field_overrides: Optional[Dict[str, int]] = None, + ) -> None: """ Updates the statistics for a subject, with a delta (difference/relative change). Args: - ts (int): timestamp of the change - stats_type (str): "room" or "user" – the kind of subject - stats_id (str): the subject's ID (room ID or user ID) - fields (dict[str, int]): Deltas of stats values. - complete_with_stream_id (int, optional): + ts: timestamp of the change + stats_type: "room" or "user" – the kind of subject + stats_id: the subject's ID (room ID or user ID) + fields: Deltas of stats values. + complete_with_stream_id: If supplied, converts an incomplete row into a complete row, with the supplied stream_id marked as the stream_id where the row was completed. - absolute_field_overrides (dict[str, int]): Current stats values - (i.e. not deltas) of absolute fields. - Does not work with per-slice fields. + absolute_field_overrides: Current stats values (i.e. not deltas) of + absolute fields. Does not work with per-slice fields. """ - return self.db_pool.runInteraction( + await self.db_pool.runInteraction( "update_stats_delta", self._update_stats_delta_txn, ts, @@ -646,19 +647,20 @@ class StatsStore(StateDeltasStore): txn, into_table, all_dest_keyvalues, src_row ) - def get_changes_room_total_events_and_bytes(self, min_pos, max_pos): + async def get_changes_room_total_events_and_bytes( + self, min_pos: int, max_pos: int + ) -> Dict[str, Dict[str, int]]: """Fetches the counts of events in the given range of stream IDs. Args: - min_pos (int) - max_pos (int) + min_pos + max_pos Returns: - Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field - changes. + Mapping of room ID to field changes. """ - return self.db_pool.runInteraction( + return await self.db_pool.runInteraction( "stats_incremental_total_events_and_bytes", self.get_changes_room_total_events_and_bytes_txn, min_pos, |