diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 5d7b59d861..427ae1f649 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -16,7 +16,7 @@
import logging
from enum import Enum
from itertools import chain
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
from typing_extensions import Counter
@@ -24,7 +24,11 @@ from twisted.internet.defer import DeferredLock
from synapse.api.constants import EventContentFields, EventTypes, Membership
from synapse.api.errors import StoreError
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import (
+ DatabasePool,
+ LoggingDatabaseConnection,
+ LoggingTransaction,
+)
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached
@@ -96,7 +100,12 @@ class UserSortOrder(Enum):
class StatsStore(StateDeltasStore):
- def __init__(self, database: DatabasePool, db_conn, hs: "HomeServer"):
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
super().__init__(database, db_conn, hs)
self.server_name = hs.hostname
@@ -117,7 +126,9 @@ class StatsStore(StateDeltasStore):
self.db_pool.updates.register_noop_background_update("populate_stats_cleanup")
self.db_pool.updates.register_noop_background_update("populate_stats_prepare")
- async def _populate_stats_process_users(self, progress, batch_size):
+ async def _populate_stats_process_users(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
"""
This is a background update which regenerates statistics for users.
"""
@@ -129,7 +140,7 @@ class StatsStore(StateDeltasStore):
last_user_id = progress.get("last_user_id", "")
- def _get_next_batch(txn):
+ def _get_next_batch(txn: LoggingTransaction) -> List[str]:
sql = """
SELECT DISTINCT name FROM users
WHERE name > ?
@@ -163,7 +174,9 @@ class StatsStore(StateDeltasStore):
return len(users_to_work_on)
- async def _populate_stats_process_rooms(self, progress, batch_size):
+ async def _populate_stats_process_rooms(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
"""This is a background update which regenerates statistics for rooms."""
if not self.stats_enabled:
await self.db_pool.updates._end_background_update(
@@ -173,7 +186,7 @@ class StatsStore(StateDeltasStore):
last_room_id = progress.get("last_room_id", "")
- def _get_next_batch(txn):
+ def _get_next_batch(txn: LoggingTransaction) -> List[str]:
sql = """
SELECT DISTINCT room_id FROM current_state_events
WHERE room_id > ?
@@ -302,7 +315,7 @@ class StatsStore(StateDeltasStore):
stream_id: Current position.
"""
- def _bulk_update_stats_delta_txn(txn):
+ def _bulk_update_stats_delta_txn(txn: LoggingTransaction) -> None:
for stats_type, stats_updates in updates.items():
for stats_id, fields in stats_updates.items():
logger.debug(
@@ -334,7 +347,7 @@ class StatsStore(StateDeltasStore):
stats_type: str,
stats_id: str,
fields: Dict[str, int],
- complete_with_stream_id: Optional[int],
+ complete_with_stream_id: int,
absolute_field_overrides: Optional[Dict[str, int]] = None,
) -> None:
"""
@@ -367,14 +380,14 @@ class StatsStore(StateDeltasStore):
def _update_stats_delta_txn(
self,
- txn,
- ts,
- stats_type,
- stats_id,
- fields,
- complete_with_stream_id,
- absolute_field_overrides=None,
- ):
+ txn: LoggingTransaction,
+ ts: int,
+ stats_type: str,
+ stats_id: str,
+ fields: Dict[str, int],
+ complete_with_stream_id: int,
+ absolute_field_overrides: Optional[Dict[str, int]] = None,
+ ) -> None:
if absolute_field_overrides is None:
absolute_field_overrides = {}
@@ -417,20 +430,23 @@ class StatsStore(StateDeltasStore):
)
def _upsert_with_additive_relatives_txn(
- self, txn, table, keyvalues, absolutes, additive_relatives
- ):
+ self,
+ txn: LoggingTransaction,
+ table: str,
+ keyvalues: Dict[str, Any],
+ absolutes: Dict[str, Any],
+ additive_relatives: Dict[str, int],
+ ) -> None:
"""Used to update values in the stats tables.
This is basically a slightly convoluted upsert that *adds* to any
existing rows.
Args:
- txn
- table (str): Table name
- keyvalues (dict[str, any]): Row-identifying key values
- absolutes (dict[str, any]): Absolute (set) fields
- additive_relatives (dict[str, int]): Fields that will be added onto
- if existing row present.
+ table: Table name
+ keyvalues: Row-identifying key values
+ absolutes: Absolute (set) fields
+ additive_relatives: Fields that will be added onto if existing row present.
"""
if self.database_engine.can_native_upsert:
absolute_updates = [
@@ -486,20 +502,17 @@ class StatsStore(StateDeltasStore):
current_row.update(absolutes)
self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
- async def _calculate_and_set_initial_state_for_room(
- self, room_id: str
- ) -> Tuple[dict, dict, int]:
+ async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
"""Calculate and insert an entry into room_stats_current.
Args:
room_id: The room ID under calculation.
-
- Returns:
- A tuple of room state, membership counts and stream position.
"""
- def _fetch_current_state_stats(txn):
- pos = self.get_room_max_stream_ordering()
+ def _fetch_current_state_stats(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[str], Dict[str, int], int, List[str], int]:
+ pos = self.get_room_max_stream_ordering() # type: ignore[attr-defined]
rows = self.db_pool.simple_select_many_txn(
txn,
@@ -519,7 +532,7 @@ class StatsStore(StateDeltasStore):
retcols=["event_id"],
)
- event_ids = [row["event_id"] for row in rows]
+ event_ids = cast(List[str], [row["event_id"] for row in rows])
txn.execute(
"""
@@ -533,15 +546,15 @@ class StatsStore(StateDeltasStore):
txn.execute(
"""
- SELECT COALESCE(count(*), 0) FROM current_state_events
+ SELECT COUNT(*) FROM current_state_events
WHERE room_id = ?
""",
(room_id,),
)
- (current_state_events_count,) = txn.fetchone()
+ current_state_events_count = cast(Tuple[int], txn.fetchone())[0]
- users_in_room = self.get_users_in_room_txn(txn, room_id)
+ users_in_room = self.get_users_in_room_txn(txn, room_id) # type: ignore[attr-defined]
return (
event_ids,
@@ -561,7 +574,7 @@ class StatsStore(StateDeltasStore):
"get_initial_state_for_room", _fetch_current_state_stats
)
- state_event_map = await self.get_events(event_ids, get_prev_content=False)
+ state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
room_state = {
"join_rules": None,
@@ -617,8 +630,10 @@ class StatsStore(StateDeltasStore):
},
)
- async def _calculate_and_set_initial_state_for_user(self, user_id):
- def _calculate_and_set_initial_state_for_user_txn(txn):
+ async def _calculate_and_set_initial_state_for_user(self, user_id: str) -> None:
+ def _calculate_and_set_initial_state_for_user_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[int, int]:
pos = self._get_max_stream_id_in_current_state_deltas_txn(txn)
txn.execute(
@@ -629,7 +644,7 @@ class StatsStore(StateDeltasStore):
""",
(user_id,),
)
- (count,) = txn.fetchone()
+ count = cast(Tuple[int], txn.fetchone())[0]
return count, pos
joined_rooms, pos = await self.db_pool.runInteraction(
@@ -673,7 +688,9 @@ class StatsStore(StateDeltasStore):
users that exist given this query
"""
- def get_users_media_usage_paginate_txn(txn):
+ def get_users_media_usage_paginate_txn(
+ txn: LoggingTransaction,
+ ) -> Tuple[List[JsonDict], int]:
filters = []
args = [self.hs.config.server.server_name]
@@ -728,7 +745,7 @@ class StatsStore(StateDeltasStore):
sql_base=sql_base,
)
txn.execute(sql, args)
- count = txn.fetchone()[0]
+ count = cast(Tuple[int], txn.fetchone())[0]
sql = """
SELECT
|