diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 1bb5459839..8927e5f6c4 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -36,7 +36,7 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
- "total_events", # TODO review this list
+ "total_events",
),
"user": ("public_rooms", "private_rooms"),
}
@@ -78,6 +78,16 @@ class StatsStore(StateDeltasStore):
)
def quantise_stats_time(self, ts):
+ """
+ Quantises a timestamp to be a multiple of the bucket size.
+
+ Args:
+ ts: the timestamp to quantise, in milliseconds since the Unix Epoch
+
+ Returns:
+ the largest timestamp which is a multiple of the bucket size
+ and is no later than `ts`.
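+
+ Example (illustrative, assuming a hypothetical bucket size of 100):
+ >>> self.quantise_stats_time(256)
+ 200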
+ """
return (ts // self.stats_bucket_size) * self.stats_bucket_size
@defer.inlineCallbacks
@@ -120,6 +130,10 @@ class StatsStore(StateDeltasStore):
@defer.inlineCallbacks
def _populate_stats_prepare(self, progress, batch_size):
+ """
+ This is a background update which prepares the database for
+ statistics regeneration.
+ """
if not self.stats_enabled:
yield self._end_background_update("populate_stats_prepare")
@@ -167,6 +181,9 @@ class StatsStore(StateDeltasStore):
txn.execute(statement)
def _delete_dirty_skeletons(txn):
+ """
+ Delete pre-existing rows which are incomplete.
+ """
sqls = """
DELETE FROM room_stats_current
WHERE completed_delta_stream_id IS NULL;
@@ -202,14 +219,20 @@ class StatsStore(StateDeltasStore):
@defer.inlineCallbacks
def _populate_stats_cleanup(self, progress, batch_size):
+ """
+ This is a background update which cleans up after statistics regeneration.
+ """
# TODO is there really no clean-up to be done?
- # TODO if not self.stats_enabled ….
+ # TODO if not self.stats_enabled … cleanup.
yield self._end_background_update("populate_stats_cleanup")
defer.returnValue(1)
@defer.inlineCallbacks
def _populate_stats_process_users(self, progress, batch_size):
+ """
+ This is a background update which regenerates statistics for users.
+ """
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_users")
defer.returnValue(1)
@@ -307,7 +330,6 @@ class StatsStore(StateDeltasStore):
"private_rooms": room_counts_by_publicness.get(False, 0),
},
)
- # TODO CHECK: actually want to **overwrite** some of these!
except OldCollectionRequired:
# this can't (shouldn't) actually happen
# since we only run the background update for incomplete rows
@@ -347,6 +369,9 @@ class StatsStore(StateDeltasStore):
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
+ """
+ This is a background update which regenerates statistics for rooms.
+ """
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
defer.returnValue(1)
@@ -490,7 +515,6 @@ class StatsStore(StateDeltasStore):
"banned_members": membership_counts.get(Membership.BAN, 0),
},
)
- # TODO CHECK: actually want to **overwrite** some of these!
except OldCollectionRequired:
# this can't (shouldn't) actually happen
# since we only run the background update for incomplete rows
@@ -581,6 +605,18 @@ class StatsStore(StateDeltasStore):
continue
def _count_events_in_room_txn(self, txn, room_id, low_token, high_token):
+ """
+ Count the number of events in a room between two tokens, inclusive.
+
+ Args:
+ txn (cursor): The database cursor
+ room_id (str): The ID of the room to count events for
+ low_token (int): the minimum stream ordering to count
+ high_token (int): the maximum stream ordering to count
+
+ Returns (int):
+ the number of events
+ """
sql = """
SELECT COUNT(*) AS num_events
FROM events
@@ -595,6 +631,7 @@ class StatsStore(StateDeltasStore):
"""
Delete all statistics records.
TODO obsolete?
+ TODO at least will need updating
"""
def _delete_all_stats_txn(txn):
@@ -606,6 +643,23 @@ class StatsStore(StateDeltasStore):
return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
def get_stats_positions(self, for_initial_processor=False):
+ """
+ Returns the stats processor positions.
+
+ Args:
+ for_initial_processor (bool, optional): If true, returns the position
+ promised by the latest stats regeneration; otherwise, returns the
+ incremental processor's current position.
+
+ Returns (dict):
+ A dict containing the following keys:
+ state_delta_stream_id: stream_id of the last-processed state delta
+ total_events_min_stream_ordering: stream_ordering of the
+ latest-processed backfilled event, for the total_events count.
+ total_events_max_stream_ordering: stream_ordering of the
+ latest-processed non-backfilled event, for the total_events count.
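+
+ Example (hypothetical values; backfilled events have negative
+ stream orderings):
+ {
+ "state_delta_stream_id": 4822,
+ "total_events_min_stream_ordering": -21,
+ "total_events_max_stream_ordering": 4820,
+ }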
+ """
return self._simple_select_one(
table="stats_incremental_position",
keyvalues={"is_background_contract": for_initial_processor},
@@ -618,6 +672,12 @@ class StatsStore(StateDeltasStore):
)
def _get_stats_positions_txn(self, txn, for_initial_processor=False):
+ """
+ See L{get_stats_positions}.
+
+ Args:
+ txn (cursor): Database cursor
+ """
return self._simple_select_one_txn(
txn=txn,
table="stats_incremental_position",
@@ -630,6 +690,13 @@ class StatsStore(StateDeltasStore):
)
def update_stats_positions(self, positions, for_initial_processor=False):
+ """
+ Updates the stats processor positions.
+
+ Args:
+ positions: See L{get_stats_positions}
+ for_initial_processor: See L{get_stats_positions}
+ """
if positions is None:
positions = {
"state_delta_stream_id": None,
@@ -644,6 +711,9 @@ class StatsStore(StateDeltasStore):
)
def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
+ """
+ See L{update_stats_positions}.
+ """
if positions is None:
positions = {
"state_delta_stream_id": None,
@@ -709,10 +779,12 @@ class StatsStore(StateDeltasStore):
size,
)
- # TODO fix and account for _current.
def _get_statistics_for_subject_txn(
self, txn, stats_type, stats_id, start, size=100
):
+ """
+ Transaction-bound version of L{get_statistics_for_subject}.
+ """
table, id_col = TYPE_TO_TABLE[stats_type]
selected_columns = list(
@@ -740,11 +812,18 @@ class StatsStore(StateDeltasStore):
allow_none=True,
)
- if current is not None and current["end_ts"] is not None:
- # it is dirty, so contains new information, so should be included
- current["bucket_size"] = current["end_ts"] - current["start_ts"]
- del current["start_ts"]
- return [current] + slice_list
+ if current is not None:
+ completed = current["completed_delta_stream_id"] is not None
+ dirty = current["end_ts"] is not None
+
+ if completed and dirty:
+ # the row is dirty, so it contains new information and should be
+ # included. We don't accept incomplete rows: they are awaiting an
+ # initial background count and would almost certainly give
+ # misinformation.
+ current["bucket_size"] = current["end_ts"] - current["start_ts"]
+ del current["start_ts"]
+ return [current] + slice_list
return slice_list
def get_all_room_state(self):
@@ -753,6 +832,17 @@ class StatsStore(StateDeltasStore):
)
def get_room_state(self, room_id):
+ """
+ Returns the current room_state for a room.
+
+ Args:
+ room_id (str): The ID of the room to return state for.
+
+ Returns (dict):
+ Dictionary containing these keys:
+ "name", "topic", "canonical_alias", "avatar", "join_rules",
+ "history_visibility"
+ """
return self._simple_select_one(
"room_state",
{"room_id": room_id},
@@ -787,6 +877,14 @@ class StatsStore(StateDeltasStore):
)
def _collect_old_txn(self, txn, stats_type, limit=500):
+ """
+ See L{collect_old}. Runs only a small batch, of size specified by
+ `limit`.
+
+ Returns (bool):
+ True iff there is possibly more to do (i.e. this needs re-running),
+ False otherwise.
+ """
# we do them in batches to prevent concurrent updates from
# messing us over with lots of retries
@@ -801,11 +899,12 @@ class StatsStore(StateDeltasStore):
)
)
- sql = ("SELECT %s FROM %s_current WHERE end_ts <= ? LIMIT %d FOR UPDATE") % (
- id_col,
- table,
- limit,
- )
+ # `end_ts IS NOT NULL` is logically redundant here, but it allows a
+ # partial index (restricted to non-NULL end_ts) to be used
+ sql = (
+ "SELECT %s FROM %s_current"
+ " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+ " LIMIT %d FOR UPDATE"
+ ) % (id_col, table, limit)
txn.execute(sql, (quantised_ts,))
maybe_more = txn.rowcount == limit
updates = txn.fetchall()
@@ -827,6 +926,19 @@ class StatsStore(StateDeltasStore):
@defer.inlineCallbacks
def collect_old(self, stats_type):
+ """
+ Run 'old collection' on current stats rows.
+
+ Old collection is the process of copying dirty (updated) stats rows
+ from the current table to the historical table, when those rows have
+ finished their stats time slice.
+ Collected rows are then cleared of their dirty status.
+
+ Args:
+ stats_type: "room" or "user" – the type of stats to run old collection
+ on.
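+
+ Example (illustrative):
+ yield self.collect_old("room")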
+ """
while True:
maybe_more = yield self.runInteraction(
"stats_collect_old", self._collect_old_txn, stats_type
@@ -839,16 +951,18 @@ class StatsStore(StateDeltasStore):
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
):
"""
+ Updates the statistics for a subject by applying a delta (a relative
+ change) to its stats fields.
+
Args:
- ts (int):
- stats_type (str):
- stats_id (str):
+ ts (int): timestamp of the change
+ stats_type (str): "room" or "user" – the kind of subject
+ stats_id (str): the subject's ID (room ID or user ID)
fields (dict[str, int]): Deltas of stats values.
complete_with_stream_id (int, optional):
-
- Returns:
-
+ If supplied, converts an incomplete row into a complete row,
+ with the supplied stream_id marked as the stream_id where the
+ row was completed.
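+
+ Example (illustrative; assumes a `joined_members` stats field and a
+ hypothetical `room_id`):
+ # a user has just joined, so bump joined_members by one
+ yield self.update_stats_delta(
+ self.clock.time_msec(), "room", room_id, {"joined_members": +1}
+ )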
"""
while True:
@@ -877,6 +991,11 @@ class StatsStore(StateDeltasStore):
complete_with_stream_id=None,
absolute_fields=None,
):
+ """
+ See L{update_stats_delta}.
+ Additional Args:
+ absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+ """
table, id_col = TYPE_TO_TABLE[stats_type]
quantised_ts = self.quantise_stats_time(int(ts))
@@ -923,7 +1042,7 @@ class StatsStore(StateDeltasStore):
insertee = {
id_col: stats_id,
"end_ts": end_ts,
- "start_ts": ts, # TODO or do we use qts?
+ "start_ts": ts,
"completed_delta_stream_id": complete_with_stream_id,
}
@@ -968,6 +1087,19 @@ class StatsStore(StateDeltasStore):
raise OldCollectionRequired()
def incremental_update_total_events(self, in_positions):
+ """
+ Counts the number of new events per room and adds these counts to the
+ rooms' respective total_events statistics.
+
+ Args:
+ in_positions (dict): Positions,
+ as retrieved from L{get_stats_positions}.
+
+ Returns (dict):
+ The new positions, returned for reference only – this function
+ commits the new positions itself.
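+
+ Example (illustrative):
+ positions = yield self.get_stats_positions()
+ positions = yield self.incremental_update_total_events(positions)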
+ """
+
def incremental_update_total_events_txn(txn):
positions = in_positions.copy()
|