From bc2c284dbe02ef283ab525f14febd7d0998ba552 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 14:28:44 +0100 Subject: Add `total_event_bytes` to room statistics schema. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 2 ++ synapse/storage/stats.py | 1 + 2 files changed, 3 insertions(+) diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 52fb09c0e6..6d4648c0d7 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( current_state_events INT NOT NULL, total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, @@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS room_stats_historical ( current_state_events INT NOT NULL, total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 3df57b52ea..8c99a125a9 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -36,6 +36,7 @@ ABSOLUTE_STATS_FIELDS = { "left_members", "banned_members", "total_events", + "total_event_bytes", ), "user": ("public_rooms", "private_rooms"), } -- cgit 1.4.1 From a13ad21abf029901f5d74dd683e536d81f3150c3 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 14:49:58 +0100 Subject: Add incremental counting for rooms' total events and total event bytes. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 106 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 8c99a125a9..8cfa694d31 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -19,6 +19,7 @@ from itertools import chain from twisted.internet.defer import DeferredLock +from synapse.storage import PostgresEngine from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -476,3 +477,108 @@ class StatsStore(StateDeltasStore): for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val self._simple_update_txn(txn, into_table, keyvalues, src_row) + + def incremental_update_room_total_events_and_bytes(self, in_positions): + """ + Counts the number of events and total event bytes per-room and then adds + these to the respective total_events and total_event_bytes room counts. + + Args: + in_positions (dict): Positions, + as retrieved from L{get_stats_positions}. + + Returns (dict): + The new positions. Note that this is for reference only – + the new positions WILL be committed by this function. + """ + + def incremental_update_total_events_and_bytes_txn(txn): + positions = in_positions.copy() + + max_pos = self.get_room_max_stream_ordering() + min_pos = self.get_room_min_stream_ordering() + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=positions["total_events_max_stream_ordering"], + high_pos=max_pos, + ) + + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=min_pos, + high_pos=positions["total_events_min_stream_ordering"], + ) + + if ( + positions["total_events_max_stream_ordering"] != max_pos + or positions["total_events_min_stream_ordering"] != min_pos + ): + positions["total_events_max_stream_ordering"] = max_pos + positions["total_events_min_stream_ordering"] = min_pos + + self._update_stats_positions_txn(txn, positions) + + return positions + + return self.runInteraction( + "stats_incremental_total_events_and_bytes", + incremental_update_total_events_and_bytes_txn, + ) + + def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos): + """ + Updates the total_events and total_event_bytes counts for rooms, + in a range of stream_orderings. + + Inclusivity of low_pos and high_pos is dependent upon their signs. + This makes it intuitive to use this function for both backfilled + and non-backfilled events. + + Examples: + (low, high) → (kind) + (3, 7) → 3 < … <= 7 (normal-filled; low already processed before) + (-4, -2) → -4 <= … < -2 (backfilled; high already processed before) + (-7, 7) → -7 <= … <= 7 (both) + + Args: + txn: Database transaction. + low_pos: Low stream ordering + high_pos: High stream ordering + """ + + if low_pos >= high_pos: + # nothing to do here. + return + + now = self.hs.clock.time_msec() + + # we choose comparators based on the signs + low_comparator = "<=" if low_pos < 0 else "<" + high_comparator = "<" if high_pos < 0 else "<=" + + if isinstance(self.database_engine, PostgresEngine): + new_bytes_expression = "OCTET_LENGTH(json)" + else: + new_bytes_expression = "LENGTH(CAST(json AS BLOB))" + + sql = """ + SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes + FROM events INNER JOIN event_json USING (event_id) + WHERE ? %s stream_ordering AND stream_ordering %s ? + GROUP BY room_id + """ % ( + low_comparator, + high_comparator, + new_bytes_expression, + ) + + txn.execute(sql, (low_pos, high_pos)) + + for room_id, new_events, new_bytes in txn.fetchall(): + self._update_stats_delta_txn( + txn, + now, + "room", + room_id, + {"total_events": new_events, "total_event_bytes": new_bytes}, + ) -- cgit 1.4.1 From d7a692f8608725ff5edca340a93f51ed76c743fb Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 14:57:03 +0100 Subject: Update total_events and total_event_bytes on new events. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 7536e1a54c..e9b1646e0e 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -111,6 +111,12 @@ class StatsHandler(StateDeltasHandler): if self.pos is not None: yield self.store.update_stats_positions(self.pos) + # Then count deltas for total_events and total_event_bytes. + with Measure(self.clock, "stats_total_events_and_bytes"): + self.pos = yield self.store.incremental_update_total_events_and_bytes( + self.pos + ) + @defer.inlineCallbacks def _handle_deltas(self, deltas): """ -- cgit 1.4.1 From 39dbee2a3e129415e2c20aa9a4f5f866e723fe41 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 29 Aug 2019 14:10:21 +0100 Subject: Count total_events and total_event_bytes within the loop. In this case, we still update these counts if we get stuck in the loop because the server is busy. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 22 +++++++++++----------- synapse/storage/stats.py | 13 +++++++++---- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index e9b1646e0e..f44adfc07b 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -91,31 +91,31 @@ class StatsHandler(StateDeltasHandler): return None # Loop round handling deltas until we're up to date - with Measure(self.clock, "stats_delta"): - while True: + + while True: + with Measure(self.clock, "stats_delta"): deltas = yield self.store.get_current_state_deltas( self.pos["state_delta_stream_id"] ) - if not deltas: - break logger.debug("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] + yield self.store.update_stats_positions(self.pos) event_processing_positions.labels("stats").set( self.pos["state_delta_stream_id"] ) - if self.pos is not None: - yield self.store.update_stats_positions(self.pos) + # Then count deltas for total_events and total_event_bytes. + with Measure(self.clock, "stats_total_events_and_bytes"): + self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes( + self.pos + ) - # Then count deltas for total_events and total_event_bytes. - with Measure(self.clock, "stats_total_events_and_bytes"): - self.pos = yield self.store.incremental_update_total_events_and_bytes( - self.pos - ) + if not deltas and not had_counts: + break @defer.inlineCallbacks def _handle_deltas(self, deltas): diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 8cfa694d31..9d6c3027d5 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -487,9 +487,12 @@ class StatsStore(StateDeltasStore): in_positions (dict): Positions, as retrieved from L{get_stats_positions}. - Returns (dict): - The new positions. Note that this is for reference only – - the new positions WILL be committed by this function. + Returns (Deferred[tuple[dict, bool]]): + First element (dict): + The new positions. Note that this is for reference only – + the new positions WILL be committed by this function. + Second element (bool): + true iff there was a change to the positions, false otherwise """ def incremental_update_total_events_and_bytes_txn(txn): @@ -518,7 +521,9 @@ class StatsStore(StateDeltasStore): self._update_stats_positions_txn(txn, positions) - return positions + return positions, True + else: + return positions, False return self.runInteraction( "stats_incremental_total_events_and_bytes", -- cgit 1.4.1