From e4cbea6c46afe6c45e0ee0604eaf536da70cb9f3 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 14:24:35 +0100 Subject: Handle state deltas and turn them into stats deltas Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 112 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 111 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e8b1ce240b..4112291c76 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,10 +15,13 @@ # limitations under the License. import logging +from threading import Lock -from synapse.storage.state_deltas import StateDeltasStore from twisted.internet import defer +from synapse.storage.state_deltas import StateDeltasStore +from synapse.util.caches.descriptors import cached + logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) @@ -55,6 +58,8 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size + self.stats_delta_processing_lock = Lock() + self.register_noop_background_update("populate_stats_createtables") self.register_noop_background_update("populate_stats_process_rooms") self.register_noop_background_update("populate_stats_cleanup") @@ -74,6 +79,91 @@ class StatsStore(StateDeltasStore): """ return (ts // self.stats_bucket_size) * self.stats_bucket_size + def get_stats_positions(self, for_initial_processor=False): + """ + Returns the stats processor positions. + + Args: + for_initial_processor (bool, optional): If true, returns the position + promised by the latest stats regeneration, rather than the current + incremental processor's position. + Otherwise (if false), return the incremental processor's position. + + Returns (dict): + Dict containing :- + state_delta_stream_id: stream_id of last-processed state delta + total_events_min_stream_ordering: stream_ordering of latest-processed + backfilled event, in the context of total_events counting. + total_events_max_stream_ordering: stream_ordering of latest-processed + non-backfilled event, in the context of total_events counting. + """ + return self._simple_select_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + desc="stats_incremental_position", + ) + + def _get_stats_positions_txn(self, txn, for_initial_processor=False): + """ + See L{get_stats_positions}. + + Args: + txn (cursor): Database cursor + """ + return self._simple_select_one_txn( + txn=txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + ) + + def update_stats_positions(self, positions, for_initial_processor=False): + """ + Updates the stats processor positions. 
+ + Args: + positions: See L{get_stats_positions} + for_initial_processor: See L{get_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + desc="update_stats_incremental_position", + ) + + def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False): + """ + See L{update_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one_txn( + txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + ) + def update_room_state(self, room_id, fields): """ Args: @@ -103,6 +193,26 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) + @cached() + def get_earliest_token_for_stats(self, stats_type, id): + """ + Fetch the "earliest token". This is used by the room stats delta + processor to ignore deltas that have been processed between the + start of the background task and any particular room's stats + being calculated. + + Returns: + Deferred[int] + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + return self._simple_select_one_onecol( + "%s_current" % (table,), + {id_col: id}, + retcol="completed_delta_stream_id", + allow_none=True, + ) + @defer.inlineCallbacks def update_stats_delta( self, ts, stats_type, stats_id, fields, complete_with_stream_id=None -- cgit 1.5.1 From 07c267c51676fe1df80993da6700f35e69fe6761 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:34:05 +0100 Subject: For user stats, handle other membership transitions properly. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 5 ++--- synapse/storage/stats.py | 5 +---- 2 files changed, 3 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index f065d88a7d..2f7c108181 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -219,9 +219,8 @@ class StatsHandler(StateDeltasHandler): user_id = state_key if self.is_mine_id(user_id): # this accounts for transitions like leave → ban and so on. - has_changed_joinedness = ( - (prev_membership == Membership.JOIN) != - (membership == Membership.JOIN) + has_changed_joinedness = (prev_membership == Membership.JOIN) != ( + membership == Membership.JOIN ) if has_changed_joinedness: diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 6832ec6b7f..f86e9bd269 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,12 +15,9 @@ # limitations under the License. import logging -from threading import Lock - -from twisted.internet import defer from itertools import chain +from threading import Lock -from synapse.storage.state_deltas import StateDeltasStore from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached -- cgit 1.5.1 From 44d3c2e80b03bf7168ef23f0a3f080013d8800b0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:34:20 +0100 Subject: Invalidate `get_earliest_token_for_stats` cache as required. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index f86e9bd269..d345b2cb32 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -433,6 +433,7 @@ class StatsStore(StateDeltasStore): elif complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() absolute_fields["completed_delta_stream_id"] = complete_with_stream_id + self.get_earliest_token_for_stats.invalidate(stats_type, stats_id) # first upsert the `_current` table self._upsert_with_additive_relatives_txn( -- cgit 1.5.1 From 10c1a233f91bd3fcc505e5521cadfc6ed8daa301 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:49:12 +0100 Subject: Fix logic error. `absolute_fields` being None shouldn't preclude completion of a current stats row. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index d345b2cb32..ede5002fca 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -430,7 +430,8 @@ class StatsStore(StateDeltasStore): if absolute_fields is None: absolute_fields = {} - elif complete_with_stream_id is not None: + + if complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() absolute_fields["completed_delta_stream_id"] = complete_with_stream_id self.get_earliest_token_for_stats.invalidate(stats_type, stats_id) -- cgit 1.5.1 From 064143c1308cf6354554702d5041ec4bd3ac8ff8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:59:39 +0100 Subject: Use `DeferredLock` instead of `threading.Lock` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 5 +++-- synapse/storage/stats.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 2f7c108181..8e1bf8b5d5 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -66,12 +66,13 @@ class StatsHandler(StateDeltasHandler): @defer.inlineCallbacks def process(): + yield lock.acquire() try: yield self._unsafe_process() finally: - lock.release() + yield lock.release() - if lock.acquire(blocking=False): + if not lock.locked: # we only want to run this process one-at-a-time, # and also, if the initial background updater wants us to keep out, # we should respect that. diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index ede5002fca..c9687c29d2 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -16,7 +16,8 @@ import logging from itertools import chain -from threading import Lock + +from twisted.internet.defer import DeferredLock from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -57,7 +58,7 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size - self.stats_delta_processing_lock = Lock() + self.stats_delta_processing_lock = DeferredLock() self.register_noop_background_update("populate_stats_createtables") self.register_noop_background_update("populate_stats_process_rooms") -- cgit 1.5.1
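The membership patch above ("handle other membership transitions properly") reduces every transition to one question: did the member cross the joined/not-joined boundary? A standalone sketch of that check (plain Python, not the Synapse handler itself; the membership strings and helper name are illustrative):

    JOIN = "join"

    def has_changed_joinedness(prev_membership, membership):
        # True only when the member crosses the joined <-> not-joined boundary.
        return (prev_membership == JOIN) != (membership == JOIN)

    # leave -> ban: still not joined, so joined-member counters are untouched
    assert not has_changed_joinedness("leave", "ban")
    # join -> ban: one fewer joined member
    assert has_changed_joinedness("join", "ban")
    # invite -> join: one more joined member
    assert has_changed_joinedness("invite", "join")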
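The "Fix logic error" patch makes completion of a `_current` row independent of whether `absolute_fields` was supplied, and the patch before it invalidates `get_earliest_token_for_stats` at the same point. A hypothetical helper (not a function that exists in Synapse) expressing the corrected control flow:

    def complete_absolute_fields(absolute_fields, complete_with_stream_id):
        # Completion must not depend on absolute_fields being passed in.
        fields = dict(absolute_fields) if absolute_fields is not None else {}
        if complete_with_stream_id is not None:
            fields["completed_delta_stream_id"] = complete_with_stream_id
        return fields

    # The old `elif` skipped this case entirely:
    assert complete_absolute_fields(None, 42) == {"completed_delta_stream_id": 42}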
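The final patch swaps `threading.Lock` for Twisted's `DeferredLock` so the one-at-a-time guard never blocks the reactor. A minimal sketch of the resulting pattern (class and method names are illustrative, not Synapse's):

    from twisted.internet import defer

    class OneAtATimeSketch:
        """Serialise an async job with a DeferredLock; skip a new run while
        one is already in flight, as the stats handler now does."""

        def __init__(self):
            self._lock = defer.DeferredLock()

        def notify(self, do_work):
            if self._lock.locked:
                # A run is already in progress; don't queue another.
                return None

            @defer.inlineCallbacks
            def _process():
                yield self._lock.acquire()
                try:
                    yield defer.maybeDeferred(do_work)
                finally:
                    self._lock.release()

            return _process()

Unlike `Lock.acquire(blocking=False)`, `DeferredLock.acquire()` always succeeds eventually, so it is the `locked` check (not the acquire call) that keeps the processor single-flight.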