summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py290
-rw-r--r--synapse/storage/databases/main/metrics.py38
-rw-r--r--synapse/storage/schema/__init__.py8
-rw-r--r--synapse/storage/schema/main/delta/92/01_event_stats.sql33
4 files changed, 367 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py

index 4b0bdd79c6..b821d1c1b4 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import ( ) from synapse.storage.databases.main.state_deltas import StateDeltasStore from synapse.storage.databases.main.stream import StreamWorkerStore -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.storage.types import Cursor from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES @@ -311,6 +311,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update, ) + # Add a background update to add triggers which track event counts. + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, + self._event_stats_populate_counts_bg_update, + ) + # We want this to run on the main database at startup before we start processing # events. # @@ -2547,6 +2553,288 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS return num_rows + async def _event_stats_populate_counts_bg_update( + self, progress: JsonDict, batch_size: int + ) -> int: + """ + Background update to populate the `event_stats` table with initial + values, and register DB triggers to continue updating it. + + We first register TRIGGERs on rows being added/removed from the `events` table, + which will keep the event counts continuously updated. We also mark the stopping + point for the main population step so we don't double count events. + + Then we will iterate through the `events` table in batches and update event + counts until we reach the stopping point. + + This data is intended to be used by the phone-home stats to keep track + of total event and message counts. A trigger is preferred to counting + rows in the `events` table, as said table can grow quite large. + + It is also preferable to adding an index on the `events` table, as even + an index can grow large. And calculating total counts would require + querying that entire index. + """ + # The last event `stream_ordering` we processed (starting place of this next + # batch). + last_event_stream_ordering = progress.get( + "last_event_stream_ordering", -(1 << 31) + ) + # The event `stream_ordering` we should stop at. This is used to avoid double + # counting events that are already accounted for because of the triggers. + stop_event_stream_ordering: Optional[int] = progress.get( + "stop_event_stream_ordering", None + ) + + def _add_triggers_txn( + txn: LoggingTransaction, + ) -> Optional[int]: + """ + Adds the triggers to the `events` table to keep the `event_stats` counts + up-to-date. + + Also populates the `stop_event_stream_ordering` background update progress + value. This marks the point at which we added the triggers, so we can avoid + double counting events that are already accounted for in the population + step. + + Returns: + The latest event `stream_ordering` in the `events` table when the triggers + were added or `None` if the `events` table is empty. + """ + + # Each time an event is inserted into the `events` table, update the stats. + # + # We're using `AFTER` triggers as we want to count successful inserts/deletes and + # not the ones that could potentially fail. + if isinstance(txn.database_engine, Sqlite3Engine): + txn.execute( + """ + CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger + AFTER INSERT ON events + BEGIN + -- Always increment total_event_count + UPDATE event_stats SET total_event_count = total_event_count + 1; + + -- Increment unencrypted_message_count for m.room.message events + UPDATE event_stats + SET unencrypted_message_count = unencrypted_message_count + 1 + WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL; + + -- Increment e2ee_event_count for m.room.encrypted events + UPDATE event_stats + SET e2ee_event_count = e2ee_event_count + 1 + WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL; + END; + """ + ) + + txn.execute( + """ + CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger + AFTER DELETE ON events + BEGIN + -- Always decrement total_event_count + UPDATE event_stats SET total_event_count = total_event_count - 1; + + -- Decrement unencrypted_message_count for m.room.message events + UPDATE event_stats + SET unencrypted_message_count = unencrypted_message_count - 1 + WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL; + + -- Decrement e2ee_event_count for m.room.encrypted events + UPDATE event_stats + SET e2ee_event_count = e2ee_event_count - 1 + WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL; + END; + """ + ) + elif isinstance(txn.database_engine, PostgresEngine): + txn.execute( + """ + CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$ + BEGIN + IF TG_OP = 'INSERT' THEN + -- Always increment total_event_count + UPDATE event_stats SET total_event_count = total_event_count + 1; + + -- Increment unencrypted_message_count for m.room.message events + IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN + UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1; + END IF; + + -- Increment e2ee_event_count for m.room.encrypted events + IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN + UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1; + END IF; + + -- We're not modifying the row being inserted/deleted, so we return it unchanged. + RETURN NEW; + + ELSIF TG_OP = 'DELETE' THEN + -- Always decrement total_event_count + UPDATE event_stats SET total_event_count = total_event_count - 1; + + -- Decrement unencrypted_message_count for m.room.message events + IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN + UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1; + END IF; + + -- Decrement e2ee_event_count for m.room.encrypted events + IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN + UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1; + END IF; + + -- "The usual idiom in DELETE triggers is to return OLD." + -- (https://www.postgresql.org/docs/current/plpgsql-trigger.html) + RETURN OLD; + END IF; + + RAISE EXCEPTION 'update_event_stats() was run with unexpected operation (%%). ' + 'This indicates a trigger misconfiguration as this function should only' + 'run with INSERT/DELETE operations.', TG_OP; + END; + $BODY$ LANGUAGE plpgsql; + """ + ) + + # We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres + # 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html) + txn.execute( + """ + DO + $$BEGIN + CREATE TRIGGER event_stats_increment_counts_trigger + AFTER INSERT OR DELETE ON events + FOR EACH ROW + EXECUTE PROCEDURE event_stats_increment_counts(); + EXCEPTION + -- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres + WHEN duplicate_object THEN + NULL; + END;$$; + """ + ) + else: + raise NotImplementedError("Unknown database engine") + + # Find the latest `stream_ordering` in the `events` table. We need to do + # this in the same transaction as where we add the triggers so we don't miss + # any events. + txn.execute( + """ + SELECT stream_ordering + FROM events + ORDER BY stream_ordering DESC + LIMIT 1 + """ + ) + row = cast(Optional[Tuple[int]], txn.fetchone()) + + # Update the progress + if row is not None: + (max_stream_ordering,) = row + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, + {"stop_event_stream_ordering": max_stream_ordering}, + ) + return max_stream_ordering + + return None + + # First, add the triggers to keep the `event_stats` values up-to-date. + # + # If we don't have a `stop_event_stream_ordering` yet, we need to add the + # triggers to the `events` table and set the stopping point so we don't + # double count `events` later. + if stop_event_stream_ordering is None: + stop_event_stream_ordering = await self.db_pool.runInteraction( + "_event_stats_populate_counts_bg_update_add_triggers", + _add_triggers_txn, + ) + + # If there is no `stop_event_stream_ordering`, then there are no events + # in the `events` table and we can end the background update altogether. + if stop_event_stream_ordering is None: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE + ) + return batch_size + + def _populate_txn( + txn: LoggingTransaction, + ) -> int: + """ + Updates the `event_stats` table from this batch of events. + """ + + # Increment the counts based on the events present in this batch. + txn.execute( + """ + WITH event_batch AS ( + SELECT * + FROM events + WHERE stream_ordering > ? AND stream_ordering <= ? + ORDER BY stream_ordering ASC + LIMIT ? + ), + batch_stats AS ( + SELECT + MAX(stream_ordering) AS max_stream_ordering, + COALESCE(COUNT(*), 0) AS total_event_count, + COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count, + COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count + FROM event_batch + + UNION ALL + + SELECT null, 0, 0, 0 + WHERE NOT EXISTS (SELECT 1 FROM event_batch) + LIMIT 1 + ) + UPDATE event_stats + SET + total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats), + unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats), + e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats) + RETURNING + (SELECT total_event_count FROM batch_stats) AS total_event_count, + (SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering + """, + (last_event_stream_ordering, stop_event_stream_ordering, batch_size), + ) + + # Get the results of the update + (total_event_count, max_stream_ordering) = cast( + Tuple[int, Optional[int]], txn.fetchone() + ) + + # Update the progress + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE, + { + "last_event_stream_ordering": max_stream_ordering, + "stop_event_stream_ordering": stop_event_stream_ordering, + }, + ) + + return total_event_count + + num_rows_processed = await self.db_pool.runInteraction( + "_event_stats_populate_counts_bg_update", + _populate_txn, + ) + + # No more rows to process, so our background update is complete. + if not num_rows_processed: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE + ) + + return batch_size + def _resolve_stale_data_in_sliding_sync_tables( txn: LoggingTransaction, diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 9ce1100b5c..a9cecc4bc1 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py
@@ -126,6 +126,44 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages) + async def count_total_events(self) -> int: + """ + Returns the total number of events present on the server. + """ + + return await self.db_pool.simple_select_one_onecol( + table="event_stats", + keyvalues={}, + retcol="total_event_count", + desc="count_total_events", + ) + + async def count_total_messages(self) -> int: + """ + Returns the total number of `m.room.message` events present on the + server. + """ + + return await self.db_pool.simple_select_one_onecol( + table="event_stats", + keyvalues={}, + retcol="unencrypted_message_count", + desc="count_total_messages", + ) + + async def count_total_e2ee_events(self) -> int: + """ + Returns the total number of `m.room.encrypted` events present on the + server. + """ + + return await self.db_pool.simple_select_one_onecol( + table="event_stats", + keyvalues={}, + retcol="e2ee_event_count", + desc="count_total_e2ee_events", + ) + async def count_daily_sent_e2ee_messages(self) -> int: def _count_messages(txn: LoggingTransaction) -> int: # This is good enough as if you have silly characters in your own diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index ad683a3a07..7474ba4542 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py
@@ -19,7 +19,7 @@ # # -SCHEMA_VERSION = 91 # remember to update the list below when updating +SCHEMA_VERSION = 92 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -162,6 +162,12 @@ Changes in SCHEMA_VERSION = 89 Changes in SCHEMA_VERSION = 90 - Add a column `participant` to `room_memberships` table - Add background update to delete unreferenced state groups. + +Changes in SCHEMA_VERSION = 91 + - TODO + +Changes in SCHEMA_VERSION = 92 + - Add `event_stats` table to store global event statistics like total counts """ diff --git a/synapse/storage/schema/main/delta/92/01_event_stats.sql b/synapse/storage/schema/main/delta/92/01_event_stats.sql new file mode 100644
index 0000000000..4bded03578 --- /dev/null +++ b/synapse/storage/schema/main/delta/92/01_event_stats.sql
@@ -0,0 +1,33 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2025 New Vector, Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- <https://www.gnu.org/licenses/agpl-3.0.html>. + + +-- Create the `event_stats` table to store these statistics. +CREATE TABLE event_stats ( + total_event_count INTEGER NOT NULL DEFAULT 0, + unencrypted_message_count INTEGER NOT NULL DEFAULT 0, + e2ee_event_count INTEGER NOT NULL DEFAULT 0 +); + +-- Insert initial values into the table. +INSERT INTO event_stats ( + total_event_count, + unencrypted_message_count, + e2ee_event_count +) VALUES (0, 0, 0); + +-- Add a background update to populate the `event_stats` table with the current counts +-- from the `events` table and add triggers to keep this count up-to-date. +INSERT INTO background_updates (ordering, update_name, progress_json) VALUES + (9201, 'event_stats_populate_counts_bg_update', '{}'); +