diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 8e1bf8b5d5..7536e1a54c 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -76,20 +76,18 @@ class StatsHandler(StateDeltasHandler):
# we only want to run this process one-at-a-time,
# and also, if the initial background updater wants us to keep out,
# we should respect that.
- try:
- run_as_background_process("stats.notify_new_event", process)
- except: # noqa: E722 – re-raised so fine
- lock.release()
- raise
+ run_as_background_process("stats.notify_new_event", process)
@defer.inlineCallbacks
def _unsafe_process(self):
# If self.pos is None then means we haven't fetched it from DB
+ # If None is one of the values, then means that the stats regenerator has not (or had not) yet unwedged us
+ # but note that this might be outdated, so we retrieve the positions again.
if self.pos is None or None in self.pos.values():
self.pos = yield self.store.get_stats_positions()
- # If still None then the initial background update hasn't started yet
- if self.pos is None or None in self.pos.values():
+ # If still contains a None position, then the stats regenerator hasn't started yet
+ if None in self.pos.values():
return None
# Loop round handling deltas until we're up to date
@@ -141,7 +139,10 @@ class StatsHandler(StateDeltasHandler):
continue
if event_id is None and prev_event_id is None:
- # Errr...
+ logger.error(
+ "event ID is None and so is the previous event ID. stream_id: %s",
+ stream_id,
+ )
continue
event_content = {}
@@ -153,11 +154,14 @@ class StatsHandler(StateDeltasHandler):
# We use stream_pos here rather than fetch by event_id as event_id
# may be None
- now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
- now = int(now)
+ stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
+ stream_pos
+ )
+ stream_timestamp = int(stream_timestamp)
+ # All the values in this dict are deltas (RELATIVE changes)
room_stats_delta = {}
- room_stats_complete = False
+ is_newly_created = False
if prev_event_id is None:
# this state event doesn't overwrite another,
@@ -198,9 +202,9 @@ class StatsHandler(StateDeltasHandler):
elif prev_membership == Membership.BAN:
room_stats_delta["banned_members"] = -1
else:
- err = "%s is not a valid prev_membership" % (repr(prev_membership),)
- logger.error(err)
- raise ValueError(err)
+ raise ValueError(
+ "%r is not a valid prev_membership" % (prev_membership,)
+ )
if membership == prev_membership:
pass # noop
@@ -213,9 +217,7 @@ class StatsHandler(StateDeltasHandler):
elif membership == Membership.BAN:
room_stats_delta["banned_members"] = +1
else:
- err = "%s is not a valid membership" % (repr(membership),)
- logger.error(err)
- raise ValueError(err)
+ raise ValueError("%r is not a valid membership" % (membership,))
user_id = state_key
if self.is_mine_id(user_id):
@@ -232,7 +234,7 @@ class StatsHandler(StateDeltasHandler):
delta = +1 if membership == Membership.JOIN else -1
yield self.store.update_stats_delta(
- now, "user", user_id, {field: delta}
+ stream_timestamp, "user", user_id, {field: delta}
)
elif typ == EventTypes.Create:
@@ -250,7 +252,7 @@ class StatsHandler(StateDeltasHandler):
},
)
- room_stats_complete = True
+ is_newly_created = True
elif typ == EventTypes.JoinRules:
old_room_state = yield self.store.get_room_state(room_id)
@@ -269,7 +271,9 @@ class StatsHandler(StateDeltasHandler):
prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
)
if is_public is not None:
- yield self.update_public_room_stats(now, room_id, is_public)
+ yield self.update_public_room_stats(
+ stream_timestamp, room_id, is_public
+ )
elif typ == EventTypes.RoomHistoryVisibility:
old_room_state = yield self.store.get_room_state(room_id)
@@ -289,7 +293,9 @@ class StatsHandler(StateDeltasHandler):
prev_event_id, event_id, "history_visibility", "world_readable"
)
if is_public is not None:
- yield self.update_public_room_stats(now, room_id, is_public)
+ yield self.update_public_room_stats(
+ stream_timestamp, room_id, is_public
+ )
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
@@ -312,9 +318,9 @@ class StatsHandler(StateDeltasHandler):
room_id, {"canonical_alias": event_content.get("alias")}
)
- if room_stats_complete:
+ if is_newly_created:
yield self.store.update_stats_delta(
- now,
+ stream_timestamp,
"room",
room_id,
room_stats_delta,
@@ -323,7 +329,7 @@ class StatsHandler(StateDeltasHandler):
elif len(room_stats_delta) > 0:
yield self.store.update_stats_delta(
- now, "room", room_id, room_stats_delta
+ stream_timestamp, "room", room_id, room_stats_delta
)
@defer.inlineCallbacks
|