diff options
Diffstat (limited to 'synapse/handlers/stats.py')
-rw-r--r-- | synapse/handlers/stats.py | 54 |
1 files changed, 30 insertions, 24 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 8e1bf8b5d5..7536e1a54c 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -76,20 +76,18 @@ class StatsHandler(StateDeltasHandler): # we only want to run this process one-at-a-time, # and also, if the initial background updater wants us to keep out, # we should respect that. - try: - run_as_background_process("stats.notify_new_event", process) - except: # noqa: E722 – re-raised so fine - lock.release() - raise + run_as_background_process("stats.notify_new_event", process) @defer.inlineCallbacks def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB + # If None is one of the values, then means that the stats regenerator has not (or had not) yet unwedged us + # but note that this might be outdated, so we retrieve the positions again. if self.pos is None or None in self.pos.values(): self.pos = yield self.store.get_stats_positions() - # If still None then the initial background update hasn't started yet - if self.pos is None or None in self.pos.values(): + # If still contains a None position, then the stats regenerator hasn't started yet + if None in self.pos.values(): return None # Loop round handling deltas until we're up to date @@ -141,7 +139,10 @@ class StatsHandler(StateDeltasHandler): continue if event_id is None and prev_event_id is None: - # Errr... + logger.error( + "event ID is None and so is the previous event ID. stream_id: %s", + stream_id, + ) continue event_content = {} @@ -153,11 +154,14 @@ class StatsHandler(StateDeltasHandler): # We use stream_pos here rather than fetch by event_id as event_id # may be None - now = yield self.store.get_received_ts_by_stream_pos(stream_pos) - now = int(now) + stream_timestamp = yield self.store.get_received_ts_by_stream_pos( + stream_pos + ) + stream_timestamp = int(stream_timestamp) + # All the values in this dict are deltas (RELATIVE changes) room_stats_delta = {} - room_stats_complete = False + is_newly_created = False if prev_event_id is None: # this state event doesn't overwrite another, @@ -198,9 +202,9 @@ class StatsHandler(StateDeltasHandler): elif prev_membership == Membership.BAN: room_stats_delta["banned_members"] = -1 else: - err = "%s is not a valid prev_membership" % (repr(prev_membership),) - logger.error(err) - raise ValueError(err) + raise ValueError( + "%r is not a valid prev_membership" % (prev_membership,) + ) if membership == prev_membership: pass # noop @@ -213,9 +217,7 @@ class StatsHandler(StateDeltasHandler): elif membership == Membership.BAN: room_stats_delta["banned_members"] = +1 else: - err = "%s is not a valid membership" % (repr(membership),) - logger.error(err) - raise ValueError(err) + raise ValueError("%r is not a valid membership" % (membership,)) user_id = state_key if self.is_mine_id(user_id): @@ -232,7 +234,7 @@ class StatsHandler(StateDeltasHandler): delta = +1 if membership == Membership.JOIN else -1 yield self.store.update_stats_delta( - now, "user", user_id, {field: delta} + stream_timestamp, "user", user_id, {field: delta} ) elif typ == EventTypes.Create: @@ -250,7 +252,7 @@ class StatsHandler(StateDeltasHandler): }, ) - room_stats_complete = True + is_newly_created = True elif typ == EventTypes.JoinRules: old_room_state = yield self.store.get_room_state(room_id) @@ -269,7 +271,9 @@ class StatsHandler(StateDeltasHandler): prev_event_id, event_id, "join_rule", JoinRules.PUBLIC ) if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + yield self.update_public_room_stats( + stream_timestamp, room_id, is_public + ) elif typ == EventTypes.RoomHistoryVisibility: old_room_state = yield self.store.get_room_state(room_id) @@ -289,7 +293,9 @@ class StatsHandler(StateDeltasHandler): prev_event_id, event_id, "history_visibility", "world_readable" ) if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + yield self.update_public_room_stats( + stream_timestamp, room_id, is_public + ) elif typ == EventTypes.Encryption: yield self.store.update_room_state( @@ -312,9 +318,9 @@ class StatsHandler(StateDeltasHandler): room_id, {"canonical_alias": event_content.get("alias")} ) - if room_stats_complete: + if is_newly_created: yield self.store.update_stats_delta( - now, + stream_timestamp, "room", room_id, room_stats_delta, @@ -323,7 +329,7 @@ class StatsHandler(StateDeltasHandler): elif len(room_stats_delta) > 0: yield self.store.update_stats_delta( - now, "room", room_id, room_stats_delta + stream_timestamp, "room", room_id, room_stats_delta ) @defer.inlineCallbacks |