Diffstat (limited to 'synapse/storage/databases')
 synapse/storage/databases/main/events_bg_updates.py | 290
 synapse/storage/databases/main/metrics.py           |  38
 2 files changed, 327 insertions(+), 1 deletion(-)
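
Note: because this view is limited to 'synapse/storage/databases', the schema delta that actually creates the `event_stats` table is not shown. From the queries in the diff it is evidently a single-row counter table; a minimal sketch of what the delta presumably contains (column names taken from the diff below, types and constraints are assumptions):

    -- Hypothetical schema delta (not part of this diff view): a single row
    -- holding the running counters that the triggers below keep up-to-date.
    CREATE TABLE event_stats (
        total_event_count BIGINT NOT NULL DEFAULT 0,
        unencrypted_message_count BIGINT NOT NULL DEFAULT 0,
        e2ee_event_count BIGINT NOT NULL DEFAULT 0
    );
    -- Seed the single row that the trigger UPDATEs rely on.
    INSERT INTO event_stats DEFAULT VALUES;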
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py

index 4b0bdd79c6..b821d1c1b4 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import (
 )
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.storage.types import Cursor
 from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
 from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
@@ -311,6 +311,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
             self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
         )
 
+        # Add a background update to add triggers which track event counts.
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
+            self._event_stats_populate_counts_bg_update,
+        )
+
         # We want this to run on the main database at startup before we start processing
         # events.
         #
@@ -2547,6 +2553,288 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
 
         return num_rows
 
+    async def _event_stats_populate_counts_bg_update(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """
+        Background update to populate the `event_stats` table with initial
+        values, and register DB triggers to continue updating it.
+
+        We first register TRIGGERs on rows being added/removed from the `events`
+        table, which will keep the event counts continuously updated. We also mark
+        the stopping point for the main population step so we don't double count
+        events.
+
+        Then we iterate through the `events` table in batches and update the event
+        counts until we reach the stopping point.
+
+        This data is intended to be used by the phone-home stats to keep track
+        of total event and message counts. A trigger is preferred to counting
+        rows in the `events` table, as said table can grow quite large.
+
+        It is also preferable to adding an index on the `events` table, as even
+        an index can grow large, and calculating total counts would still require
+        scanning that entire index.
+        """
+        # The last event `stream_ordering` we processed (starting place of this next
+        # batch).
+        last_event_stream_ordering = progress.get(
+            "last_event_stream_ordering", -(1 << 31)
+        )
+        # The event `stream_ordering` we should stop at. This is used to avoid double
+        # counting events that are already accounted for because of the triggers.
+        stop_event_stream_ordering: Optional[int] = progress.get(
+            "stop_event_stream_ordering", None
+        )
+
+        def _add_triggers_txn(
+            txn: LoggingTransaction,
+        ) -> Optional[int]:
+            """
+            Adds the triggers to the `events` table to keep the `event_stats` counts
+            up-to-date.
+
+            Also populates the `stop_event_stream_ordering` background update progress
+            value. This marks the point at which we added the triggers, so we can avoid
+            double counting events that are already accounted for in the population
+            step.
+
+            Returns:
+                The latest event `stream_ordering` in the `events` table when the
+                triggers were added, or `None` if the `events` table is empty.
+            """
+
+            # Each time an event is inserted into the `events` table, update the stats.
+            #
+            # We're using `AFTER` triggers as we want to count successful
+            # inserts/deletes and not the ones that could potentially fail.
+            if isinstance(txn.database_engine, Sqlite3Engine):
+                txn.execute(
+                    """
+                    CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
+                    AFTER INSERT ON events
+                    BEGIN
+                        -- Always increment total_event_count
+                        UPDATE event_stats SET total_event_count = total_event_count + 1;
+
+                        -- Increment unencrypted_message_count for m.room.message events
+                        UPDATE event_stats
+                        SET unencrypted_message_count = unencrypted_message_count + 1
+                        WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;
+
+                        -- Increment e2ee_event_count for m.room.encrypted events
+                        UPDATE event_stats
+                        SET e2ee_event_count = e2ee_event_count + 1
+                        WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
+                    END;
+                    """
+                )
+
+                txn.execute(
+                    """
+                    CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger
+                    AFTER DELETE ON events
+                    BEGIN
+                        -- Always decrement total_event_count
+                        UPDATE event_stats SET total_event_count = total_event_count - 1;
+
+                        -- Decrement unencrypted_message_count for m.room.message events
+                        UPDATE event_stats
+                        SET unencrypted_message_count = unencrypted_message_count - 1
+                        WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL;
+
+                        -- Decrement e2ee_event_count for m.room.encrypted events
+                        UPDATE event_stats
+                        SET e2ee_event_count = e2ee_event_count - 1
+                        WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL;
+                    END;
+                    """
+                )
+            elif isinstance(txn.database_engine, PostgresEngine):
+                txn.execute(
+                    """
+                    CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$
+                    BEGIN
+                        IF TG_OP = 'INSERT' THEN
+                            -- Always increment total_event_count
+                            UPDATE event_stats SET total_event_count = total_event_count + 1;
+
+                            -- Increment unencrypted_message_count for m.room.message events
+                            IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN
+                                UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1;
+                            END IF;
+
+                            -- Increment e2ee_event_count for m.room.encrypted events
+                            IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN
+                                UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1;
+                            END IF;
+
+                            -- We're not modifying the row being inserted, so we return it unchanged.
+                            RETURN NEW;
+
+                        ELSIF TG_OP = 'DELETE' THEN
+                            -- Always decrement total_event_count
+                            UPDATE event_stats SET total_event_count = total_event_count - 1;
+
+                            -- Decrement unencrypted_message_count for m.room.message events
+                            IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN
+                                UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1;
+                            END IF;
+
+                            -- Decrement e2ee_event_count for m.room.encrypted events
+                            IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN
+                                UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1;
+                            END IF;
+
+                            -- "The usual idiom in DELETE triggers is to return OLD."
+                            -- (https://www.postgresql.org/docs/current/plpgsql-trigger.html)
+                            RETURN OLD;
+                        END IF;
+
+                        RAISE EXCEPTION 'event_stats_increment_counts() was run with unexpected operation (%%). '
+                            'This indicates a trigger misconfiguration as this function should only '
+                            'run with INSERT/DELETE operations.', TG_OP;
+                    END;
+                    $BODY$ LANGUAGE plpgsql;
+                    """
+                )
+
+                # We could use `CREATE OR REPLACE TRIGGER` but that's only available in
+                # Postgres 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html)
+                txn.execute(
+                    """
+                    DO
+                    $$BEGIN
+                        CREATE TRIGGER event_stats_increment_counts_trigger
+                        AFTER INSERT OR DELETE ON events
+                        FOR EACH ROW
+                        EXECUTE PROCEDURE event_stats_increment_counts();
+                    EXCEPTION
+                        -- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres
+                        WHEN duplicate_object THEN
+                            NULL;
+                    END;$$;
+                    """
+                )
+            else:
+                raise NotImplementedError("Unknown database engine")
+
+            # Find the latest `stream_ordering` in the `events` table. We need to do
+            # this in the same transaction as where we add the triggers so we don't
+            # miss any events.
+            txn.execute(
+                """
+                SELECT stream_ordering
+                FROM events
+                ORDER BY stream_ordering DESC
+                LIMIT 1
+                """
+            )
+            row = cast(Optional[Tuple[int]], txn.fetchone())
+
+            # Update the progress
+            if row is not None:
+                (max_stream_ordering,) = row
+                self.db_pool.updates._background_update_progress_txn(
+                    txn,
+                    _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
+                    {"stop_event_stream_ordering": max_stream_ordering},
+                )
+                return max_stream_ordering
+
+            return None
+
+        # First, add the triggers to keep the `event_stats` values up-to-date.
+        #
+        # If we don't have a `stop_event_stream_ordering` yet, we need to add the
+        # triggers to the `events` table and set the stopping point so we don't
+        # double count `events` later.
+        if stop_event_stream_ordering is None:
+            stop_event_stream_ordering = await self.db_pool.runInteraction(
+                "_event_stats_populate_counts_bg_update_add_triggers",
+                _add_triggers_txn,
+            )
+
+            # If there is no `stop_event_stream_ordering`, then there are no events
+            # in the `events` table and we can end the background update altogether.
+            if stop_event_stream_ordering is None:
+                await self.db_pool.updates._end_background_update(
+                    _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
+                )
+                return batch_size
+
+        def _populate_txn(
+            txn: LoggingTransaction,
+        ) -> int:
+            """
+            Updates the `event_stats` table from this batch of events.
+            """
+
+            # Increment the counts based on the events present in this batch.
+            txn.execute(
+                """
+                WITH event_batch AS (
+                    SELECT *
+                    FROM events
+                    WHERE stream_ordering > ? AND stream_ordering <= ?
+                    ORDER BY stream_ordering ASC
+                    LIMIT ?
+                ),
+                batch_stats AS (
+                    SELECT
+                        MAX(stream_ordering) AS max_stream_ordering,
+                        COALESCE(COUNT(*), 0) AS total_event_count,
+                        COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count,
+                        COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count
+                    FROM event_batch
+
+                    UNION ALL
+
+                    SELECT null, 0, 0, 0
+                    WHERE NOT EXISTS (SELECT 1 FROM event_batch)
+                    LIMIT 1
+                )
+                UPDATE event_stats
+                SET
+                    total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats),
+                    unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats),
+                    e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats)
+                RETURNING
+                    (SELECT total_event_count FROM batch_stats) AS total_event_count,
+                    (SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering
+                """,
+                (last_event_stream_ordering, stop_event_stream_ordering, batch_size),
+            )
+
+            # Get the results of the update
+            (total_event_count, max_stream_ordering) = cast(
+                Tuple[int, Optional[int]], txn.fetchone()
+            )
+
+            # Update the progress
+            self.db_pool.updates._background_update_progress_txn(
+                txn,
+                _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
+                {
+                    "last_event_stream_ordering": max_stream_ordering,
+                    "stop_event_stream_ordering": stop_event_stream_ordering,
+                },
+            )
+
+            return total_event_count
+
+        num_rows_processed = await self.db_pool.runInteraction(
+            "_event_stats_populate_counts_bg_update",
+            _populate_txn,
+        )
+
+        # No more rows to process, so our background update is complete.
+        if not num_rows_processed:
+            await self.db_pool.updates._end_background_update(
+                _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
+            )
+
+        return batch_size
+
 def _resolve_stale_data_in_sliding_sync_tables(
     txn: LoggingTransaction,
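
To get a feel for the trigger mechanics in isolation, here is a self-contained sketch using Python's sqlite3 module. It uses a pared-down stand-in for the `events` table, the hypothetical `event_stats` schema sketched above, and only the insert trigger (with the e2ee clause omitted for brevity); it is an illustration, not the code under review:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        -- Pared-down stand-ins for the real tables (assumption, not Synapse's schema)
        CREATE TABLE events (stream_ordering INTEGER PRIMARY KEY, type TEXT, state_key TEXT);
        CREATE TABLE event_stats (
            total_event_count BIGINT NOT NULL DEFAULT 0,
            unencrypted_message_count BIGINT NOT NULL DEFAULT 0,
            e2ee_event_count BIGINT NOT NULL DEFAULT 0
        );
        INSERT INTO event_stats DEFAULT VALUES;

        -- Same shape as the insert trigger in the diff above
        CREATE TRIGGER event_stats_events_insert_trigger
        AFTER INSERT ON events
        BEGIN
            UPDATE event_stats SET total_event_count = total_event_count + 1;
            UPDATE event_stats
            SET unencrypted_message_count = unencrypted_message_count + 1
            WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;
        END;
        """
    )
    conn.execute("INSERT INTO events VALUES (1, 'm.room.message', NULL)")
    conn.execute("INSERT INTO events VALUES (2, 'm.room.encrypted', NULL)")
    # Both inserts bump the total; only the m.room.message bumps the message count.
    print(conn.execute("SELECT * FROM event_stats").fetchone())  # (2, 1, 0)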
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 9ce1100b5c..a9cecc4bc1 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -126,6 +126,44 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
 
         return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)
 
+    async def count_total_events(self) -> int:
+        """
+        Returns the total number of events present on the server.
+        """
+
+        return await self.db_pool.simple_select_one_onecol(
+            table="event_stats",
+            keyvalues={},
+            retcol="total_event_count",
+            desc="count_total_events",
+        )
+
+    async def count_total_messages(self) -> int:
+        """
+        Returns the total number of `m.room.message` events present on the
+        server.
+        """
+
+        return await self.db_pool.simple_select_one_onecol(
+            table="event_stats",
+            keyvalues={},
+            retcol="unencrypted_message_count",
+            desc="count_total_messages",
+        )
+
+    async def count_total_e2ee_events(self) -> int:
+        """
+        Returns the total number of `m.room.encrypted` events present on the
+        server.
+        """
+
+        return await self.db_pool.simple_select_one_onecol(
+            table="event_stats",
+            keyvalues={},
+            retcol="e2ee_event_count",
+            desc="count_total_e2ee_events",
+        )
+
     async def count_daily_sent_e2ee_messages(self) -> int:
         def _count_messages(txn: LoggingTransaction) -> int:
             # This is good enough as if you have silly characters in your own
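
Each of the three new accessors reads a single pre-maintained counter rather than running COUNT(*) over `events`, so the reads stay cheap regardless of table size. The wiring into the phone-home stats is not part of this diff; a minimal sketch of how a caller might consume the new methods (the `stats` keys, function name, and `store` handle are assumptions — only the three store methods come from this diff):

    # Hypothetical caller, e.g. somewhere in the phone-home stats collection.
    async def collect_event_counts(store, stats: dict) -> None:
        stats["total_event_count"] = await store.count_total_events()
        stats["total_message_count"] = await store.count_total_messages()
        stats["total_e2ee_event_count"] = await store.count_total_e2ee_events()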