summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/stats.py18
-rw-r--r--synapse/storage/schema/delta/56/stats_separated1.sql2
-rw-r--r--synapse/storage/stats.py112
3 files changed, 126 insertions, 6 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 7536e1a54c..f44adfc07b 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -91,25 +91,31 @@ class StatsHandler(StateDeltasHandler):
             return None
 
         # Loop round handling deltas until we're up to date
-        with Measure(self.clock, "stats_delta"):
-            while True:
+
+        while True:
+            with Measure(self.clock, "stats_delta"):
                 deltas = yield self.store.get_current_state_deltas(
                     self.pos["state_delta_stream_id"]
                 )
-                if not deltas:
-                    break
 
                 logger.debug("Handling %d state deltas", len(deltas))
                 yield self._handle_deltas(deltas)
 
                 self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+                yield self.store.update_stats_positions(self.pos)
 
                 event_processing_positions.labels("stats").set(
                     self.pos["state_delta_stream_id"]
                 )
 
-                if self.pos is not None:
-                    yield self.store.update_stats_positions(self.pos)
+            # Then count deltas for total_events and total_event_bytes.
+            with Measure(self.clock, "stats_total_events_and_bytes"):
+                self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
+                    self.pos
+                )
+
+            if not deltas and not had_counts:
+                break
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 52fb09c0e6..6d4648c0d7 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
 
     current_state_events INT NOT NULL,
     total_events INT NOT NULL,
+    total_event_bytes BIGINT NOT NULL,
     joined_members INT NOT NULL,
     invited_members INT NOT NULL,
     left_members INT NOT NULL,
@@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS room_stats_historical (
 
     current_state_events INT NOT NULL,
     total_events INT NOT NULL,
+    total_event_bytes BIGINT NOT NULL,
     joined_members INT NOT NULL,
     invited_members INT NOT NULL,
     left_members INT NOT NULL,
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 3df57b52ea..9d6c3027d5 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -19,6 +19,7 @@ from itertools import chain
 
 from twisted.internet.defer import DeferredLock
 
+from synapse.storage import PostgresEngine
 from synapse.storage.state_deltas import StateDeltasStore
 from synapse.util.caches.descriptors import cached
 
@@ -36,6 +37,7 @@ ABSOLUTE_STATS_FIELDS = {
         "left_members",
         "banned_members",
         "total_events",
+        "total_event_bytes",
     ),
     "user": ("public_rooms", "private_rooms"),
 }
@@ -475,3 +477,113 @@ class StatsStore(StateDeltasStore):
                 for (key, val) in additive_relatives.items():
                     src_row[key] = dest_current_row[key] + val
                 self._simple_update_txn(txn, into_table, keyvalues, src_row)
+
+    def incremental_update_room_total_events_and_bytes(self, in_positions):
+        """
+        Counts the number of events and total event bytes per-room and then adds
+        these to the respective total_events and total_event_bytes room counts.
+
+        Args:
+            in_positions (dict): Positions,
+                as retrieved from L{get_stats_positions}.
+
+        Returns (Deferred[tuple[dict, bool]]):
+            First element (dict):
+                The new positions. Note that this is for reference only –
+                the new positions WILL be committed by this function.
+            Second element (bool):
+                true iff there was a change to the positions, false otherwise
+        """
+
+        def incremental_update_total_events_and_bytes_txn(txn):
+            positions = in_positions.copy()
+
+            max_pos = self.get_room_max_stream_ordering()
+            min_pos = self.get_room_min_stream_ordering()
+            self.update_total_event_and_bytes_count_between_txn(
+                txn,
+                low_pos=positions["total_events_max_stream_ordering"],
+                high_pos=max_pos,
+            )
+
+            self.update_total_event_and_bytes_count_between_txn(
+                txn,
+                low_pos=min_pos,
+                high_pos=positions["total_events_min_stream_ordering"],
+            )
+
+            if (
+                positions["total_events_max_stream_ordering"] != max_pos
+                or positions["total_events_min_stream_ordering"] != min_pos
+            ):
+                positions["total_events_max_stream_ordering"] = max_pos
+                positions["total_events_min_stream_ordering"] = min_pos
+
+                self._update_stats_positions_txn(txn, positions)
+
+                return positions, True
+            else:
+                return positions, False
+
+        return self.runInteraction(
+            "stats_incremental_total_events_and_bytes",
+            incremental_update_total_events_and_bytes_txn,
+        )
+
+    def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos):
+        """
+        Updates the total_events and total_event_bytes counts for rooms,
+            in a range of stream_orderings.
+
+        Inclusivity of low_pos and high_pos is dependent upon their signs.
+        This makes it intuitive to use this function for both backfilled
+        and non-backfilled events.
+
+        Examples:
+        (low, high) → (kind)
+        (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
+        (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
+        (-7, 7) → -7 <= … <= 7 (both)
+
+        Args:
+            txn: Database transaction.
+            low_pos: Low stream ordering
+            high_pos: High stream ordering
+        """
+
+        if low_pos >= high_pos:
+            # nothing to do here.
+            return
+
+        now = self.hs.clock.time_msec()
+
+        # we choose comparators based on the signs
+        low_comparator = "<=" if low_pos < 0 else "<"
+        high_comparator = "<" if high_pos < 0 else "<="
+
+        if isinstance(self.database_engine, PostgresEngine):
+            new_bytes_expression = "OCTET_LENGTH(json)"
+        else:
+            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+        sql = """
+            SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+            FROM events INNER JOIN event_json USING (event_id)
+            WHERE ? %s stream_ordering AND stream_ordering %s ?
+            GROUP BY room_id
+        """ % (
+            low_comparator,
+            high_comparator,
+            new_bytes_expression,
+        )
+
+        txn.execute(sql, (low_pos, high_pos))
+
+        for room_id, new_events, new_bytes in txn.fetchall():
+            self._update_stats_delta_txn(
+                txn,
+                now,
+                "room",
+                room_id,
+                {"total_events": new_events, "total_event_bytes": new_bytes},
+            )