diff options
author | reivilibre <38398653+reivilibre@users.noreply.github.com> | 2019-08-30 07:52:07 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-30 07:52:07 +0100 |
commit | 9dbf42af8ab9f704f16261da024c0c8e8451ed9a (patch) | |
tree | b7f1e9e461e3034b1ca0fd75e6ea4ea185986cfb | |
parent | Merge branch 'develop' into rei/rss_target (diff) | |
parent | Merge branch 'rei/rss_target' into rei/rss_inc5 (diff) | |
download | synapse-9dbf42af8ab9f704f16261da024c0c8e8451ed9a.tar.xz |
Merge pull request #5923 from matrix-org/rei/rss_inc5
Separated Statistics [5/7ish]
-rw-r--r-- | synapse/handlers/stats.py | 18 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated1.sql | 2 | ||||
-rw-r--r-- | synapse/storage/stats.py | 112 |
3 files changed, 126 insertions, 6 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 7536e1a54c..f44adfc07b 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -91,25 +91,31 @@ class StatsHandler(StateDeltasHandler): return None # Loop round handling deltas until we're up to date - with Measure(self.clock, "stats_delta"): - while True: + + while True: + with Measure(self.clock, "stats_delta"): deltas = yield self.store.get_current_state_deltas( self.pos["state_delta_stream_id"] ) - if not deltas: - break logger.debug("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] + yield self.store.update_stats_positions(self.pos) event_processing_positions.labels("stats").set( self.pos["state_delta_stream_id"] ) - if self.pos is not None: - yield self.store.update_stats_positions(self.pos) + # Then count deltas for total_events and total_event_bytes. + with Measure(self.clock, "stats_total_events_and_bytes"): + self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes( + self.pos + ) + + if not deltas and not had_counts: + break @defer.inlineCallbacks def _handle_deltas(self, deltas): diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 52fb09c0e6..6d4648c0d7 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( current_state_events INT NOT NULL, total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, @@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS room_stats_historical ( current_state_events INT NOT NULL, total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 3df57b52ea..9d6c3027d5 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -19,6 +19,7 @@ from itertools import chain from twisted.internet.defer import DeferredLock +from synapse.storage import PostgresEngine from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -36,6 +37,7 @@ ABSOLUTE_STATS_FIELDS = { "left_members", "banned_members", "total_events", + "total_event_bytes", ), "user": ("public_rooms", "private_rooms"), } @@ -475,3 +477,113 @@ class StatsStore(StateDeltasStore): for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val self._simple_update_txn(txn, into_table, keyvalues, src_row) + + def incremental_update_room_total_events_and_bytes(self, in_positions): + """ + Counts the number of events and total event bytes per-room and then adds + these to the respective total_events and total_event_bytes room counts. + + Args: + in_positions (dict): Positions, + as retrieved from L{get_stats_positions}. + + Returns (Deferred[tuple[dict, bool]]): + First element (dict): + The new positions. Note that this is for reference only – + the new positions WILL be committed by this function. + Second element (bool): + true iff there was a change to the positions, false otherwise + """ + + def incremental_update_total_events_and_bytes_txn(txn): + positions = in_positions.copy() + + max_pos = self.get_room_max_stream_ordering() + min_pos = self.get_room_min_stream_ordering() + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=positions["total_events_max_stream_ordering"], + high_pos=max_pos, + ) + + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=min_pos, + high_pos=positions["total_events_min_stream_ordering"], + ) + + if ( + positions["total_events_max_stream_ordering"] != max_pos + or positions["total_events_min_stream_ordering"] != min_pos + ): + positions["total_events_max_stream_ordering"] = max_pos + positions["total_events_min_stream_ordering"] = min_pos + + self._update_stats_positions_txn(txn, positions) + + return positions, True + else: + return positions, False + + return self.runInteraction( + "stats_incremental_total_events_and_bytes", + incremental_update_total_events_and_bytes_txn, + ) + + def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos): + """ + Updates the total_events and total_event_bytes counts for rooms, + in a range of stream_orderings. + + Inclusivity of low_pos and high_pos is dependent upon their signs. + This makes it intuitive to use this function for both backfilled + and non-backfilled events. + + Examples: + (low, high) → (kind) + (3, 7) → 3 < … <= 7 (normal-filled; low already processed before) + (-4, -2) → -4 <= … < -2 (backfilled; high already processed before) + (-7, 7) → -7 <= … <= 7 (both) + + Args: + txn: Database transaction. + low_pos: Low stream ordering + high_pos: High stream ordering + """ + + if low_pos >= high_pos: + # nothing to do here. + return + + now = self.hs.clock.time_msec() + + # we choose comparators based on the signs + low_comparator = "<=" if low_pos < 0 else "<" + high_comparator = "<" if high_pos < 0 else "<=" + + if isinstance(self.database_engine, PostgresEngine): + new_bytes_expression = "OCTET_LENGTH(json)" + else: + new_bytes_expression = "LENGTH(CAST(json AS BLOB))" + + sql = """ + SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes + FROM events INNER JOIN event_json USING (event_id) + WHERE ? %s stream_ordering AND stream_ordering %s ? + GROUP BY room_id + """ % ( + low_comparator, + high_comparator, + new_bytes_expression, + ) + + txn.execute(sql, (low_pos, high_pos)) + + for room_id, new_events, new_bytes in txn.fetchall(): + self._update_stats_delta_txn( + txn, + now, + "room", + room_id, + {"total_events": new_events, "total_event_bytes": new_bytes}, + ) |