summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-12 16:22:18 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-12 16:22:18 +0100
commit182cdcbf24f7132630fb8c54682e1795ad3a40de (patch)
treecea1a9ef79e544ddbcef08f66d5d6faca9c6784a /synapse/storage
parentSplit out partial indices from theschema delta, thus supporting SQLite. (diff)
downloadsynapse-182cdcbf24f7132630fb8c54682e1795ad3a40de.tar.xz
Docstrings in `storage.stats`.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/stats.py176
1 files changed, 154 insertions, 22 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 1bb5459839..8927e5f6c4 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -36,7 +36,7 @@ ABSOLUTE_STATS_FIELDS = {
         "invited_members",
         "left_members",
         "banned_members",
-        "total_events",  # TODO review this list
+        "total_events",
     ),
     "user": ("public_rooms", "private_rooms"),
 }
@@ -78,6 +78,16 @@ class StatsStore(StateDeltasStore):
         )
 
     def quantise_stats_time(self, ts):
+        """
+        Quantises a timestamp to be a multiple of the bucket size.
+
+        Args:
+            ts: the timestamp to quantise, in seconds since the Unix Epoch
+
+        Returns:
+            a timestamp which is divisible by the bucket size,
+            is no later than `ts` and is the largest such timestamp.
+        """
         return (ts // self.stats_bucket_size) * self.stats_bucket_size
 
     @defer.inlineCallbacks
@@ -120,6 +130,10 @@ class StatsStore(StateDeltasStore):
 
     @defer.inlineCallbacks
     def _populate_stats_prepare(self, progress, batch_size):
+        """
+        This is a background update, which prepares the database for
+        statistics regeneration.
+        """
 
         if not self.stats_enabled:
             yield self._end_background_update("populate_stats_prepare")
@@ -167,6 +181,9 @@ class StatsStore(StateDeltasStore):
                 txn.execute(statement)
 
         def _delete_dirty_skeletons(txn):
+            """
+            Delete pre-existing rows which are incomplete.
+            """
             sqls = """
                 DELETE FROM room_stats_current
                 WHERE completed_delta_stream_id IS NULL;
@@ -202,14 +219,20 @@ class StatsStore(StateDeltasStore):
 
     @defer.inlineCallbacks
     def _populate_stats_cleanup(self, progress, batch_size):
+        """
+        This is a background update which cleans up after statistics regeneration.
+        """
         # TODO is there really no clean-up to be done?
 
-        # TODO if not self.stats_enabled ….
+        # TODO if not self.stats_enabled … cleanup.
         yield self._end_background_update("populate_stats_cleanup")
         defer.returnValue(1)
 
     @defer.inlineCallbacks
     def _populate_stats_process_users(self, progress, batch_size):
+        """
+        This is a background update which regenerates statistics for users.
+        """
         if not self.stats_enabled:
             yield self._end_background_update("populate_stats_process_users")
             defer.returnValue(1)
@@ -307,7 +330,6 @@ class StatsStore(StateDeltasStore):
                             "private_rooms": room_counts_by_publicness.get(False, 0),
                         },
                     )
-                    # TODO CHECK: actually want to **overwrite** some of these!
                 except OldCollectionRequired:
                     # this can't (shouldn't) actually happen
                     # since we only run the background update for incomplete rows
@@ -347,6 +369,9 @@ class StatsStore(StateDeltasStore):
 
     @defer.inlineCallbacks
     def _populate_stats_process_rooms(self, progress, batch_size):
+        """
+        This is a background update which regenerates statistics for rooms.
+        """
         if not self.stats_enabled:
             yield self._end_background_update("populate_stats_process_rooms")
             defer.returnValue(1)
@@ -490,7 +515,6 @@ class StatsStore(StateDeltasStore):
                             "banned_members": membership_counts.get(Membership.BAN, 0),
                         },
                     )
-                    # TODO CHECK: actually want to **overwrite** some of these!
                 except OldCollectionRequired:
                     # this can't (shouldn't) actually happen
                     # since we only run the background update for incomplete rows
@@ -581,6 +605,18 @@ class StatsStore(StateDeltasStore):
                     continue
 
     def _count_events_in_room_txn(self, txn, room_id, low_token, high_token):
+        """
+        Count the number of events in a room between two tokens, inclusive.
+        Args:
+            txn (cursor): The database
+            room_id (str): The ID of the room to count events for
+            low_token (int): the minimum stream ordering to count
+            high_token (int): the maximum stream ordering to count
+
+        Returns (int):
+            the number of events
+        """
+
         sql = """
             SELECT COUNT(*) AS num_events
             FROM events
@@ -595,6 +631,7 @@ class StatsStore(StateDeltasStore):
         """
         Delete all statistics records.
         TODO obsolete?
+        TODO at least will need updating
         """
 
         def _delete_all_stats_txn(txn):
@@ -606,6 +643,23 @@ class StatsStore(StateDeltasStore):
         return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
 
     def get_stats_positions(self, for_initial_processor=False):
+        """
+        Returns the stats processor positions.
+
+        Args:
+            for_initial_processor (bool, optional): If true, returns the position
+                promised by the latest stats regeneration, rather than the current
+                incremental processor's position.
+                Otherwise (if false), return the incremental processor's position.
+
+        Returns (dict):
+            Dict containing :-
+                state_delta_stream_id: stream_id of last-processed state delta
+                total_events_min_stream_ordering: stream_ordering of latest-processed
+                    backfilled event, in the context of total_events counting.
+                total_events_max_stream_ordering: stream_ordering of latest-processed
+                    non-backfilled event, in the context of total_events counting.
+        """
         return self._simple_select_one(
             table="stats_incremental_position",
             keyvalues={"is_background_contract": for_initial_processor},
@@ -618,6 +672,12 @@ class StatsStore(StateDeltasStore):
         )
 
     def _get_stats_positions_txn(self, txn, for_initial_processor=False):
+        """
+        See L{get_stats_positions}.
+
+        Args:
+             txn (cursor): Database cursor
+        """
         return self._simple_select_one_txn(
             txn=txn,
             table="stats_incremental_position",
@@ -630,6 +690,13 @@ class StatsStore(StateDeltasStore):
         )
 
     def update_stats_positions(self, positions, for_initial_processor=False):
+        """
+        Updates the stats processor positions.
+
+        Args:
+            positions: See L{get_stats_positions}
+            for_initial_processor: See L{get_stats_positions}
+        """
         if positions is None:
             positions = {
                 "state_delta_stream_id": None,
@@ -644,6 +711,9 @@ class StatsStore(StateDeltasStore):
         )
 
     def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
+        """
+        See L{update_stats_positions}
+        """
         if positions is None:
             positions = {
                 "state_delta_stream_id": None,
@@ -709,10 +779,12 @@ class StatsStore(StateDeltasStore):
             size,
         )
 
-    # TODO fix and account for _current.
     def _get_statistics_for_subject_txn(
         self, txn, stats_type, stats_id, start, size=100
     ):
+        """
+        Transaction-bound version of L{get_statistics_for_subject}.
+        """
 
         table, id_col = TYPE_TO_TABLE[stats_type]
         selected_columns = list(
@@ -740,11 +812,18 @@ class StatsStore(StateDeltasStore):
                 allow_none=True,
             )
 
-            if current is not None and current["end_ts"] is not None:
-                # it is dirty, so contains new information, so should be included
-                current["bucket_size"] = current["end_ts"] - current["start_ts"]
-                del current["start_ts"]
-                return [current] + slice_list
+            if current is not None:
+                completed = current["completed_delta_stream_id"] is not None
+                dirty = current["end_ts"] is not None
+
+                if completed and dirty:
+                    # it is dirty, so contains new information, so should be included
+                    # we don't accept incomplete rows as that would almost certainly
+                    # be giving misinformation, since it is awaiting an
+                    # initial background count
+                    current["bucket_size"] = current["end_ts"] - current["start_ts"]
+                    del current["start_ts"]
+                    return [current] + slice_list
         return slice_list
 
     def get_all_room_state(self):
@@ -753,6 +832,17 @@ class StatsStore(StateDeltasStore):
         )
 
     def get_room_state(self, room_id):
