diff options
author | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-12 16:22:18 +0100 |
---|---|---|
committer | Olivier Wilkinson (reivilibre) <olivier@librepush.net> | 2019-08-12 16:22:18 +0100 |
commit | 182cdcbf24f7132630fb8c54682e1795ad3a40de (patch) | |
tree | cea1a9ef79e544ddbcef08f66d5d6faca9c6784a /synapse/storage | |
parent | Split out partial indices from the schema delta, thus supporting SQLite. (diff) | |
download | synapse-182cdcbf24f7132630fb8c54682e1795ad3a40de.tar.xz |
Docstrings in `storage.stats`.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/stats.py | 176 |
1 file changed, 154 insertions, 22 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 1bb5459839..8927e5f6c4 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -36,7 +36,7 @@ ABSOLUTE_STATS_FIELDS = { "invited_members", "left_members", "banned_members", - "total_events", # TODO review this list + "total_events", ), "user": ("public_rooms", "private_rooms"), } @@ -78,6 +78,16 @@ class StatsStore(StateDeltasStore): ) def quantise_stats_time(self, ts): + """ + Quantises a timestamp to be a multiple of the bucket size. + + Args: + ts: the timestamp to quantise, in seconds since the Unix Epoch + + Returns: + a timestamp which is divisible by the bucket size, + is no later than `ts` and is the largest such timestamp. + """ return (ts // self.stats_bucket_size) * self.stats_bucket_size @defer.inlineCallbacks @@ -120,6 +130,10 @@ class StatsStore(StateDeltasStore): @defer.inlineCallbacks def _populate_stats_prepare(self, progress, batch_size): + """ + This is a background update, which prepares the database for + statistics regeneration. + """ if not self.stats_enabled: yield self._end_background_update("populate_stats_prepare") @@ -167,6 +181,9 @@ class StatsStore(StateDeltasStore): txn.execute(statement) def _delete_dirty_skeletons(txn): + """ + Delete pre-existing rows which are incomplete. + """ sqls = """ DELETE FROM room_stats_current WHERE completed_delta_stream_id IS NULL; @@ -202,14 +219,20 @@ class StatsStore(StateDeltasStore): @defer.inlineCallbacks def _populate_stats_cleanup(self, progress, batch_size): + """ + This is a background update which cleans up after statistics regeneration. + """ # TODO is there really no clean-up to be done? - # TODO if not self.stats_enabled …. + # TODO if not self.stats_enabled … cleanup. 
yield self._end_background_update("populate_stats_cleanup") defer.returnValue(1) @defer.inlineCallbacks def _populate_stats_process_users(self, progress, batch_size): + """ + This is a background update which regenerates statistics for users. + """ if not self.stats_enabled: yield self._end_background_update("populate_stats_process_users") defer.returnValue(1) @@ -307,7 +330,6 @@ class StatsStore(StateDeltasStore): "private_rooms": room_counts_by_publicness.get(False, 0), }, ) - # TODO CHECK: actually want to **overwrite** some of these! except OldCollectionRequired: # this can't (shouldn't) actually happen # since we only run the background update for incomplete rows @@ -347,6 +369,9 @@ class StatsStore(StateDeltasStore): @defer.inlineCallbacks def _populate_stats_process_rooms(self, progress, batch_size): + """ + This is a background update which regenerates statistics for rooms. + """ if not self.stats_enabled: yield self._end_background_update("populate_stats_process_rooms") defer.returnValue(1) @@ -490,7 +515,6 @@ class StatsStore(StateDeltasStore): "banned_members": membership_counts.get(Membership.BAN, 0), }, ) - # TODO CHECK: actually want to **overwrite** some of these! except OldCollectionRequired: # this can't (shouldn't) actually happen # since we only run the background update for incomplete rows @@ -581,6 +605,18 @@ class StatsStore(StateDeltasStore): continue def _count_events_in_room_txn(self, txn, room_id, low_token, high_token): + """ + Count the number of events in a room between two tokens, inclusive. + Args: + txn (cursor): The database + room_id (str): The ID of the room to count events for + low_token (int): the minimum stream ordering to count + high_token (int): the maximum stream ordering to count + + Returns (int): + the number of events + """ + sql = """ SELECT COUNT(*) AS num_events FROM events @@ -595,6 +631,7 @@ class StatsStore(StateDeltasStore): """ Delete all statistics records. TODO obsolete? 
+ TODO at least will need updating """ def _delete_all_stats_txn(txn): @@ -606,6 +643,23 @@ class StatsStore(StateDeltasStore): return self.runInteraction("delete_all_stats", _delete_all_stats_txn) def get_stats_positions(self, for_initial_processor=False): + """ + Returns the stats processor positions. + + Args: + for_initial_processor (bool, optional): If true, returns the position + promised by the latest stats regeneration, rather than the current + incremental processor's position. + Otherwise (if false), return the incremental processor's position. + + Returns (dict): + Dict containing :- + state_delta_stream_id: stream_id of last-processed state delta + total_events_min_stream_ordering: stream_ordering of latest-processed + backfilled event, in the context of total_events counting. + total_events_max_stream_ordering: stream_ordering of latest-processed + non-backfilled event, in the context of total_events counting. + """ return self._simple_select_one( table="stats_incremental_position", keyvalues={"is_background_contract": for_initial_processor}, @@ -618,6 +672,12 @@ class StatsStore(StateDeltasStore): ) def _get_stats_positions_txn(self, txn, for_initial_processor=False): + """ + See L{get_stats_positions}. + + Args: + txn (cursor): Database cursor + """ return self._simple_select_one_txn( txn=txn, table="stats_incremental_position", @@ -630,6 +690,13 @@ class StatsStore(StateDeltasStore): ) def update_stats_positions(self, positions, for_initial_processor=False): + """ + Updates the stats processor positions. 
+ + Args: + positions: See L{get_stats_positions} + for_initial_processor: See L{get_stats_positions} + """ if positions is None: positions = { "state_delta_stream_id": None, @@ -644,6 +711,9 @@ class StatsStore(StateDeltasStore): ) def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False): + """ + See L{update_stats_positions} + """ if positions is None: positions = { "state_delta_stream_id": None, @@ -709,10 +779,12 @@ class StatsStore(StateDeltasStore): size, ) - # TODO fix and account for _current. def _get_statistics_for_subject_txn( self, txn, stats_type, stats_id, start, size=100 ): + """ + Transaction-bound version of L{get_statistics_for_subject}. + """ table, id_col = TYPE_TO_TABLE[stats_type] selected_columns = list( @@ -740,11 +812,18 @@ class StatsStore(StateDeltasStore): allow_none=True, ) - if current is not None and current["end_ts"] is not None: - # it is dirty, so contains new information, so should be included - current["bucket_size"] = current["end_ts"] - current["start_ts"] - del current["start_ts"] - return [current] + slice_list + if current is not None: + completed = current["completed_delta_stream_id"] is not None + dirty = current["end_ts"] is not None + + if completed and dirty: + # it is dirty, so contains new information, so should be included + # we don't accept incomplete rows as that would almost certainly + # be giving misinformation, since it is awaiting an + # initial background count + current["bucket_size"] = current["end_ts"] - current["start_ts"] + del current["start_ts"] + return [current] + slice_list return slice_list def get_all_room_state(self): @@ -753,6 +832,17 @@ class StatsStore(StateDeltasStore): ) def get_room_state(self, room_id): + """ + Returns the current room_state for a room. + + Args: + room_id (str): The ID of the room to return state for. 
+ + Returns (dict): + Dictionary containing these keys: + "name", "topic", "canonical_alias", "avatar", "join_rules", + "history_visibility" + """ return self._simple_select_one( "room_state", {"room_id": room_id}, @@ -787,6 +877,14 @@ class StatsStore(StateDeltasStore): ) def _collect_old_txn(self, txn, stats_type, limit=500): + """ + See {collect_old}. Runs only a small batch, specified by limit. + + Returns (bool): + True iff there is possibly more to do (i.e. this needs re-running), + False otherwise. + + """ # we do them in batches to prevent concurrent updates from # messing us over with lots of retries @@ -801,11 +899,12 @@ class StatsStore(StateDeltasStore): ) ) - sql = ("SELECT %s FROM %s_current WHERE end_ts <= ? LIMIT %d FOR UPDATE") % ( - id_col, - table, - limit, - ) + # `end_ts IS NOT NULL` is for partial index optimisation + sql = ( + "SELECT %s FROM %s_current" + " WHERE end_ts <= ? AND end_ts IS NOT NULL" + " LIMIT %d FOR UPDATE" + ) % (id_col, table, limit) txn.execute(sql, (quantised_ts,)) maybe_more = txn.rowcount == limit updates = txn.fetchall() @@ -827,6 +926,19 @@ class StatsStore(StateDeltasStore): @defer.inlineCallbacks def collect_old(self, stats_type): + """ + Run 'old collection' on current stats rows. + + Old collection is the process of copying dirty (updated) stats rows + from the current table to the historical table, when those rows have + finished their stats time slice. + Collected rows are then cleared of their dirty status. + + Args: + stats_type: "room" or "user" – the type of stats to run old collection + on. + + """ while True: maybe_more = yield self.runInteraction( "stats_collect_old", self._collect_old_txn, stats_type @@ -839,16 +951,18 @@ class StatsStore(StateDeltasStore): self, ts, stats_type, stats_id, fields, complete_with_stream_id=None ): """ + Updates the statistics for a subject, with a delta (difference/relative + change). 
Args: - ts (int): - stats_type (str): - stats_id (str): + ts (int): timestamp of the change + stats_type (str): "room" or "user" – the kind of subject + stats_id (str): the subject's ID (room ID or user ID) fields (dict[str, int]): Deltas of stats values. complete_with_stream_id (int, optional): - - Returns: - + If supplied, converts an incomplete row into a complete row, + with the supplied stream_id marked as the stream_id where the + row was completed. """ while True: @@ -877,6 +991,11 @@ class StatsStore(StateDeltasStore): complete_with_stream_id=None, absolute_fields=None, ): + """ + See L{update_stats_delta} + Additional Args: + absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + """ table, id_col = TYPE_TO_TABLE[stats_type] quantised_ts = self.quantise_stats_time(int(ts)) @@ -923,7 +1042,7 @@ class StatsStore(StateDeltasStore): insertee = { id_col: stats_id, "end_ts": end_ts, - "start_ts": ts, # TODO or do we use qts? + "start_ts": ts, "completed_delta_stream_id": complete_with_stream_id, } @@ -968,6 +1087,19 @@ class StatsStore(StateDeltasStore): raise OldCollectionRequired() def incremental_update_total_events(self, in_positions): + """ + Counts the number of events per-room and then adds these to the respective + total_events room counts. + + Args: + in_positions (dict): Positions, + as retrieved from L{get_stats_positions}. + + Returns (dict): + The new positions. Note that this is for reference only – + the new positions WILL be committed by this function. + """ + def incremental_update_total_events_txn(txn): positions = in_positions.copy() |