From e4cbea6c46afe6c45e0ee0604eaf536da70cb9f3 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 14:24:35 +0100 Subject: Handle state deltas and turn them into stats deltas Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 329 +++++++++++++++++++++++++++++++++++++++++++++- synapse/storage/stats.py | 112 +++++++++++++++- 2 files changed, 439 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 7b1d1b4203..156adfd310 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -15,7 +15,14 @@ import logging +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.handlers.state_deltas import StateDeltasHandler +from synapse.metrics import event_processing_positions +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import UserID +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -52,4 +59,324 @@ class StatsHandler(StateDeltasHandler): def notify_new_event(self): """Called when there may be more deltas to process """ - pass + if not self.hs.config.stats_enabled: + return + + lock = self.store.stats_delta_processing_lock + + @defer.inlineCallbacks + def process(): + try: + yield self._unsafe_process() + finally: + lock.release() + + if lock.acquire(blocking=False): + # we only want to run this process one-at-a-time, + # and also, if the initial background updater wants us to keep out, + # we should respect that. + try: + run_as_background_process("stats.notify_new_event", process) + except: # noqa: E722 – re-raised so fine + lock.release() + raise + + @defer.inlineCallbacks + def _unsafe_process(self): + # If self.pos is None then means we haven't fetched it from DB + if self.pos is None or None in self.pos.values(): + self.pos = yield self.store.get_stats_positions() + + # If still None then the initial background update hasn't started yet + if self.pos is None or None in self.pos.values(): + return None + + # Loop round handling deltas until we're up to date + with Measure(self.clock, "stats_delta"): + while True: + deltas = yield self.store.get_current_state_deltas( + self.pos["state_delta_stream_id"] + ) + if not deltas: + break + + logger.debug("Handling %d state deltas", len(deltas)) + yield self._handle_deltas(deltas) + + self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] + + event_processing_positions.labels("stats").set( + self.pos["state_delta_stream_id"] + ) + + if self.pos is not None: + yield self.store.update_stats_positions(self.pos) + + @defer.inlineCallbacks + def _handle_deltas(self, deltas): + """ + Called with the state deltas to process + """ + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + stream_id = delta["stream_id"] + prev_event_id = delta["prev_event_id"] + stream_pos = delta["stream_id"] + + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + + token = yield self.store.get_earliest_token_for_stats("room", room_id) + + # If the earliest token to begin from is larger than our current + # stream ID, skip processing this delta. + if token is not None and token >= stream_id: + logger.debug( + "Ignoring: %s as earlier than this room's initial ingestion event", + event_id, + ) + continue + + if event_id is None and prev_event_id is None: + # Errr... + continue + + event_content = {} + + if event_id is not None: + event = yield self.store.get_event(event_id, allow_none=True) + if event: + event_content = event.content or {} + + # We use stream_pos here rather than fetch by event_id as event_id + # may be None + now = yield self.store.get_received_ts_by_stream_pos(stream_pos) + now = int(now) // 1000 + + room_stats_delta = {} + room_stats_complete = False + + if prev_event_id is None: + # this state event doesn't overwrite another, + # so it is a new effective/current state event + room_stats_delta["current_state_events"] = ( + room_stats_delta.get("current_state_events", 0) + 1 + ) + + if typ == EventTypes.Member: + # we could use _get_key_change here but it's a bit inefficient + # given we're not testing for a specific result; might as well + # just grab the prev_membership and membership strings and + # compare them. + # We take None rather than leave as a previous membership + # in the absence of a previous event because we do not want to + # reduce the leave count when a new-to-the-room user joins. + prev_membership = None + if prev_event_id is not None: + prev_event = yield self.store.get_event( + prev_event_id, allow_none=True + ) + if prev_event: + prev_event_content = prev_event.content + prev_membership = prev_event_content.get( + "membership", Membership.LEAVE + ) + + membership = event_content.get("membership", Membership.LEAVE) + + if prev_membership is None: + logger.debug("No previous membership for this user.") + elif prev_membership == Membership.JOIN: + room_stats_delta["joined_members"] = ( + room_stats_delta.get("joined_members", 0) - 1 + ) + elif prev_membership == Membership.INVITE: + room_stats_delta["invited_members"] = ( + room_stats_delta.get("invited_members", 0) - 1 + ) + elif prev_membership == Membership.LEAVE: + room_stats_delta["left_members"] = ( + room_stats_delta.get("left_members", 0) - 1 + ) + elif prev_membership == Membership.BAN: + room_stats_delta["banned_members"] = ( + room_stats_delta.get("banned_members", 0) - 1 + ) + else: + err = "%s is not a valid prev_membership" % (repr(prev_membership),) + logger.error(err) + raise ValueError(err) + + if membership == Membership.JOIN: + room_stats_delta["joined_members"] = ( + room_stats_delta.get("joined_members", 0) + 1 + ) + elif membership == Membership.INVITE: + room_stats_delta["invited_members"] = ( + room_stats_delta.get("invited_members", 0) + 1 + ) + elif membership == Membership.LEAVE: + room_stats_delta["left_members"] = ( + room_stats_delta.get("left_members", 0) + 1 + ) + elif membership == Membership.BAN: + room_stats_delta["banned_members"] = ( + room_stats_delta.get("banned_members", 0) + 1 + ) + else: + err = "%s is not a valid membership" % (repr(membership),) + logger.error(err) + raise ValueError(err) + + user_id = state_key + if self.is_mine_id(user_id) and membership in ( + Membership.JOIN, + Membership.LEAVE, + ): + # update user_stats as it's one of our users + public = yield self._is_public_room(room_id) + + field = "public_rooms" if public else "private_rooms" + delta = +1 if membership == Membership.JOIN else -1 + + yield self.store.update_stats_delta( + now, "user", user_id, {field: delta} + ) + + elif typ == EventTypes.Create: + # Newly created room. Add it with all blank portions. + yield self.store.update_room_state( + room_id, + { + "join_rules": None, + "history_visibility": None, + "encryption": None, + "name": None, + "topic": None, + "avatar": None, + "canonical_alias": None, + }, + ) + + room_stats_complete = True + + elif typ == EventTypes.JoinRules: + old_room_state = yield self.store.get_room_state(room_id) + yield self.store.update_room_state( + room_id, {"join_rules": event_content.get("join_rule")} + ) + + # whether the room would be public anyway, + # because of history_visibility + other_field_gives_publicity = ( + old_room_state["history_visibility"] == "world_readable" + ) + + if not other_field_gives_publicity: + is_public = yield self._get_key_change( + prev_event_id, event_id, "join_rule", JoinRules.PUBLIC + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) + + elif typ == EventTypes.RoomHistoryVisibility: + old_room_state = yield self.store.get_room_state(room_id) + yield self.store.update_room_state( + room_id, + {"history_visibility": event_content.get("history_visibility")}, + ) + + # whether the room would be public anyway, + # because of join_rule + other_field_gives_publicity = ( + old_room_state["join_rules"] == JoinRules.PUBLIC + ) + + if not other_field_gives_publicity: + is_public = yield self._get_key_change( + prev_event_id, event_id, "history_visibility", "world_readable" + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) + + elif typ == EventTypes.Encryption: + yield self.store.update_room_state( + room_id, {"encryption": event_content.get("algorithm")} + ) + elif typ == EventTypes.Name: + yield self.store.update_room_state( + room_id, {"name": event_content.get("name")} + ) + elif typ == EventTypes.Topic: + yield self.store.update_room_state( + room_id, {"topic": event_content.get("topic")} + ) + elif typ == EventTypes.RoomAvatar: + yield self.store.update_room_state( + room_id, {"avatar": event_content.get("url")} + ) + elif typ == EventTypes.CanonicalAlias: + yield self.store.update_room_state( + room_id, {"canonical_alias": event_content.get("alias")} + ) + + if room_stats_complete: + yield self.store.update_stats_delta( + now, + "room", + room_id, + room_stats_delta, + complete_with_stream_id=stream_id, + ) + + elif len(room_stats_delta) > 0: + yield self.store.update_stats_delta( + now, "room", room_id, room_stats_delta + ) + + @defer.inlineCallbacks + def update_public_room_stats(self, ts, room_id, is_public): + """ + Increment/decrement a user's number of public rooms when a room they are + in changes to/from public visibility. + + Args: + ts (int): Timestamp in seconds + room_id (str) + is_public (bool) + """ + # For now, blindly iterate over all local users in the room so that + # we can handle the whole problem of copying buckets over as needed + user_ids = yield self.store.get_users_in_room(room_id) + + for user_id in user_ids: + if self.hs.is_mine(UserID.from_string(user_id)): + yield self.store.update_stats_delta( + ts, + "user", + user_id, + { + "public_rooms": +1 if is_public else -1, + "private_rooms": -1 if is_public else +1, + }, + ) + + @defer.inlineCallbacks + def _is_public_room(self, room_id): + join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules) + history_visibility = yield self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility + ) + + if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or ( + ( + history_visibility + and history_visibility.content.get("history_visibility") + == "world_readable" + ) + ): + return True + else: + return False diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e8b1ce240b..4112291c76 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,10 +15,13 @@ # limitations under the License. import logging +from threading import Lock -from synapse.storage.state_deltas import StateDeltasStore from twisted.internet import defer +from synapse.storage.state_deltas import StateDeltasStore +from synapse.util.caches.descriptors import cached + logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) @@ -55,6 +58,8 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size + self.stats_delta_processing_lock = Lock() + self.register_noop_background_update("populate_stats_createtables") self.register_noop_background_update("populate_stats_process_rooms") self.register_noop_background_update("populate_stats_cleanup") @@ -74,6 +79,91 @@ class StatsStore(StateDeltasStore): """ return (ts // self.stats_bucket_size) * self.stats_bucket_size + def get_stats_positions(self, for_initial_processor=False): + """ + Returns the stats processor positions. + + Args: + for_initial_processor (bool, optional): If true, returns the position + promised by the latest stats regeneration, rather than the current + incremental processor's position. + Otherwise (if false), return the incremental processor's position. + + Returns (dict): + Dict containing :- + state_delta_stream_id: stream_id of last-processed state delta + total_events_min_stream_ordering: stream_ordering of latest-processed + backfilled event, in the context of total_events counting. + total_events_max_stream_ordering: stream_ordering of latest-processed + non-backfilled event, in the context of total_events counting. + """ + return self._simple_select_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + desc="stats_incremental_position", + ) + + def _get_stats_positions_txn(self, txn, for_initial_processor=False): + """ + See L{get_stats_positions}. + + Args: + txn (cursor): Database cursor + """ + return self._simple_select_one_txn( + txn=txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + ) + + def update_stats_positions(self, positions, for_initial_processor=False): + """ + Updates the stats processor positions. + + Args: + positions: See L{get_stats_positions} + for_initial_processor: See L{get_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + desc="update_stats_incremental_position", + ) + + def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False): + """ + See L{update_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one_txn( + txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + ) + def update_room_state(self, room_id, fields): """ Args: @@ -103,6 +193,26 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) + @cached() + def get_earliest_token_for_stats(self, stats_type, id): + """ + Fetch the "earliest token". This is used by the room stats delta + processor to ignore deltas that have been processed between the + start of the background task and any particular room's stats + being calculated. + + Returns: + Deferred[int] + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + return self._simple_select_one_onecol( + "%s_current" % (table,), + {id_col: id}, + retcol="completed_delta_stream_id", + allow_none=True, + ) + @defer.inlineCallbacks def update_stats_delta( self, ts, stats_type, stats_id, fields, complete_with_stream_id=None -- cgit 1.4.1 From 3b09a37682140a671106abac36fe8a1c8686a512 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:10:39 +0100 Subject: Adapt to stats now working in milliseconds Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 156adfd310..3dbc0a2434 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -153,7 +153,7 @@ class StatsHandler(StateDeltasHandler): # We use stream_pos here rather than fetch by event_id as event_id # may be None now = yield self.store.get_received_ts_by_stream_pos(stream_pos) - now = int(now) // 1000 + now = int(now) room_stats_delta = {} room_stats_complete = False -- cgit 1.4.1 From 99c88ac84eb394104f06ff42dde891cabf00466c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:14:17 +0100 Subject: No-op if no membership change and thus simplify verbose dict updates. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 42 ++++++++++++++---------------------------- 1 file changed, 14 insertions(+), 28 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 3dbc0a2434..b3d59fde05 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -161,9 +161,7 @@ class StatsHandler(StateDeltasHandler): if prev_event_id is None: # this state event doesn't overwrite another, # so it is a new effective/current state event - room_stats_delta["current_state_events"] = ( - room_stats_delta.get("current_state_events", 0) + 1 - ) + room_stats_delta["current_state_events"] = 1 if typ == EventTypes.Member: # we could use _get_key_change here but it's a bit inefficient @@ -188,43 +186,31 @@ class StatsHandler(StateDeltasHandler): if prev_membership is None: logger.debug("No previous membership for this user.") + elif membership == prev_membership: + pass # noop elif prev_membership == Membership.JOIN: - room_stats_delta["joined_members"] = ( - room_stats_delta.get("joined_members", 0) - 1 - ) + room_stats_delta["joined_members"] = -1 elif prev_membership == Membership.INVITE: - room_stats_delta["invited_members"] = ( - room_stats_delta.get("invited_members", 0) - 1 - ) + room_stats_delta["invited_members"] = -1 elif prev_membership == Membership.LEAVE: - room_stats_delta["left_members"] = ( - room_stats_delta.get("left_members", 0) - 1 - ) + room_stats_delta["left_members"] = -1 elif prev_membership == Membership.BAN: - room_stats_delta["banned_members"] = ( - room_stats_delta.get("banned_members", 0) - 1 - ) + room_stats_delta["banned_members"] = -1 else: err = "%s is not a valid prev_membership" % (repr(prev_membership),) logger.error(err) raise ValueError(err) + if membership == prev_membership: + pass # noop if membership == Membership.JOIN: - room_stats_delta["joined_members"] = ( - room_stats_delta.get("joined_members", 0) + 1 - ) + room_stats_delta["joined_members"] = +1 elif membership == Membership.INVITE: - room_stats_delta["invited_members"] = ( - room_stats_delta.get("invited_members", 0) + 1 - ) + room_stats_delta["invited_members"] = +1 elif membership == Membership.LEAVE: - room_stats_delta["left_members"] = ( - room_stats_delta.get("left_members", 0) + 1 - ) + room_stats_delta["left_members"] = +1 elif membership == Membership.BAN: - room_stats_delta["banned_members"] = ( - room_stats_delta.get("banned_members", 0) + 1 - ) + room_stats_delta["banned_members"] = +1 else: err = "%s is not a valid membership" % (repr(membership),) logger.error(err) @@ -234,7 +220,7 @@ class StatsHandler(StateDeltasHandler): if self.is_mine_id(user_id) and membership in ( Membership.JOIN, Membership.LEAVE, - ): + ) and prev_membership != membership: # update user_stats as it's one of our users public = yield self._is_public_room(room_id) -- cgit 1.4.1 From dd8e6020d866008ca445a56e1662249066bafee8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:17:04 +0100 Subject: For user stats, handle other membership transitions properly. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index b3d59fde05..f065d88a7d 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -217,20 +217,24 @@ class StatsHandler(StateDeltasHandler): raise ValueError(err) user_id = state_key - if self.is_mine_id(user_id) and membership in ( - Membership.JOIN, - Membership.LEAVE, - ) and prev_membership != membership: - # update user_stats as it's one of our users - public = yield self._is_public_room(room_id) - - field = "public_rooms" if public else "private_rooms" - delta = +1 if membership == Membership.JOIN else -1 - - yield self.store.update_stats_delta( - now, "user", user_id, {field: delta} + if self.is_mine_id(user_id): + # this accounts for transitions like leave → ban and so on. + has_changed_joinedness = ( + (prev_membership == Membership.JOIN) != + (membership == Membership.JOIN) ) + if has_changed_joinedness: + # update user_stats as it's one of our users + public = yield self._is_public_room(room_id) + + field = "public_rooms" if public else "private_rooms" + delta = +1 if membership == Membership.JOIN else -1 + + yield self.store.update_stats_delta( + now, "user", user_id, {field: delta} + ) + elif typ == EventTypes.Create: # Newly created room. Add it with all blank portions. yield self.store.update_room_state( -- cgit 1.4.1 From 07c267c51676fe1df80993da6700f35e69fe6761 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:34:05 +0100 Subject: For user stats, handle other membership transitions properly. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 5 ++--- synapse/storage/stats.py | 5 +---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index f065d88a7d..2f7c108181 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -219,9 +219,8 @@ class StatsHandler(StateDeltasHandler): user_id = state_key if self.is_mine_id(user_id): # this accounts for transitions like leave → ban and so on. - has_changed_joinedness = ( - (prev_membership == Membership.JOIN) != - (membership == Membership.JOIN) + has_changed_joinedness = (prev_membership == Membership.JOIN) != ( + membership == Membership.JOIN ) if has_changed_joinedness: diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 6832ec6b7f..f86e9bd269 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,12 +15,9 @@ # limitations under the License. import logging -from threading import Lock - -from twisted.internet import defer from itertools import chain +from threading import Lock -from synapse.storage.state_deltas import StateDeltasStore from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached -- cgit 1.4.1 From 44d3c2e80b03bf7168ef23f0a3f080013d8800b0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:34:20 +0100 Subject: Invalidate `get_earliest_token_for_stats` cache as required. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 1 + 1 file changed, 1 insertion(+) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index f86e9bd269..d345b2cb32 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -433,6 +433,7 @@ class StatsStore(StateDeltasStore): elif complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() absolute_fields["completed_delta_stream_id"] = complete_with_stream_id + self.get_earliest_token_for_stats.invalidate(stats_type, stats_id) # first upsert the `_current` table self._upsert_with_additive_relatives_txn( -- cgit 1.4.1 From 10c1a233f91bd3fcc505e5521cadfc6ed8daa301 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:49:12 +0100 Subject: Fix logic error. `absolute_fields` being None shouldn't preclude completion of a current stats row. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index d345b2cb32..ede5002fca 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -430,7 +430,8 @@ class StatsStore(StateDeltasStore): if absolute_fields is None: absolute_fields = {} - elif complete_with_stream_id is not None: + + if complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() absolute_fields["completed_delta_stream_id"] = complete_with_stream_id self.get_earliest_token_for_stats.invalidate(stats_type, stats_id) -- cgit 1.4.1 From 064143c1308cf6354554702d5041ec4bd3ac8ff8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:59:39 +0100 Subject: Use `DeferredLock` instead of `threading.Lock` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 5 +++-- synapse/storage/stats.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 2f7c108181..8e1bf8b5d5 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -66,12 +66,13 @@ class StatsHandler(StateDeltasHandler): @defer.inlineCallbacks def process(): + yield lock.acquire() try: yield self._unsafe_process() finally: - lock.release() + yield lock.release() - if lock.acquire(blocking=False): + if not lock.locked: # we only want to run this process one-at-a-time, # and also, if the initial background updater wants us to keep out, # we should respect that. diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index ede5002fca..c9687c29d2 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -16,7 +16,8 @@ import logging from itertools import chain -from threading import Lock + +from twisted.internet.defer import DeferredLock from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -57,7 +58,7 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size - self.stats_delta_processing_lock = Lock() + self.stats_delta_processing_lock = DeferredLock() self.register_noop_background_update("populate_stats_createtables") self.register_noop_background_update("populate_stats_process_rooms") -- cgit 1.4.1 From 81aa6d53b08f9f9b37a20378ffa4bb1dfa33c4be Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 10:54:22 +0100 Subject: Address code review comments Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 54 ++++++++++++++++++++++++++--------------------- 1 file changed, 30 insertions(+), 24 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 8e1bf8b5d5..7536e1a54c 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -76,20 +76,18 @@ class StatsHandler(StateDeltasHandler): # we only want to run this process one-at-a-time, # and also, if the initial background updater wants us to keep out, # we should respect that. - try: - run_as_background_process("stats.notify_new_event", process) - except: # noqa: E722 – re-raised so fine - lock.release() - raise + run_as_background_process("stats.notify_new_event", process) @defer.inlineCallbacks def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB + # If None is one of the values, then means that the stats regenerator has not (or had not) yet unwedged us + # but note that this might be outdated, so we retrieve the positions again. if self.pos is None or None in self.pos.values(): self.pos = yield self.store.get_stats_positions() - # If still None then the initial background update hasn't started yet - if self.pos is None or None in self.pos.values(): + # If still contains a None position, then the stats regenerator hasn't started yet + if None in self.pos.values(): return None # Loop round handling deltas until we're up to date @@ -141,7 +139,10 @@ class StatsHandler(StateDeltasHandler): continue if event_id is None and prev_event_id is None: - # Errr... + logger.error( + "event ID is None and so is the previous event ID. stream_id: %s", + stream_id, + ) continue event_content = {} @@ -153,11 +154,14 @@ class StatsHandler(StateDeltasHandler): # We use stream_pos here rather than fetch by event_id as event_id # may be None - now = yield self.store.get_received_ts_by_stream_pos(stream_pos) - now = int(now) + stream_timestamp = yield self.store.get_received_ts_by_stream_pos( + stream_pos + ) + stream_timestamp = int(stream_timestamp) + # All the values in this dict are deltas (RELATIVE changes) room_stats_delta = {} - room_stats_complete = False + is_newly_created = False if prev_event_id is None: # this state event doesn't overwrite another, @@ -198,9 +202,9 @@ class StatsHandler(StateDeltasHandler): elif prev_membership == Membership.BAN: room_stats_delta["banned_members"] = -1 else: - err = "%s is not a valid prev_membership" % (repr(prev_membership),) - logger.error(err) - raise ValueError(err) + raise ValueError( + "%r is not a valid prev_membership" % (prev_membership,) + ) if membership == prev_membership: pass # noop @@ -213,9 +217,7 @@ class StatsHandler(StateDeltasHandler): elif membership == Membership.BAN: room_stats_delta["banned_members"] = +1 else: - err = "%s is not a valid membership" % (repr(membership),) - logger.error(err) - raise ValueError(err) + raise ValueError("%r is not a valid membership" % (membership,)) user_id = state_key if self.is_mine_id(user_id): @@ -232,7 +234,7 @@ class StatsHandler(StateDeltasHandler): delta = +1 if membership == Membership.JOIN else -1 yield self.store.update_stats_delta( - now, "user", user_id, {field: delta} + stream_timestamp, "user", user_id, {field: delta} ) elif typ == EventTypes.Create: @@ -250,7 +252,7 @@ class StatsHandler(StateDeltasHandler): }, ) - room_stats_complete = True + is_newly_created = True elif typ == EventTypes.JoinRules: old_room_state = yield self.store.get_room_state(room_id) @@ -269,7 +271,9 @@ class StatsHandler(StateDeltasHandler): prev_event_id, event_id, "join_rule", JoinRules.PUBLIC ) if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + yield self.update_public_room_stats( + stream_timestamp, room_id, is_public + ) elif typ == EventTypes.RoomHistoryVisibility: old_room_state = yield self.store.get_room_state(room_id) @@ -289,7 +293,9 @@ class StatsHandler(StateDeltasHandler): prev_event_id, event_id, "history_visibility", "world_readable" ) if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + yield self.update_public_room_stats( + stream_timestamp, room_id, is_public + ) elif typ == EventTypes.Encryption: yield self.store.update_room_state( @@ -312,9 +318,9 @@ class StatsHandler(StateDeltasHandler): room_id, {"canonical_alias": event_content.get("alias")} ) - if room_stats_complete: + if is_newly_created: yield self.store.update_stats_delta( - now, + stream_timestamp, "room", room_id, room_stats_delta, @@ -323,7 +329,7 @@ class StatsHandler(StateDeltasHandler): elif len(room_stats_delta) > 0: yield self.store.update_stats_delta( - now, "room", room_id, room_stats_delta + stream_timestamp, "room", room_id, room_stats_delta ) @defer.inlineCallbacks -- cgit 1.4.1