summary refs log tree commit diff
path: root/synapse/storage/stats.py
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-20 14:24:35 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-20 14:24:35 +0100
commite4cbea6c46afe6c45e0ee0604eaf536da70cb9f3 (patch)
treeccd6cbefdf0bfe1fd935d3afe4ea808a079906ad /synapse/storage/stats.py
parentAdd storage function for storing stats deltas (diff)
downloadsynapse-e4cbea6c46afe6c45e0ee0604eaf536da70cb9f3.tar.xz
Handle state deltas and turn them into stats deltas
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r--synapse/storage/stats.py112
1 files changed, 111 insertions, 1 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py

index e8b1ce240b..4112291c76 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py
@@ -15,10 +15,13 @@ # limitations under the License. import logging +from threading import Lock -from synapse.storage.state_deltas import StateDeltasStore from twisted.internet import defer +from synapse.storage.state_deltas import StateDeltasStore +from synapse.util.caches.descriptors import cached + logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) @@ -55,6 +58,8 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size + self.stats_delta_processing_lock = Lock() + self.register_noop_background_update("populate_stats_createtables") self.register_noop_background_update("populate_stats_process_rooms") self.register_noop_background_update("populate_stats_cleanup") @@ -74,6 +79,91 @@ class StatsStore(StateDeltasStore): """ return (ts // self.stats_bucket_size) * self.stats_bucket_size + def get_stats_positions(self, for_initial_processor=False): + """ + Returns the stats processor positions. + + Args: + for_initial_processor (bool, optional): If true, returns the position + promised by the latest stats regeneration, rather than the current + incremental processor's position. + Otherwise (if false), return the incremental processor's position. + + Returns (dict): + Dict containing :- + state_delta_stream_id: stream_id of last-processed state delta + total_events_min_stream_ordering: stream_ordering of latest-processed + backfilled event, in the context of total_events counting. + total_events_max_stream_ordering: stream_ordering of latest-processed + non-backfilled event, in the context of total_events counting. + """ + return self._simple_select_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + desc="stats_incremental_position", + ) + + def _get_stats_positions_txn(self, txn, for_initial_processor=False): + """ + See L{get_stats_positions}. + + Args: + txn (cursor): Database cursor + """ + return self._simple_select_one_txn( + txn=txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + ) + + def update_stats_positions(self, positions, for_initial_processor=False): + """ + Updates the stats processor positions. + + Args: + positions: See L{get_stats_positions} + for_initial_processor: See L{get_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + desc="update_stats_incremental_position", + ) + + def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False): + """ + See L{update_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one_txn( + txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + ) + def update_room_state(self, room_id, fields): """ Args: @@ -103,6 +193,26 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) + @cached() + def get_earliest_token_for_stats(self, stats_type, id): + """ + Fetch the "earliest token". This is used by the room stats delta + processor to ignore deltas that have been processed between the + start of the background task and any particular room's stats + being calculated. + + Returns: + Deferred[int] + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + return self._simple_select_one_onecol( + "%s_current" % (table,), + {id_col: id}, + retcol="completed_delta_stream_id", + allow_none=True, + ) + @defer.inlineCallbacks def update_stats_delta( self, ts, stats_type, stats_id, fields, complete_with_stream_id=None