diff options
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r-- | synapse/storage/stats.py | 181 |
1 files changed, 94 insertions, 87 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index c9687c29d2..3df57b52ea 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -25,6 +25,9 @@ from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) +# You can think of these as Prometheus Gauges. +# You can draw these stats on a line graph. +# Example: number of users in a room ABSOLUTE_STATS_FIELDS = { "room": ( "current_state_events", @@ -38,17 +41,13 @@ ABSOLUTE_STATS_FIELDS = { } # these fields are per-timeslice and so should be reset to 0 upon a new slice +# You can draw these stats on a histogram. +# Example: number of events sent locally during a time slice PER_SLICE_FIELDS = {"room": (), "user": ()} TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} -class OldCollectionRequired(Exception): - """ Signal that we need to collect old stats rows and retry. """ - - pass - - class StatsStore(StateDeltasStore): def __init__(self, db_conn, hs): super(StatsStore, self).__init__(db_conn, hs) @@ -188,7 +187,7 @@ class StatsStore(StateDeltasStore): fields[col] = None return self._simple_upsert( - table="room_state", + table="room_stats_state", keyvalues={"room_id": room_id}, values=fields, desc="update_room_state", @@ -242,6 +241,94 @@ class StatsStore(StateDeltasStore): complete_with_stream_id=complete_with_stream_id, ) + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=None, + absolute_field_overrides=None, + ): + """ + See L{update_stats_delta} + Additional Args: + absolute_field_overrides (dict[str, int]): Current stats values + (i.e. not deltas) of absolute fields. + Does not work with per-slice fields. + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] + for field in chain(fields.keys(), absolute_field_overrides.keys()): + if field not in abs_field_names and field not in slice_field_names: + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) + ) + + # only absolute stats fields are tracked in the `_current` stats tables, + # so those are the only ones that we process deltas for when + # we upsert against the `_current` table. + + # This calculates the deltas (`field = field + ?` values) + # for absolute fields, + # * defaulting to 0 if not specified + # (required for the INSERT part of upserting to work) + # * omitting overrides specified in `absolute_field_overrides` + deltas_of_absolute_fields = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_field_overrides + } + + if absolute_field_overrides is None: + absolute_field_overrides = {} + + if complete_with_stream_id is not None: + absolute_field_overrides = absolute_field_overrides.copy() + absolute_field_overrides[ + "completed_delta_stream_id" + ] = complete_with_stream_id + + # first upsert the `_current` table + self._upsert_with_additive_relatives_txn( + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_field_overrides, + additive_relatives=deltas_of_absolute_fields, + ) + + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the `_historical` table. + # we don't support absolute_fields for per-slice fields as it makes + # no sense. + per_slice_additive_relatives = { + key: fields.get(key, 0) for key in slice_field_names + } + self._upsert_copy_from_table_with_additive_relatives_txn( + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_keyvalues={ + "end_ts": end_ts, + "bucket_size": self.stats_bucket_size, + }, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", + ) + def _upsert_with_additive_relatives_txn( self, txn, table, keyvalues, absolutes, additive_relatives ): @@ -388,83 +475,3 @@ class StatsStore(StateDeltasStore): for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val self._simple_update_txn(txn, into_table, keyvalues, src_row) - - def _update_stats_delta_txn( - self, - txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=None, - absolute_fields=None, - ): - """ - See L{update_stats_delta} - Additional Args: - absolute_fields (dict[str, int]): Absolute current stats values - (i.e. not deltas). Does not work with per-slice fields. - """ - table, id_col = TYPE_TO_TABLE[stats_type] - - quantised_ts = self.quantise_stats_time(int(ts)) - end_ts = quantised_ts + self.stats_bucket_size - - abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] - slice_field_names = PER_SLICE_FIELDS[stats_type] - for field in chain(fields.keys(), absolute_fields.keys()): - if field not in abs_field_names and field not in slice_field_names: - # guard against potential SQL injection dodginess - raise ValueError( - "%s is not a recognised field" - " for stats type %s" % (field, stats_type) - ) - - # only absolute stats fields are tracked in the `_current` stats tables, - # so those are the only ones that we process deltas for when - # we upsert against the `_current` table. - additive_relatives = { - key: fields.get(key, 0) - for key in abs_field_names - if key not in absolute_fields - } - - if absolute_fields is None: - absolute_fields = {} - - if complete_with_stream_id is not None: - absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - self.get_earliest_token_for_stats.invalidate(stats_type, stats_id) - - # first upsert the `_current` table - self._upsert_with_additive_relatives_txn( - txn=txn, - table=table + "_current", - keyvalues={id_col: stats_id}, - absolutes=absolute_fields, - additive_relatives=additive_relatives, - ) - - if self.has_completed_background_updates(): - # TODO want to check specifically for stats regenerator, not all - # background updates… - # then upsert the `_historical` table. - # we don't support absolute_fields for per-slice fields as it makes - # no sense. - per_slice_additive_relatives = { - key: fields.get(key, 0) for key in slice_field_names - } - self._upsert_copy_from_table_with_additive_relatives_txn( - txn=txn, - into_table=table + "_historical", - keyvalues={id_col: stats_id}, - extra_dst_keyvalues={ - "end_ts": end_ts, - "bucket_size": self.stats_bucket_size, - }, - additive_relatives=per_slice_additive_relatives, - src_table=table + "_current", - copy_columns=abs_field_names, - additional_where=" AND completed_delta_stream_id IS NOT NULL", - ) |