+        """
+        Returns the current room_state for a room.
+
+        Args:
+            room_id (str): The ID of the room to return state for.
+
+        Returns (dict):
+            Dictionary containing these keys:
+                "name", "topic", "canonical_alias", "avatar", "join_rules",
+                "history_visibility"
+        """
         return self._simple_select_one(
             "room_state",
             {"room_id": room_id},
@@ -787,6 +877,14 @@ class StatsStore(StateDeltasStore):
         )
 
     def _collect_old_txn(self, txn, stats_type, limit=500):
+        """
+        See {collect_old}. Runs only a small batch, specified by limit.
+
+        Returns (bool):
+            True iff there is possibly more to do (i.e. this needs re-running),
+            False otherwise.
+
+        """
         # we do them in batches to prevent concurrent updates from
         # messing us over with lots of retries
 
@@ -801,11 +899,12 @@ class StatsStore(StateDeltasStore):
             )
         )
 
-        sql = ("SELECT %s FROM %s_current WHERE end_ts <= ? LIMIT %d FOR UPDATE") % (
-            id_col,
-            table,
-            limit,
-        )
+        # `end_ts IS NOT NULL` is for partial index optimisation
+        sql = (
+            "SELECT %s FROM %s_current"
+            " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+            " LIMIT %d FOR UPDATE"
+        ) % (id_col, table, limit)
         txn.execute(sql, (quantised_ts,))
         maybe_more = txn.rowcount == limit
         updates = txn.fetchall()
