Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--  synapse/storage/databases/main/events_bg_updates.py | 290
-rw-r--r--  synapse/storage/databases/main/metrics.py           |  38
2 files changed, 1 insertion(+), 327 deletions(-)
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py

index b821d1c1b4..4b0bdd79c6 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import (
 )
 from synapse.storage.databases.main.state_deltas import StateDeltasStore
 from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
 from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
 from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
@@ -311,12 +311,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
             self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
         )
 
-        # Add a background update to add triggers which track event counts.
-        self.db_pool.updates.register_background_update_handler(
-            _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
-            self._event_stats_populate_counts_bg_update,
-        )
-
         # We want this to run on the main database at startup before we start processing
        # events.
         #
@@ -2553,288 +2547,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
         return num_rows
 
-    async def _event_stats_populate_counts_bg_update(
-        self, progress: JsonDict, batch_size: int
-    ) -> int:
-        """
-        Background update to populate the `event_stats` table with initial
-        values, and register DB triggers to continue updating it.
-
-        We first register TRIGGERs on rows being added/removed from the `events` table,
-        which will keep the event counts continuously updated. We also mark the stopping
-        point for the main population step so we don't double count events.
-
-        Then we will iterate through the `events` table in batches and update event
-        counts until we reach the stopping point.
-
-        This data is intended to be used by the phone-home stats to keep track
-        of total event and message counts. A trigger is preferred to counting
-        rows in the `events` table, as said table can grow quite large.
-
-        It is also preferable to adding an index on the `events` table, as even
-        an index can grow large. And calculating total counts would require
-        querying that entire index.
-        """
-        # The last event `stream_ordering` we processed (starting place of this next
-        # batch).
-        last_event_stream_ordering = progress.get(
-            "last_event_stream_ordering", -(1 << 31)
-        )
-        # The event `stream_ordering` we should stop at. This is used to avoid double
-        # counting events that are already accounted for because of the triggers.
-        stop_event_stream_ordering: Optional[int] = progress.get(
-            "stop_event_stream_ordering", None
-        )
-
-        def _add_triggers_txn(
-            txn: LoggingTransaction,
-        ) -> Optional[int]:
-            """
-            Adds the triggers to the `events` table to keep the `event_stats` counts
-            up-to-date.
-
-            Also populates the `stop_event_stream_ordering` background update progress
-            value. This marks the point at which we added the triggers, so we can avoid
-            double counting events that are already accounted for in the population
-            step.
-
-            Returns:
-                The latest event `stream_ordering` in the `events` table when the triggers
-                were added or `None` if the `events` table is empty.
-            """
-
-            # Each time an event is inserted into the `events` table, update the stats.
-            #
-            # We're using `AFTER` triggers as we want to count successful inserts/deletes and
-            # not the ones that could potentially fail.
-            if isinstance(txn.database_engine, Sqlite3Engine):
-                txn.execute(
-                    """
-                    CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
-                    AFTER INSERT ON events
-                    BEGIN
-                        -- Always increment total_event_count
-                        UPDATE event_stats SET total_event_count = total_event_count + 1;
-
-                        -- Increment unencrypted_message_count for m.room.message events
-                        UPDATE event_stats
-                        SET unencrypted_message_count = unencrypted_message_count + 1
-                        WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;
-
-                        -- Increment e2ee_event_count for m.room.encrypted events
-                        UPDATE event_stats
-                        SET e2ee_event_count = e2ee_event_count + 1
-                        WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
-                    END;
-                    """
-                )
-
-                txn.execute(
-                    """
-                    CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger
-                    AFTER DELETE ON events
-                    BEGIN
-                        -- Always decrement total_event_count
-                        UPDATE event_stats SET total_event_count = total_event_count - 1;
-
-                        -- Decrement unencrypted_message_count for m.room.message events
-                        UPDATE event_stats
-                        SET unencrypted_message_count = unencrypted_message_count - 1
-                        WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL;
-
-                        -- Decrement e2ee_event_count for m.room.encrypted events
-                        UPDATE event_stats
-                        SET e2ee_event_count = e2ee_event_count - 1
-                        WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL;
-                    END;
-                    """
-                )
-            elif isinstance(txn.database_engine, PostgresEngine):
-                txn.execute(
-                    """
-                    CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$
-                    BEGIN
-                        IF TG_OP = 'INSERT' THEN
-                            -- Always increment total_event_count
-                            UPDATE event_stats SET total_event_count = total_event_count + 1;
-
-                            -- Increment unencrypted_message_count for m.room.message events
-                            IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN
-                                UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1;
-                            END IF;
-
-                            -- Increment e2ee_event_count for m.room.encrypted events
-                            IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN
-                                UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1;
-                            END IF;
-
-                            -- We're not modifying the row being inserted/deleted, so we return it unchanged.
-                            RETURN NEW;
-
-                        ELSIF TG_OP = 'DELETE' THEN
-                            -- Always decrement total_event_count
-                            UPDATE event_stats SET total_event_count = total_event_count - 1;
-
-                            -- Decrement unencrypted_message_count for m.room.message events
-                            IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN
-                                UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1;
-                            END IF;
-
-                            -- Decrement e2ee_event_count for m.room.encrypted events
-                            IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN
-                                UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1;
-                            END IF;
-
-                            -- "The usual idiom in DELETE triggers is to return OLD."
-                            -- (https://www.postgresql.org/docs/current/plpgsql-trigger.html)
-                            RETURN OLD;
-                        END IF;
-
-                        RAISE EXCEPTION 'update_event_stats() was run with unexpected operation (%%). '
-                            'This indicates a trigger misconfiguration as this function should only'
-                            'run with INSERT/DELETE operations.', TG_OP;
-                    END;
-                    $BODY$ LANGUAGE plpgsql;
-                    """
-                )
-
-                # We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres
-                # 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html)
-                txn.execute(
-                    """
-                    DO
-                    $$BEGIN
-                        CREATE TRIGGER event_stats_increment_counts_trigger
-                        AFTER INSERT OR DELETE ON events
-                        FOR EACH ROW
-                        EXECUTE PROCEDURE event_stats_increment_counts();
-                    EXCEPTION
-                        -- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres
-                        WHEN duplicate_object THEN
-                            NULL;
-                    END;$$;
-                    """
-                )
-            else:
-                raise NotImplementedError("Unknown database engine")
-
-            # Find the latest `stream_ordering` in the `events` table. We need to do
-            # this in the same transaction as where we add the triggers so we don't miss
-            # any events.
-            txn.execute(
-                """
-                SELECT stream_ordering
-                FROM events
-                ORDER BY stream_ordering DESC
-                LIMIT 1
-                """
-            )
-            row = cast(Optional[Tuple[int]], txn.fetchone())
-
-            # Update the progress
-            if row is not None:
-                (max_stream_ordering,) = row
-                self.db_pool.updates._background_update_progress_txn(
-                    txn,
-                    _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
-                    {"stop_event_stream_ordering": max_stream_ordering},
-                )
-                return max_stream_ordering
-
-            return None
-
-        # First, add the triggers to keep the `event_stats` values up-to-date.
-        #
-        # If we don't have a `stop_event_stream_ordering` yet, we need to add the
-        # triggers to the `events` table and set the stopping point so we don't
-        # double count `events` later.
-        if stop_event_stream_ordering is None:
-            stop_event_stream_ordering = await self.db_pool.runInteraction(
-                "_event_stats_populate_counts_bg_update_add_triggers",
-                _add_triggers_txn,
-            )
-
-            # If there is no `stop_event_stream_ordering`, then there are no events
-            # in the `events` table and we can end the background update altogether.
-            if stop_event_stream_ordering is None:
-                await self.db_pool.updates._end_background_update(
-                    _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
-                )
-                return batch_size
-
-        def _populate_txn(
-            txn: LoggingTransaction,
-        ) -> int:
-            """
-            Updates the `event_stats` table from this batch of events.
-            """
-
-            # Increment the counts based on the events present in this batch.
-            txn.execute(
-                """
-                WITH event_batch AS (
-                    SELECT *
-                    FROM events
-                    WHERE stream_ordering > ? AND stream_ordering <= ?
-                    ORDER BY stream_ordering ASC
-                    LIMIT ?
-                ),
-                batch_stats AS (
-                    SELECT
-                        MAX(stream_ordering) AS max_stream_ordering,
-                        COALESCE(COUNT(*), 0) AS total_event_count,
-                        COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count,
-                        COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count
-                    FROM event_batch
-
-                    UNION ALL
-
-                    SELECT null, 0, 0, 0
-                    WHERE NOT EXISTS (SELECT 1 FROM event_batch)
-                    LIMIT 1
-                )
-                UPDATE event_stats
-                SET
-                    total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats),
-                    unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats),
-                    e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats)
-                RETURNING
-                    (SELECT total_event_count FROM batch_stats) AS total_event_count,
-                    (SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering
-                """,
-                (last_event_stream_ordering, stop_event_stream_ordering, batch_size),
-            )
-
-            # Get the results of the update
-            (total_event_count, max_stream_ordering) = cast(
-                Tuple[int, Optional[int]], txn.fetchone()
-            )
-
-            # Update the progress
-            self.db_pool.updates._background_update_progress_txn(
-                txn,
-                _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
-                {
-                    "last_event_stream_ordering": max_stream_ordering,
-                    "stop_event_stream_ordering": stop_event_stream_ordering,
-                },
-            )
-
-            return total_event_count
-
-        num_rows_processed = await self.db_pool.runInteraction(
-            "_event_stats_populate_counts_bg_update",
-            _populate_txn,
-        )
-
-        # No more rows to process, so our background update is complete.
-        if not num_rows_processed:
-            await self.db_pool.updates._end_background_update(
-                _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
-            )
-
-        return batch_size
-
 
 def _resolve_stale_data_in_sliding_sync_tables(
     txn: LoggingTransaction,
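The removed background update leaned on database TRIGGERs so that event totals could be read from a one-row `event_stats` table in constant time, rather than by scanning the ever-growing `events` table. Below is a minimal, self-contained sketch of that technique (illustrative only, not code from this commit): it uses Python's stock `sqlite3` module and a simplified schema, with table and trigger names borrowed from the removed code.

import sqlite3

# In-memory database with a simplified events table and a one-row stats table.
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (stream_ordering INTEGER PRIMARY KEY, type TEXT, state_key TEXT);
    CREATE TABLE event_stats (total_event_count INTEGER NOT NULL DEFAULT 0);
    INSERT INTO event_stats DEFAULT VALUES;

    -- AFTER triggers fire only on successful writes, so the counter
    -- tracks rows that actually landed in the table.
    CREATE TRIGGER event_stats_events_insert_trigger
    AFTER INSERT ON events
    BEGIN
        UPDATE event_stats SET total_event_count = total_event_count + 1;
    END;

    CREATE TRIGGER event_stats_events_delete_trigger
    AFTER DELETE ON events
    BEGIN
        UPDATE event_stats SET total_event_count = total_event_count - 1;
    END;
    """
)

conn.execute("INSERT INTO events (type, state_key) VALUES ('m.room.message', NULL)")
conn.execute("INSERT INTO events (type, state_key) VALUES ('m.room.encrypted', NULL)")
conn.execute("DELETE FROM events WHERE type = 'm.room.encrypted'")

# O(1) read of the running total instead of COUNT(*) over events.
print(conn.execute("SELECT total_event_count FROM event_stats").fetchone()[0])  # prints 1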
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index a9cecc4bc1..9ce1100b5c 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -126,44 +126,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
 
         return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)
 
-    async def count_total_events(self) -> int:
-        """
-        Returns the total number of events present on the server.
-        """
-
-        return await self.db_pool.simple_select_one_onecol(
-            table="event_stats",
-            keyvalues={},
-            retcol="total_event_count",
-            desc="count_total_events",
-        )
-
-    async def count_total_messages(self) -> int:
-        """
-        Returns the total number of `m.room.message` events present on the
-        server.
-        """
-
-        return await self.db_pool.simple_select_one_onecol(
-            table="event_stats",
-            keyvalues={},
-            retcol="unencrypted_message_count",
-            desc="count_total_messages",
-        )
-
-    async def count_total_e2ee_events(self) -> int:
-        """
-        Returns the total number of `m.room.encrypted` events present on the
-        server.
-        """
-
-        return await self.db_pool.simple_select_one_onecol(
-            table="event_stats",
-            keyvalues={},
-            retcol="e2ee_event_count",
-            desc="count_total_e2ee_events",
-        )
-
     async def count_daily_sent_e2ee_messages(self) -> int:
         def _count_messages(txn: LoggingTransaction) -> int:
             # This is good enough as if you have silly characters in your own
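For context on the other half of the removed machinery: `_event_stats_populate_counts_bg_update` back-filled the counters by walking `events` in `stream_ordering` order, stopping at the snapshot taken when the triggers were installed so no event was counted twice, and checkpointing progress so the update could resume after a restart. A rough sketch of that loop against the simplified schema above (`populate_counts` and its signature are hypothetical, not a Synapse API):

def populate_counts(conn, stop_stream_ordering, batch_size=1000):
    # Start below any real stream_ordering; the removed code used the same sentinel.
    last = -(1 << 31)
    while True:
        rows = conn.execute(
            """
            SELECT stream_ordering FROM events
            WHERE stream_ordering > ? AND stream_ordering <= ?
            ORDER BY stream_ordering ASC
            LIMIT ?
            """,
            (last, stop_stream_ordering, batch_size),
        ).fetchall()
        if not rows:
            break  # reached the stop point; the triggers cover everything newer
        conn.execute(
            "UPDATE event_stats SET total_event_count = total_event_count + ?",
            (len(rows),),
        )
        # Checkpoint: the next batch resumes strictly after the last row seen.
        last = rows[-1][0]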