diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 802c9019b9..55a250ef06 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -15,8 +15,9 @@
# limitations under the License.
import logging
+from collections import Counter
from itertools import chain
-from typing import Tuple
+from typing import Any, Dict, List, Optional, Tuple
from twisted.internet.defer import DeferredLock
@@ -211,26 +212,44 @@ class StatsStore(StateDeltasStore):
return len(rooms_to_work_on)
- def get_stats_positions(self):
+ async def get_stats_positions(self) -> int:
"""
Returns the stats processor positions.
"""
- return self.db_pool.simple_select_one_onecol(
+ return await self.db_pool.simple_select_one_onecol(
table="stats_incremental_position",
keyvalues={},
retcol="stream_id",
desc="stats_incremental_position",
)
- def update_room_state(self, room_id, fields):
- """
+ async def update_room_state(self, room_id: str, fields: Dict[str, Any]) -> None:
+ """Update the state of a room.
+
+ fields can contain the following keys with string values:
+ * join_rules
+ * history_visibility
+ * encryption
+ * name
+ * topic
+ * avatar
+ * canonical_alias
+
+ A is_federatable key can also be included with a boolean value.
+
Args:
- room_id (str)
- fields (dict[str:Any])
+ room_id: The room ID to update the state of.
+ fields: The fields to update. This can include a partial list of the
+ above fields to only update some room information.
"""
-
- # For whatever reason some of the fields may contain null bytes, which
- # postgres isn't a fan of, so we replace those fields with null.
+ # Ensure that the values to update are valid, they should be strings and
+ # not contain any null bytes.
+ #
+ # Invalid data gets overwritten with null.
+ #
+ # Note that a missing value should not be overwritten (it keeps the
+ # previous value).
+ sentinel = object()
for col in (
"join_rules",
"history_visibility",
@@ -240,32 +259,34 @@ class StatsStore(StateDeltasStore):
"avatar",
"canonical_alias",
):
- field = fields.get(col)
- if field and "\0" in field:
+ field = fields.get(col, sentinel)
+ if field is not sentinel and (not isinstance(field, str) or "\0" in field):
fields[col] = None
- return self.db_pool.simple_upsert(
+ await self.db_pool.simple_upsert(
table="room_stats_state",
keyvalues={"room_id": room_id},
values=fields,
desc="update_room_state",
)
- def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
+ async def get_statistics_for_subject(
+ self, stats_type: str, stats_id: str, start: str, size: int = 100
+ ) -> List[dict]:
"""
Get statistics for a given subject.
Args:
- stats_type (str): The type of subject
- stats_id (str): The ID of the subject (e.g. room_id or user_id)
- start (int): Pagination start. Number of entries, not timestamp.
- size (int): How many entries to return.
+ stats_type: The type of subject
+ stats_id: The ID of the subject (e.g. room_id or user_id)
+ start: Pagination start. Number of entries, not timestamp.
+ size: How many entries to return.
Returns:
- Deferred[list[dict]], where the dict has the keys of
+ A list of dicts, where the dict has the keys of
ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
"""
- return self.db_pool.runInteraction(
+ return await self.db_pool.runInteraction(
"get_statistics_for_subject",
self._get_statistics_for_subject_txn,
stats_type,
@@ -300,7 +321,7 @@ class StatsStore(StateDeltasStore):
return slice_list
@cached()
- def get_earliest_token_for_stats(self, stats_type, id):
+ async def get_earliest_token_for_stats(self, stats_type: str, id: str) -> int:
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
@@ -308,29 +329,28 @@ class StatsStore(StateDeltasStore):
being calculated.
Returns:
- Deferred[int]
+ The earliest token.
"""
table, id_col = TYPE_TO_TABLE[stats_type]
- return self.db_pool.simple_select_one_onecol(
+ return await self.db_pool.simple_select_one_onecol(
"%s_current" % (table,),
keyvalues={id_col: id},
retcol="completed_delta_stream_id",
allow_none=True,
)
- def bulk_update_stats_delta(self, ts, updates, stream_id):
+ async def bulk_update_stats_delta(
+ self, ts: int, updates: Dict[str, Dict[str, Dict[str, Counter]]], stream_id: int
+ ) -> None:
"""Bulk update stats tables for a given stream_id and updates the stats
incremental position.
Args:
- ts (int): Current timestamp in ms
- updates(dict[str, dict[str, dict[str, Counter]]]): The updates to
- commit as a mapping stats_type -> stats_id -> field -> delta.
- stream_id (int): Current position.
-
- Returns:
- Deferred
+ ts: Current timestamp in ms
+ updates: The updates to commit as a mapping of
+ stats_type -> stats_id -> field -> delta.
+ stream_id: Current position.
"""
def _bulk_update_stats_delta_txn(txn):
@@ -355,38 +375,37 @@ class StatsStore(StateDeltasStore):
updatevalues={"stream_id": stream_id},
)
- return self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"bulk_update_stats_delta", _bulk_update_stats_delta_txn
)
- def update_stats_delta(
+ async def update_stats_delta(
self,
- ts,
- stats_type,
- stats_id,
- fields,
- complete_with_stream_id,
- absolute_field_overrides=None,
- ):
+ ts: int,
+ stats_type: str,
+ stats_id: str,
+ fields: Dict[str, int],
+ complete_with_stream_id: Optional[int],
+ absolute_field_overrides: Optional[Dict[str, int]] = None,
+ ) -> None:
"""
Updates the statistics for a subject, with a delta (difference/relative
change).
Args:
- ts (int): timestamp of the change
- stats_type (str): "room" or "user" – the kind of subject
- stats_id (str): the subject's ID (room ID or user ID)
- fields (dict[str, int]): Deltas of stats values.
- complete_with_stream_id (int, optional):
+ ts: timestamp of the change
+ stats_type: "room" or "user" – the kind of subject
+ stats_id: the subject's ID (room ID or user ID)
+ fields: Deltas of stats values.
+ complete_with_stream_id:
If supplied, converts an incomplete row into a complete row,
with the supplied stream_id marked as the stream_id where the
row was completed.
- absolute_field_overrides (dict[str, int]): Current stats values
- (i.e. not deltas) of absolute fields.
- Does not work with per-slice fields.
+ absolute_field_overrides: Current stats values (i.e. not deltas) of
+ absolute fields. Does not work with per-slice fields.
"""
- return self.db_pool.runInteraction(
+ await self.db_pool.runInteraction(
"update_stats_delta",
self._update_stats_delta_txn,
ts,
@@ -646,19 +665,20 @@ class StatsStore(StateDeltasStore):
txn, into_table, all_dest_keyvalues, src_row
)
- def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
+ async def get_changes_room_total_events_and_bytes(
+ self, min_pos: int, max_pos: int
+ ) -> Dict[str, Dict[str, int]]:
"""Fetches the counts of events in the given range of stream IDs.
Args:
- min_pos (int)
- max_pos (int)
+ min_pos
+ max_pos
Returns:
- Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field
- changes.
+ Mapping of room ID to field changes.
"""
- return self.db_pool.runInteraction(
+ return await self.db_pool.runInteraction(
"stats_incremental_total_events_and_bytes",
self.get_changes_room_total_events_and_bytes_txn,
min_pos,
|