diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 7536e1a54c..f44adfc07b 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -91,25 +91,31 @@ class StatsHandler(StateDeltasHandler):
return None
# Loop round handling deltas until we're up to date
- with Measure(self.clock, "stats_delta"):
- while True:
+
+ while True:
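+            # First, handle any new state deltas since our last stored position.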
+ with Measure(self.clock, "stats_delta"):
deltas = yield self.store.get_current_state_deltas(
self.pos["state_delta_stream_id"]
)
-                if not deltas:
-                    break
-                logger.debug("Handling %d state deltas", len(deltas))
-                yield self._handle_deltas(deltas)
-                self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+                if deltas:
+                    logger.debug("Handling %d state deltas", len(deltas))
+                    yield self._handle_deltas(deltas)
+                    self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+                    yield self.store.update_stats_positions(self.pos)
event_processing_positions.labels("stats").set(
self.pos["state_delta_stream_id"]
)
- if self.pos is not None:
- yield self.store.update_stats_positions(self.pos)
+ # Then count deltas for total_events and total_event_bytes.
+ with Measure(self.clock, "stats_total_events_and_bytes"):
+ self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
+ self.pos
+ )
+
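+            # Stop once both phases report nothing new to process.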
+ if not deltas and not had_counts:
+ break
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 52fb09c0e6..6d4648c0d7 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
current_state_events INT NOT NULL,
total_events INT NOT NULL,
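+    -- total number of bytes of JSON, across all events, in this room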
+ total_event_bytes BIGINT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
@@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS room_stats_historical (
current_state_events INT NOT NULL,
total_events INT NOT NULL,
+ total_event_bytes BIGINT NOT NULL,
joined_members INT NOT NULL,
invited_members INT NOT NULL,
left_members INT NOT NULL,
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 3df57b52ea..9d6c3027d5 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -19,6 +19,7 @@ from itertools import chain
from twisted.internet.defer import DeferredLock
+from synapse.storage.engines import PostgresEngine
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
@@ -36,6 +37,7 @@ ABSOLUTE_STATS_FIELDS = {
"left_members",
"banned_members",
"total_events",
+ "total_event_bytes",
),
"user": ("public_rooms", "private_rooms"),
}
@@ -475,3 +477,113 @@ class StatsStore(StateDeltasStore):
for (key, val) in additive_relatives.items():
src_row[key] = dest_current_row[key] + val
self._simple_update_txn(txn, into_table, keyvalues, src_row)
+
+ def incremental_update_room_total_events_and_bytes(self, in_positions):
+ """
+        Counts the number of events and the total number of event bytes per
+        room, then adds these to the respective total_events and
+        total_event_bytes room counts.
+
+ Args:
+ in_positions (dict): Positions,
+ as retrieved from L{get_stats_positions}.
+
+ Returns (Deferred[tuple[dict, bool]]):
+ First element (dict):
+ The new positions. Note that this is for reference only –
+ the new positions WILL be committed by this function.
+            Second element (bool):
+                True if the stream positions advanced (i.e. there were new
+                events to count), False otherwise.
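+
+        Example (as driven by the stats handler's catch-up loop):
+            pos, had_counts = yield store.incremental_update_room_total_events_and_bytes(pos)
+            if not had_counts:
+                pass  # fully caught up; nothing new was counted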
+ """
+
+ def incremental_update_total_events_and_bytes_txn(txn):
+ positions = in_positions.copy()
+
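+            # Two passes: count "normal" events newer than our stored maximum
+            # position, and backfilled events (negative stream orderings) older
+            # than our stored minimum position.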
+ max_pos = self.get_room_max_stream_ordering()
+ min_pos = self.get_room_min_stream_ordering()
+ self.update_total_event_and_bytes_count_between_txn(
+ txn,
+ low_pos=positions["total_events_max_stream_ordering"],
+ high_pos=max_pos,
+ )
+
+ self.update_total_event_and_bytes_count_between_txn(
+ txn,
+ low_pos=min_pos,
+ high_pos=positions["total_events_min_stream_ordering"],
+ )
+
+ if (
+ positions["total_events_max_stream_ordering"] != max_pos
+ or positions["total_events_min_stream_ordering"] != min_pos
+ ):
+ positions["total_events_max_stream_ordering"] = max_pos
+ positions["total_events_min_stream_ordering"] = min_pos
+
+ self._update_stats_positions_txn(txn, positions)
+
+ return positions, True
+ else:
+ return positions, False
+
+ return self.runInteraction(
+ "stats_incremental_total_events_and_bytes",
+ incremental_update_total_events_and_bytes_txn,
+ )
+
+ def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos):
+ """
+ Updates the total_events and total_event_bytes counts for rooms,
+ in a range of stream_orderings.
+
+        The inclusivity of low_pos and high_pos depends on their signs, which
+        makes it natural to use this function for both backfilled and
+        non-backfilled events.
+
+ Examples:
+ (low, high) → (kind)
+ (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
+ (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
+ (-7, 7) → -7 <= … <= 7 (both)
+
+ Args:
+ txn: Database transaction.
+ low_pos: Low stream ordering
+ high_pos: High stream ordering
+ """
+
+ if low_pos >= high_pos:
+ # nothing to do here.
+ return
+
+        now = self.hs.get_clock().time_msec()
+
+ # we choose comparators based on the signs
+ low_comparator = "<=" if low_pos < 0 else "<"
+ high_comparator = "<" if high_pos < 0 else "<="
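+        # (a negative low marks an unprocessed backfill boundary, so it is
+        # inclusive; a negative high was processed previously, so it is not)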
+
+ if isinstance(self.database_engine, PostgresEngine):
+ new_bytes_expression = "OCTET_LENGTH(json)"
+ else:
+ new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
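+            # CAST to BLOB so that SQLite's LENGTH() counts bytes, not characters.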
+
+ sql = """
+ SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+ FROM events INNER JOIN event_json USING (event_id)
+ WHERE ? %s stream_ordering AND stream_ordering %s ?
+ GROUP BY room_id
+        """ % (
+            new_bytes_expression,
+            low_comparator,
+            high_comparator,
+        )
+
+ txn.execute(sql, (low_pos, high_pos))
+
+ for room_id, new_events, new_bytes in txn.fetchall():
+ self._update_stats_delta_txn(
+ txn,
+ now,
+ "room",
+ room_id,
+ {"total_events": new_events, "total_event_bytes": new_bytes},
+ )
|