diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index e9b1646e0e..f44adfc07b 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -91,31 +91,31 @@ class StatsHandler(StateDeltasHandler):
return None
# Loop round handling deltas until we're up to date
- with Measure(self.clock, "stats_delta"):
- while True:
+
+ while True:
+ with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(
self.pos["state_delta_stream_id"]
)
- if not deltas:
- break
logger.debug("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+ yield self.store.update_stats_positions(self.pos)
event_processing_positions.labels("stats").set(
self.pos["state_delta_stream_id"]
)
- if self.pos is not None:
- yield self.store.update_stats_positions(self.pos)
+ # Then count deltas for total_events and total_event_bytes.
+ with Measure(self.clock, "stats_total_events_and_bytes"):
+ self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
+ self.pos
+ )
- # Then count deltas for total_events and total_event_bytes.
- with Measure(self.clock, "stats_total_events_and_bytes"):
- self.pos = yield self.store.incremental_update_total_events_and_bytes(
- self.pos
- )
+ if not deltas and not had_counts:
+ break
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 8cfa694d31..9d6c3027d5 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -487,9 +487,12 @@ class StatsStore(StateDeltasStore):
in_positions (dict): Positions,
as retrieved from L{get_stats_positions}.
- Returns (dict):
- The new positions. Note that this is for reference only –
- the new positions WILL be committed by this function.
+ Returns (Deferred[tuple[dict, bool]]):
+ First element (dict):
+ The new positions. Note that this is for reference only –
+ the new positions WILL be committed by this function.
+ Second element (bool):
+ true iff there was a change to the positions, false otherwise
"""
def incremental_update_total_events_and_bytes_txn(txn):
@@ -518,7 +521,9 @@ class StatsStore(StateDeltasStore):
self._update_stats_positions_txn(txn, positions)
- return positions
+ return positions, True
+ else:
+ return positions, False
return self.runInteraction(
"stats_incremental_total_events_and_bytes",
|