@@ -827,6 +926,19 @@ class StatsStore(StateDeltasStore):
 
     @defer.inlineCallbacks
     def collect_old(self, stats_type):
+        """
+        Run 'old collection' on current stats rows.
+
+        Old collection is the process of copying dirty (updated) stats rows
+        from the current table to the historical table, when those rows have
+        finished their stats time slice.
+        Collected rows are then cleared of their dirty status.
+
+        Args:
+            stats_type: "room" or "user" – the type of stats to run old collection
+                on.
+
+        """
         while True:
             maybe_more = yield self.runInteraction(
                 "stats_collect_old", self._collect_old_txn, stats_type
@@ -839,16 +951,18 @@ class StatsStore(StateDeltasStore):
         self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
     ):
         """
+        Updates the statistics for a subject, with a delta (difference/relative
+        change).
 
         Args:
-            ts (int):
-            stats_type (str):
-            stats_id (str):
+            ts (int): timestamp of the change
+            stats_type (str): "room" or "user" – the kind of subject
+            stats_id (str): the subject's ID (room ID or user ID)
             fields (dict[str, int]): Deltas of stats values.
             complete_with_stream_id (int, optional):
-
-        Returns:
-
+                If supplied, converts an incomplete row into a complete row,
+                with the supplied stream_id marked as the stream_id where the
+                row was completed.
         """
 
         while True:
@@ -877,6 +991,11 @@ class StatsStore(StateDeltasStore):
         complete_with_stream_id=None,
         absolute_fields=None,
     ):
+        """
+        See L{update_stats_delta}
+        Additional Args:
+            absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+        """
         table, id_col = TYPE_TO_TABLE[stats_type]
 
         quantised_ts = self.quantise_stats_time(int(ts))
@@ -923,7 +1042,7 @@ class StatsStore(StateDeltasStore):
             insertee = {
                 id_col: stats_id,
                 "end_ts": end_ts,
-                "start_ts": ts,  # TODO or do we use qts?
+                "start_ts": ts,
                 "completed_delta_stream_id": complete_with_stream_id,
             }
 
@@ -968,6 +1087,19 @@ class StatsStore(StateDeltasStore):
             raise OldCollectionRequired()
 
     def incremental_update_total_events(self, in_positions):
+        """
+        Counts the number of events per-room and then adds these to the respective
+        total_events room counts.
+
+        Args:
+            in_positions (dict): Positions,
+                as retrieved from L{get_stats_positions}.
+
+        Returns (dict):
+            The new positions. Note that this is for reference only –
+            the new positions WILL be committed by this function.
+        """
+
         def incremental_update_total_events_txn(txn):
             positions = in_positions.copy()