diff options
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r-- | synapse/storage/stats.py | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 8bbf1b00d1..63c8c2840a 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -19,6 +19,7 @@ from itertools import chain from twisted.internet.defer import DeferredLock +from synapse.storage import PostgresEngine from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -36,6 +37,7 @@ ABSOLUTE_STATS_FIELDS = { "left_members", "banned_members", "total_events", + "total_event_bytes", ), "user": ("public_rooms", "private_rooms"), } @@ -501,3 +503,113 @@ class StatsStore(StateDeltasStore): for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val self._simple_update_txn(txn, into_table, keyvalues, src_row) + + def incremental_update_room_total_events_and_bytes(self, in_positions): + """ + Counts the number of events and total event bytes per-room and then adds + these to the respective total_events and total_event_bytes room counts. + + Args: + in_positions (dict): Positions, + as retrieved from L{get_stats_positions}. + + Returns (Deferred[tuple[dict, bool]]): + First element (dict): + The new positions. Note that this is for reference only – + the new positions WILL be committed by this function. + Second element (bool): + true iff there was a change to the positions, false otherwise + """ + + def incremental_update_total_events_and_bytes_txn(txn): + positions = in_positions.copy() + + max_pos = self.get_room_max_stream_ordering() + min_pos = self.get_room_min_stream_ordering() + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=positions["total_events_max_stream_ordering"], + high_pos=max_pos, + ) + + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=min_pos, + high_pos=positions["total_events_min_stream_ordering"], + ) + + if ( + positions["total_events_max_stream_ordering"] != max_pos + or positions["total_events_min_stream_ordering"] != min_pos + ): + positions["total_events_max_stream_ordering"] = max_pos + positions["total_events_min_stream_ordering"] = min_pos + + self._update_stats_positions_txn(txn, positions) + + return positions, True + else: + return positions, False + + return self.runInteraction( + "stats_incremental_total_events_and_bytes", + incremental_update_total_events_and_bytes_txn, + ) + + def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos): + """ + Updates the total_events and total_event_bytes counts for rooms, + in a range of stream_orderings. + + Inclusivity of low_pos and high_pos is dependent upon their signs. + This makes it intuitive to use this function for both backfilled + and non-backfilled events. + + Examples: + (low, high) → (kind) + (3, 7) → 3 < … <= 7 (normal-filled; low already processed before) + (-4, -2) → -4 <= … < -2 (backfilled; high already processed before) + (-7, 7) → -7 <= … <= 7 (both) + + Args: + txn: Database transaction. + low_pos: Low stream ordering + high_pos: High stream ordering + """ + + if low_pos >= high_pos: + # nothing to do here. + return + + now = self.hs.clock.time_msec() + + # we choose comparators based on the signs + low_comparator = "<=" if low_pos < 0 else "<" + high_comparator = "<" if high_pos < 0 else "<=" + + if isinstance(self.database_engine, PostgresEngine): + new_bytes_expression = "OCTET_LENGTH(json)" + else: + new_bytes_expression = "LENGTH(CAST(json AS BLOB))" + + sql = """ + SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes + FROM events INNER JOIN event_json USING (event_id) + WHERE ? %s stream_ordering AND stream_ordering %s ? + GROUP BY room_id + """ % ( + low_comparator, + high_comparator, + new_bytes_expression, + ) + + txn.execute(sql, (low_pos, high_pos)) + + for room_id, new_events, new_bytes in txn.fetchall(): + self._update_stats_delta_txn( + txn, + now, + "room", + room_id, + {"total_events": new_events, "total_event_bytes": new_bytes}, + ) |