summary refs log tree commit diff
path: root/synapse/storage/stats.py
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-28 14:49:58 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-28 14:49:58 +0100
commita13ad21abf029901f5d74dd683e536d81f3150c3 (patch)
treeb751725328db31636b10db5dc1f18a28d91368f9 /synapse/storage/stats.py
parentAdd `total_event_bytes` to room statistics schema. (diff)
downloadsynapse-a13ad21abf029901f5d74dd683e536d81f3150c3.tar.xz
Add incremental counting for rooms' total events and total event bytes.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r--synapse/storage/stats.py106
1 files changed, 106 insertions, 0 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 8c99a125a9..8cfa694d31 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -19,6 +19,7 @@ from itertools import chain
 
 from twisted.internet.defer import DeferredLock
 
+from synapse.storage import PostgresEngine
 from synapse.storage.state_deltas import StateDeltasStore
 from synapse.util.caches.descriptors import cached
 
@@ -476,3 +477,108 @@ class StatsStore(StateDeltasStore):
                 for (key, val) in additive_relatives.items():
                     src_row[key] = dest_current_row[key] + val
                 self._simple_update_txn(txn, into_table, keyvalues, src_row)
+
+    def incremental_update_room_total_events_and_bytes(self, in_positions):
+        """
+        Counts the number of events and total event bytes per-room and then adds
+        these to the respective total_events and total_event_bytes room counts.
+
+        Args:
+            in_positions (dict): Positions,
+                as retrieved from L{get_stats_positions}.
+
+        Returns (dict):
+            The new positions. Note that this is for reference only –
+            the new positions WILL be committed by this function.
+        """
+
+        def incremental_update_total_events_and_bytes_txn(txn):
+            positions = in_positions.copy()
+
+            max_pos = self.get_room_max_stream_ordering()
+            min_pos = self.get_room_min_stream_ordering()
+            self.update_total_event_and_bytes_count_between_txn(
+                txn,
+                low_pos=positions["total_events_max_stream_ordering"],
+                high_pos=max_pos,
+            )
+
+            self.update_total_event_and_bytes_count_between_txn(
+                txn,
+                low_pos=min_pos,
+                high_pos=positions["total_events_min_stream_ordering"],
+            )
+
+            if (
+                positions["total_events_max_stream_ordering"] != max_pos
+                or positions["total_events_min_stream_ordering"] != min_pos
+            ):
+                positions["total_events_max_stream_ordering"] = max_pos
+                positions["total_events_min_stream_ordering"] = min_pos
+
+                self._update_stats_positions_txn(txn, positions)
+
+            return positions
+
+        return self.runInteraction(
+            "stats_incremental_total_events_and_bytes",
+            incremental_update_total_events_and_bytes_txn,
+        )
+
+    def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos):
+        """
+        Updates the total_events and total_event_bytes counts for rooms,
+            in a range of stream_orderings.
+
+        Inclusivity of low_pos and high_pos is dependent upon their signs.
+        This makes it intuitive to use this function for both backfilled
+        and non-backfilled events.
+
+        Examples:
+        (low, high) → (kind)
+        (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
+        (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
+        (-7, 7) → -7 <= … <= 7 (both)
+
+        Args:
+            txn: Database transaction.
+            low_pos: Low stream ordering
+            high_pos: High stream ordering
+        """
+
+        if low_pos >= high_pos:
+            # nothing to do here.
+            return
+
+        now = self.hs.clock.time_msec()
+
+        # we choose comparators based on the signs
+        low_comparator = "<=" if low_pos < 0 else "<"
+        high_comparator = "<" if high_pos < 0 else "<="
+
+        if isinstance(self.database_engine, PostgresEngine):
+            new_bytes_expression = "OCTET_LENGTH(json)"
+        else:
+            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+        sql = """
+            SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+            FROM events INNER JOIN event_json USING (event_id)
+            WHERE ? %s stream_ordering AND stream_ordering %s ?
+            GROUP BY room_id
+        """ % (
+            low_comparator,
+            high_comparator,
+            new_bytes_expression,
+        )
+
+        txn.execute(sql, (low_pos, high_pos))
+
+        for room_id, new_events, new_bytes in txn.fetchall():
+            self._update_stats_delta_txn(
+                txn,
+                now,
+                "room",
+                room_id,
+                {"total_events": new_events, "total_event_bytes": new_bytes},
+            )