diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 4b0bdd79c6..b821d1c1b4 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import (
)
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.storage.types import Cursor
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
@@ -311,6 +311,12 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
)
+        # Add a background update to populate the `event_stats` table and
+        # install triggers on the `events` table to keep the counts current.
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
+ self._event_stats_populate_counts_bg_update,
+ )
+
# We want this to run on the main database at startup before we start processing
# events.
#
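For context (not part of this diff): the `event_stats` table and the
enqueueing of this background update are assumed to come from a companion
schema delta. A minimal sketch, where the `ordering` value, the update-name
string, and the column types are assumptions:

```sql
-- Hypothetical companion schema delta (sketch, not from this diff).
-- `event_stats` is a single-row table of running counters, seeded with zeros.
CREATE TABLE event_stats (
    total_event_count BIGINT NOT NULL DEFAULT 0,
    unencrypted_message_count BIGINT NOT NULL DEFAULT 0,
    e2ee_event_count BIGINT NOT NULL DEFAULT 0
);
INSERT INTO event_stats DEFAULT VALUES;

-- Enqueue the background update registered above; the name is assumed to
-- match _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE.
INSERT INTO background_updates (ordering, update_name, progress_json)
    VALUES (9999, 'event_stats_populate_counts_bg_update', '{}');
```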
@@ -2547,6 +2553,288 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
return num_rows
+ async def _event_stats_populate_counts_bg_update(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """
+ Background update to populate the `event_stats` table with initial
+ values, and register DB triggers to continue updating it.
+
+        We first register triggers on the `events` table which fire whenever
+        rows are inserted or deleted, keeping the event counts continuously
+        up-to-date. In the same transaction, we record the stopping point for
+        the main population step so we don't double count events.
+
+        Then we iterate through the `events` table in batches, updating the
+        counts until we reach that stopping point.
+
+        This data is intended to be used by the phone-home stats to keep track
+        of total event and message counts. Triggers are preferred to counting
+        rows in the `events` table, as that table can grow quite large.
+
+        Triggers are also preferable to adding an index on the `events` table,
+        as even an index can grow large, and calculating total counts would
+        still require scanning that entire index.
+ """
+ # The last event `stream_ordering` we processed (starting place of this next
+ # batch).
+ last_event_stream_ordering = progress.get(
+ "last_event_stream_ordering", -(1 << 31)
+ )
+ # The event `stream_ordering` we should stop at. This is used to avoid double
+ # counting events that are already accounted for because of the triggers.
+ stop_event_stream_ordering: Optional[int] = progress.get(
+ "stop_event_stream_ordering", None
+ )
+
+ def _add_triggers_txn(
+ txn: LoggingTransaction,
+ ) -> Optional[int]:
+ """
+ Adds the triggers to the `events` table to keep the `event_stats` counts
+ up-to-date.
+
+ Also populates the `stop_event_stream_ordering` background update progress
+ value. This marks the point at which we added the triggers, so we can avoid
+ double counting events that are already accounted for in the population
+ step.
+
+ Returns:
+ The latest event `stream_ordering` in the `events` table when the triggers
+ were added or `None` if the `events` table is empty.
+ """
+
+ # Each time an event is inserted into the `events` table, update the stats.
+ #
+ # We're using `AFTER` triggers as we want to count successful inserts/deletes and
+ # not the ones that could potentially fail.
+ if isinstance(txn.database_engine, Sqlite3Engine):
+ txn.execute(
+ """
+ CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
+ AFTER INSERT ON events
+ BEGIN
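+                        -- Note: `event_stats` is a single-row table of counters,
+                        -- so the UPDATEs below deliberately have no WHERE clause
+                        -- on the table itself; each one touches exactly one row.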
+ -- Always increment total_event_count
+ UPDATE event_stats SET total_event_count = total_event_count + 1;
+
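+                        -- The `state_key IS NULL` checks restrict the counts to
+                        -- ordinary room messages, excluding any state events that
+                        -- happen to share these event types.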
+ -- Increment unencrypted_message_count for m.room.message events
+ UPDATE event_stats
+ SET unencrypted_message_count = unencrypted_message_count + 1
+ WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;
+
+ -- Increment e2ee_event_count for m.room.encrypted events
+ UPDATE event_stats
+ SET e2ee_event_count = e2ee_event_count + 1
+ WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
+ END;
+ """
+ )
+
+ txn.execute(
+ """
+ CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger
+ AFTER DELETE ON events
+ BEGIN
+ -- Always decrement total_event_count
+ UPDATE event_stats SET total_event_count = total_event_count - 1;
+
+ -- Decrement unencrypted_message_count for m.room.message events
+ UPDATE event_stats
+ SET unencrypted_message_count = unencrypted_message_count - 1
+ WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL;
+
+ -- Decrement e2ee_event_count for m.room.encrypted events
+ UPDATE event_stats
+ SET e2ee_event_count = e2ee_event_count - 1
+ WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL;
+ END;
+ """
+ )
+ elif isinstance(txn.database_engine, PostgresEngine):
+ txn.execute(
+ """
+ CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$
+ BEGIN
+ IF TG_OP = 'INSERT' THEN
+ -- Always increment total_event_count
+ UPDATE event_stats SET total_event_count = total_event_count + 1;
+
+ -- Increment unencrypted_message_count for m.room.message events
+ IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN
+ UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1;
+ END IF;
+
+ -- Increment e2ee_event_count for m.room.encrypted events
+ IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN
+ UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1;
+ END IF;
+
+ -- We're not modifying the row being inserted/deleted, so we return it unchanged.
+ RETURN NEW;
+
+ ELSIF TG_OP = 'DELETE' THEN
+ -- Always decrement total_event_count
+ UPDATE event_stats SET total_event_count = total_event_count - 1;
+
+ -- Decrement unencrypted_message_count for m.room.message events
+ IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN
+ UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1;
+ END IF;
+
+ -- Decrement e2ee_event_count for m.room.encrypted events
+ IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN
+ UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1;
+ END IF;
+
+ -- "The usual idiom in DELETE triggers is to return OLD."
+ -- (https://www.postgresql.org/docs/current/plpgsql-trigger.html)
+ RETURN OLD;
+ END IF;
+
+                        RAISE EXCEPTION 'event_stats_increment_counts() was run with unexpected operation (%). '
+                            'This indicates a trigger misconfiguration as this function should only '
+                            'run with INSERT/DELETE operations.', TG_OP;
+ END;
+ $BODY$ LANGUAGE plpgsql;
+ """
+ )
+
+ # We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres
+ # 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html)
+ txn.execute(
+ """
+ DO
+ $$BEGIN
+ CREATE TRIGGER event_stats_increment_counts_trigger
+ AFTER INSERT OR DELETE ON events
+ FOR EACH ROW
+ EXECUTE PROCEDURE event_stats_increment_counts();
+ EXCEPTION
+ -- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres
+ WHEN duplicate_object THEN
+ NULL;
+ END;$$;
+ """
+ )
+ else:
+ raise NotImplementedError("Unknown database engine")
+
+            # Find the latest `stream_ordering` in the `events` table. We need to
+            # do this in the same transaction as the one that adds the triggers so
+            # we don't miss any events.
+ txn.execute(
+ """
+ SELECT stream_ordering
+ FROM events
+ ORDER BY stream_ordering DESC
+ LIMIT 1
+ """
+ )
+ row = cast(Optional[Tuple[int]], txn.fetchone())
+
+ # Update the progress
+ if row is not None:
+ (max_stream_ordering,) = row
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
+ {"stop_event_stream_ordering": max_stream_ordering},
+ )
+ return max_stream_ordering
+
+ return None
+
+ # First, add the triggers to keep the `event_stats` values up-to-date.
+ #
+ # If we don't have a `stop_event_stream_ordering` yet, we need to add the
+ # triggers to the `events` table and set the stopping point so we don't
+ # double count `events` later.
+ if stop_event_stream_ordering is None:
+ stop_event_stream_ordering = await self.db_pool.runInteraction(
+ "_event_stats_populate_counts_bg_update_add_triggers",
+ _add_triggers_txn,
+ )
+
+ # If there is no `stop_event_stream_ordering`, then there are no events
+ # in the `events` table and we can end the background update altogether.
+ if stop_event_stream_ordering is None:
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
+ )
+ return batch_size
+
+ def _populate_txn(
+ txn: LoggingTransaction,
+ ) -> int:
+ """
+ Updates the `event_stats` table from this batch of events.
+ """
+
+ # Increment the counts based on the events present in this batch.
+ txn.execute(
+ """
+ WITH event_batch AS (
+ SELECT *
+ FROM events
+ WHERE stream_ordering > ? AND stream_ordering <= ?
+ ORDER BY stream_ordering ASC
+ LIMIT ?
+ ),
+ batch_stats AS (
+ SELECT
+ MAX(stream_ordering) AS max_stream_ordering,
+ COALESCE(COUNT(*), 0) AS total_event_count,
+ COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count,
+ COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count
+ FROM event_batch
+
+ UNION ALL
+
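+                    -- If the batch is empty, fall back to a single all-zero row
+                    -- (with a NULL max_stream_ordering) so the UPDATE below still
+                    -- has exactly one row of deltas to apply.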
+ SELECT null, 0, 0, 0
+ WHERE NOT EXISTS (SELECT 1 FROM event_batch)
+ LIMIT 1
+ )
+ UPDATE event_stats
+ SET
+ total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats),
+ unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats),
+ e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats)
+ RETURNING
+ (SELECT total_event_count FROM batch_stats) AS total_event_count,
+ (SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering
+ """,
+ (last_event_stream_ordering, stop_event_stream_ordering, batch_size),
+ )
+
+ # Get the results of the update
+ (total_event_count, max_stream_ordering) = cast(
+ Tuple[int, Optional[int]], txn.fetchone()
+ )
+
+ # Update the progress
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
+ {
+ "last_event_stream_ordering": max_stream_ordering,
+ "stop_event_stream_ordering": stop_event_stream_ordering,
+ },
+ )
+
+ return total_event_count
+
+ num_rows_processed = await self.db_pool.runInteraction(
+ "_event_stats_populate_counts_bg_update",
+ _populate_txn,
+ )
+
+ # No more rows to process, so our background update is complete.
+ if not num_rows_processed:
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
+ )
+
+ return batch_size
+
def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,
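To make the double-counting guard concrete, the progress values partition the
`events` table so that every event is counted exactly once (illustrative
sketch, not part of the diff):

```sql
-- stream_ordering <= stop_event_stream_ordering:
--     counted by the batched population step, which walks the range
--     (last_event_stream_ordering, stop_event_stream_ordering] in
--     LIMIT-sized chunks.
-- stream_ordering > stop_event_stream_ordering:
--     counted by the INSERT/DELETE triggers, installed in the same
--     transaction that captured stop_event_stream_ordering.

-- Work remaining for the population step at any given moment:
SELECT count(*) AS remaining_to_populate
FROM events
WHERE stream_ordering > ?   -- last_event_stream_ordering
  AND stream_ordering <= ?; -- stop_event_stream_ordering
```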
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 9ce1100b5c..a9cecc4bc1 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -126,6 +126,44 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)
+ async def count_total_events(self) -> int:
+ """
+ Returns the total number of events present on the server.
+ """
+
+ return await self.db_pool.simple_select_one_onecol(
+ table="event_stats",
+ keyvalues={},
+ retcol="total_event_count",
+ desc="count_total_events",
+ )
+
+ async def count_total_messages(self) -> int:
+ """
+        Returns the total number of non-state `m.room.message` events (i.e.
+        unencrypted messages) present on the server.
+ """
+
+ return await self.db_pool.simple_select_one_onecol(
+ table="event_stats",
+ keyvalues={},
+ retcol="unencrypted_message_count",
+ desc="count_total_messages",
+ )
+
+ async def count_total_e2ee_events(self) -> int:
+ """
+        Returns the total number of non-state `m.room.encrypted` (E2EE) events
+        present on the server.
+ """
+
+ return await self.db_pool.simple_select_one_onecol(
+ table="event_stats",
+ keyvalues={},
+ retcol="e2ee_event_count",
+ desc="count_total_e2ee_events",
+ )
+
async def count_daily_sent_e2ee_messages(self) -> int:
def _count_messages(txn: LoggingTransaction) -> int:
# This is good enough as if you have silly characters in your own
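A likely consumer of the three new methods (hypothetical wiring, not part of
this diff) is the phone-home stats collector, which can now report totals
without scanning the `events` table:

```python
# Hypothetical reporting glue: the function name and stat keys are assumptions
# for illustration only; the store methods are the ones added in this diff.
from typing import Dict


async def collect_event_stats(store: "ServerMetricsStore") -> Dict[str, int]:
    """Read the trigger-maintained counters from the `event_stats` table."""
    return {
        "total_event_count": await store.count_total_events(),
        "total_message_count": await store.count_total_messages(),
        "total_e2ee_event_count": await store.count_total_e2ee_events(),
    }
```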