diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 8c99a125a9..8cfa694d31 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -19,6 +19,7 @@ from itertools import chain
from twisted.internet.defer import DeferredLock
+from synapse.storage import PostgresEngine
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
@@ -476,3 +477,108 @@ class StatsStore(StateDeltasStore):
for (key, val) in additive_relatives.items():
src_row[key] = dest_current_row[key] + val
self._simple_update_txn(txn, into_table, keyvalues, src_row)
+
+    def incremental_update_room_total_events_and_bytes(self, in_positions):
+        """
+        Counts the number of events and total event bytes per-room and adds
+        these deltas onto each room's total_events and total_event_bytes stats.
+
+        Args:
+            in_positions (dict): Stream-ordering positions processed so far,
+                as retrieved from L{get_stats_positions}. Not mutated.
+
+        Returns (dict):
+            The new positions. Note that this is for reference only –
+            the new positions WILL be committed by this function.
+        """
+
+        def incremental_update_total_events_and_bytes_txn(txn):
+            positions = in_positions.copy()  # keep the caller's dict untouched
+
+            max_pos = self.get_room_max_stream_ordering()
+            min_pos = self.get_room_min_stream_ordering()
+            self.update_total_event_and_bytes_count_between_txn(
+                txn,
+                low_pos=positions["total_events_max_stream_ordering"],
+                high_pos=max_pos,  # catch up on new forwards events
+            )
+
+            self.update_total_event_and_bytes_count_between_txn(
+                txn,
+                low_pos=min_pos,  # catch up on newly backfilled events
+                high_pos=positions["total_events_min_stream_ordering"],
+            )
+
+            if (
+                positions["total_events_max_stream_ordering"] != max_pos
+                or positions["total_events_min_stream_ordering"] != min_pos
+            ):
+                positions["total_events_max_stream_ordering"] = max_pos
+                positions["total_events_min_stream_ordering"] = min_pos
+
+                # only persist when something actually moved
+                self._update_stats_positions_txn(txn, positions)
+
+            return positions
+
+        return self.runInteraction(
+            "stats_incremental_total_events_and_bytes",
+            incremental_update_total_events_and_bytes_txn,
+        )
+
+    def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos):
+        """
+        Updates the total_events and total_event_bytes counts for rooms,
+        in a range of stream_orderings.
+
+        Inclusivity of low_pos and high_pos is dependent upon their signs.
+        This makes it intuitive to use this function for both backfilled
+        and non-backfilled events.
+
+        Examples:
+            (low, high) → (kind)
+            (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
+            (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
+            (-7, 7) → -7 <= … <= 7 (both)
+
+        Args:
+            txn: Database transaction.
+            low_pos: Low stream ordering
+            high_pos: High stream ordering
+        """
+
+        if low_pos >= high_pos:
+            # the range is empty; nothing to do here.
+            return
+
+        now = self.hs.clock.time_msec()
+
+        # we choose comparators based on the signs (see docstring examples)
+        low_comparator = "<=" if low_pos < 0 else "<"
+        high_comparator = "<" if high_pos < 0 else "<="
+
+        if isinstance(self.database_engine, PostgresEngine):
+            new_bytes_expression = "OCTET_LENGTH(json)"
+        else:
+            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+        sql = """
+            SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+            FROM events INNER JOIN event_json USING (event_id)
+            WHERE ? %s stream_ordering AND stream_ordering %s ?
+            GROUP BY room_id
+        """ % (  # NB: order must match the %s placeholders above
+            new_bytes_expression,
+            low_comparator,
+            high_comparator,
+        )
+
+        txn.execute(sql, (low_pos, high_pos))
+
+        for room_id, new_events, new_bytes in txn.fetchall():
+            self._update_stats_delta_txn(
+                txn,
+                now,
+                "room",
+                room_id,
+                {"total_events": new_events, "total_event_bytes": new_bytes},
+            )
|