summary refs log tree commit diff
path: root/synapse/storage/databases/main/stats.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2020-08-27 17:24:37 -0400
committerGitHub <noreply@github.com>2020-08-27 17:24:37 -0400
commitb49a5b9307fbbc9032e28e532e9036db07555d3d (patch)
treedaf087f5653a075a5f66aed337d5c1ad4eb8eec4 /synapse/storage/databases/main/stats.py
parentConvert simple_delete to async/await. (#8191) (diff)
downloadsynapse-b49a5b9307fbbc9032e28e532e9036db07555d3d.tar.xz
Convert stats and related calls to async/await (#8192)
Diffstat (limited to 'synapse/storage/databases/main/stats.py')
-rw-r--r--synapse/storage/databases/main/stats.py82
1 files changed, 42 insertions, 40 deletions
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 7af2608ca4..9b9bc304a8 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -15,8 +15,9 @@
 # limitations under the License.
 
 import logging
+from collections import Counter
 from itertools import chain
-from typing import Any, Dict, Tuple
+from typing import Any, Dict, List, Optional, Tuple
 
 from twisted.internet.defer import DeferredLock
 
@@ -251,21 +252,23 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
-    def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
+    async def get_statistics_for_subject(
+        self, stats_type: str, stats_id: str, start: str, size: int = 100
+    ) -> List[dict]:
         """
         Get statistics for a given subject.
 
         Args:
-            stats_type (str): The type of subject
-            stats_id (str): The ID of the subject (e.g. room_id or user_id)
-            start (int): Pagination start. Number of entries, not timestamp.
-            size (int): How many entries to return.
+            stats_type: The type of subject
+            stats_id: The ID of the subject (e.g. room_id or user_id)
+            start: Pagination start. Number of entries, not timestamp.
+            size: How many entries to return.
 
         Returns:
-            Deferred[list[dict]], where the dict has the keys of
+            A list of dicts, where the dict has the keys of
             ABSOLUTE_STATS_FIELDS[stats_type],  and "bucket_size" and "end_ts".
         """
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "get_statistics_for_subject",
             self._get_statistics_for_subject_txn,
             stats_type,
@@ -319,18 +322,17 @@ class StatsStore(StateDeltasStore):
             allow_none=True,
         )
 
-    def bulk_update_stats_delta(self, ts, updates, stream_id):
+    async def bulk_update_stats_delta(
+        self, ts: int, updates: Dict[str, Dict[str, Dict[str, Counter]]], stream_id: int
+    ) -> None:
         """Bulk update stats tables for a given stream_id and updates the stats
         incremental position.
 
         Args:
-            ts (int): Current timestamp in ms
-            updates(dict[str, dict[str, dict[str, Counter]]]): The updates to
-                commit as a mapping stats_type -> stats_id -> field -> delta.
-            stream_id (int): Current position.
-
-        Returns:
-            Deferred
+            ts: Current timestamp in ms
+            updates: The updates to commit as a mapping of
+                stats_type -> stats_id -> field -> delta.
+            stream_id: Current position.
         """
 
         def _bulk_update_stats_delta_txn(txn):
@@ -355,38 +357,37 @@ class StatsStore(StateDeltasStore):
                 updatevalues={"stream_id": stream_id},
             )
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "bulk_update_stats_delta", _bulk_update_stats_delta_txn
         )
 
-    def update_stats_delta(
+    async def update_stats_delta(
         self,
-        ts,
-        stats_type,
-        stats_id,
-        fields,
-        complete_with_stream_id,
-        absolute_field_overrides=None,
-    ):
+        ts: int,
+        stats_type: str,
+        stats_id: str,
+        fields: Dict[str, int],
+        complete_with_stream_id: Optional[int],
+        absolute_field_overrides: Optional[Dict[str, int]] = None,
+    ) -> None:
         """
         Updates the statistics for a subject, with a delta (difference/relative
         change).
 
         Args:
-            ts (int): timestamp of the change
-            stats_type (str): "room" or "user" – the kind of subject
-            stats_id (str): the subject's ID (room ID or user ID)
-            fields (dict[str, int]): Deltas of stats values.
-            complete_with_stream_id (int, optional):
+            ts: timestamp of the change
+            stats_type: "room" or "user" – the kind of subject
+            stats_id: the subject's ID (room ID or user ID)
+            fields: Deltas of stats values.
+            complete_with_stream_id:
                 If supplied, converts an incomplete row into a complete row,
                 with the supplied stream_id marked as the stream_id where the
                 row was completed.
-            absolute_field_overrides (dict[str, int]): Current stats values
-                (i.e. not deltas) of absolute fields.
-                Does not work with per-slice fields.
+            absolute_field_overrides: Current stats values (i.e. not deltas) of
+                absolute fields. Does not work with per-slice fields.
         """
 
-        return self.db_pool.runInteraction(
+        await self.db_pool.runInteraction(
             "update_stats_delta",
             self._update_stats_delta_txn,
             ts,
@@ -646,19 +647,20 @@ class StatsStore(StateDeltasStore):
                     txn, into_table, all_dest_keyvalues, src_row
                 )
 
-    def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
+    async def get_changes_room_total_events_and_bytes(
+        self, min_pos: int, max_pos: int
+    ) -> Dict[str, Dict[str, int]]:
         """Fetches the counts of events in the given range of stream IDs.
 
         Args:
-            min_pos (int)
-            max_pos (int)
+            min_pos
+            max_pos
 
         Returns:
-            Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field
-            changes.
+            Mapping of room ID to field changes.
         """
 
-        return self.db_pool.runInteraction(
+        return await self.db_pool.runInteraction(
             "stats_incremental_total_events_and_bytes",
             self.get_changes_room_total_events_and_bytes_txn,
             min_pos,