diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e8b1ce240b..4112291c76 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -15,10 +15,13 @@
# limitations under the License.
import logging
+from threading import Lock
-from synapse.storage.state_deltas import StateDeltasStore
from twisted.internet import defer
+from synapse.storage.state_deltas import StateDeltasStore
+from synapse.util.caches.descriptors import cached
+
logger = logging.getLogger(__name__)
# these fields track absolutes (e.g. total number of rooms on the server)
@@ -55,6 +58,8 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
+ self.stats_delta_processing_lock = Lock()
+
self.register_noop_background_update("populate_stats_createtables")
self.register_noop_background_update("populate_stats_process_rooms")
self.register_noop_background_update("populate_stats_cleanup")
@@ -74,6 +79,91 @@ class StatsStore(StateDeltasStore):
"""
return (ts // self.stats_bucket_size) * self.stats_bucket_size
+ def get_stats_positions(self, for_initial_processor=False):
+ """
+ Returns the stats processor positions.
+
+ Reads the row of the `stats_incremental_position` table whose
+ `is_background_contract` column equals `for_initial_processor`.
+
+ Args:
+ for_initial_processor (bool, optional): If true, returns the position
+ promised by the latest stats regeneration, rather than the current
+ incremental processor's position.
+ Otherwise (if false), return the incremental processor's position.
+
+ Returns (dict):
+ Dict containing:
+ state_delta_stream_id: stream_id of last-processed state delta
+ total_events_min_stream_ordering: stream_ordering of latest-processed
+ backfilled event, in the context of total_events counting.
+ total_events_max_stream_ordering: stream_ordering of latest-processed
+ non-backfilled event, in the context of total_events counting.
+
+ NOTE(review): the value is returned straight from
+ `_simple_select_one`, so this is presumably a Deferred resolving to
+ the dict rather than the dict itself -- confirm against that helper.
+ """
+ return self._simple_select_one(
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ retcols=(
+ "state_delta_stream_id",
+ "total_events_min_stream_ordering",
+ "total_events_max_stream_ordering",
+ ),
+ desc="stats_incremental_position",
+ )
+
+ def _get_stats_positions_txn(self, txn, for_initial_processor=False):
+ """
+ Transaction-level variant of L{get_stats_positions}: performs the same
+ lookup on `stats_incremental_position`, using the supplied cursor.
+
+ Args:
+ txn (cursor): Database cursor
+ for_initial_processor (bool, optional): See L{get_stats_positions}.
+ Selects the row whose `is_background_contract` equals this value.
+
+ Returns:
+ Whatever `_simple_select_one_txn` returns for the columns
+ `state_delta_stream_id`, `total_events_min_stream_ordering` and
+ `total_events_max_stream_ordering` (presumably a dict keyed by
+ column name -- confirm against that helper).
+ """
+ return self._simple_select_one_txn(
+ txn=txn,
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ retcols=(
+ "state_delta_stream_id",
+ "total_events_min_stream_ordering",
+ "total_events_max_stream_ordering",
+ ),
+ )
+
+ def update_stats_positions(self, positions, for_initial_processor=False):
+ """
+ Updates the stats processor positions.
+
+ Writes `positions` to the row of `stats_incremental_position` whose
+ `is_background_contract` column equals `for_initial_processor`.
+
+ Args:
+ positions: See L{get_stats_positions}. If None, all three position
+ columns (`state_delta_stream_id`,
+ `total_events_min_stream_ordering`,
+ `total_events_max_stream_ordering`) are set to None.
+ for_initial_processor: See L{get_stats_positions}
+
+ Returns:
+ The result of `_simple_update_one` (presumably a Deferred --
+ confirm against that helper).
+ """
+ # A missing positions dict means "clear the stored position": every
+ # tracked column is explicitly overwritten with None.
+ if positions is None:
+ positions = {
+ "state_delta_stream_id": None,
+ "total_events_min_stream_ordering": None,
+ "total_events_max_stream_ordering": None,
+ }
+ return self._simple_update_one(
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ updatevalues=positions,
+ desc="update_stats_incremental_position",
+ )
+
+ def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
+ """
+ Transaction-level variant of L{update_stats_positions}: performs the
+ same update on `stats_incremental_position`, using the supplied cursor.
+
+ Args:
+ txn (cursor): Database cursor
+ positions: See L{get_stats_positions}. If None, all three position
+ columns are set to None.
+ for_initial_processor: See L{get_stats_positions}
+
+ Returns:
+ The result of `_simple_update_one_txn`.
+ """
+ # A missing positions dict means "clear the stored position": every
+ # tracked column is explicitly overwritten with None.
+ if positions is None:
+ positions = {
+ "state_delta_stream_id": None,
+ "total_events_min_stream_ordering": None,
+ "total_events_max_stream_ordering": None,
+ }
+ return self._simple_update_one_txn(
+ txn,
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ updatevalues=positions,
+ )
+
def update_room_state(self, room_id, fields):
"""
Args:
@@ -103,6 +193,26 @@ class StatsStore(StateDeltasStore):
desc="update_room_state",
)
+ # NOTE(review): the result is memoised by @cached(); nothing in this hunk
+ # invalidates it, so confirm that whatever writes
+ # `completed_delta_stream_id` also invalidates this cache entry.
+ @cached()
+ def get_earliest_token_for_stats(self, stats_type, id):
+ """
+ Fetch the "earliest token". This is used by the room stats delta
+ processor to ignore deltas that have been processed between the
+ start of the background task and any particular room's stats
+ being calculated.
+
+ The value is read from the `completed_delta_stream_id` column of the
+ `<table>_current` table, where `<table>` and the ID column name are
+ looked up in TYPE_TO_TABLE.
+
+ Args:
+ stats_type: Key into TYPE_TO_TABLE selecting the table prefix and
+ the name of its ID column.
+ id: Value matched against that ID column. (The name shadows the
+ `id` builtin; it is kept because it is part of the signature.)
+
+ Returns:
+ Deferred[int]
+ (The lookup passes allow_none=True, so this presumably resolves
+ to None when no matching row exists -- confirm.)
+ """
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
+ return self._simple_select_one_onecol(
+ "%s_current" % (table,),
+ {id_col: id},
+ retcol="completed_delta_stream_id",
+ allow_none=True,
+ )
+
@defer.inlineCallbacks
def update_stats_delta(
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
|