diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index b821d1c1b4..4b0bdd79c6 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -47,7 +47,7 @@ from synapse.storage.databases.main.events_worker import (
)
from synapse.storage.databases.main.state_deltas import StateDeltasStore
from synapse.storage.databases.main.stream import StreamWorkerStore
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.engines import PostgresEngine
from synapse.storage.types import Cursor
from synapse.types import JsonDict, RoomStreamToken, StateMap, StrCollection
from synapse.types.handlers import SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES
@@ -311,12 +311,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
)
- # Add a background update to add triggers which track event counts.
- self.db_pool.updates.register_background_update_handler(
- _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
- self._event_stats_populate_counts_bg_update,
- )
-
# We want this to run on the main database at startup before we start processing
# events.
#
@@ -2553,288 +2547,6 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseS
return num_rows
- async def _event_stats_populate_counts_bg_update(
- self, progress: JsonDict, batch_size: int
- ) -> int:
- """
- Background update to populate the `event_stats` table with initial
- values, and register DB triggers to continue updating it.
-
- We first register TRIGGERs on rows being added/removed from the `events` table,
- which will keep the event counts continuously updated. We also mark the stopping
- point for the main population step so we don't double count events.
-
- Then we will iterate through the `events` table in batches and update event
- counts until we reach the stopping point.
-
- This data is intended to be used by the phone-home stats to keep track
- of total event and message counts. A trigger is preferred to counting
- rows in the `events` table, as said table can grow quite large.
-
- It is also preferable to adding an index on the `events` table, as even
- an index can grow large. And calculating total counts would require
- querying that entire index.
- """
- # The last event `stream_ordering` we processed (starting place of this next
- # batch).
- last_event_stream_ordering = progress.get(
- "last_event_stream_ordering", -(1 << 31)
- )
- # The event `stream_ordering` we should stop at. This is used to avoid double
- # counting events that are already accounted for because of the triggers.
- stop_event_stream_ordering: Optional[int] = progress.get(
- "stop_event_stream_ordering", None
- )
-
- def _add_triggers_txn(
- txn: LoggingTransaction,
- ) -> Optional[int]:
- """
- Adds the triggers to the `events` table to keep the `event_stats` counts
- up-to-date.
-
- Also populates the `stop_event_stream_ordering` background update progress
- value. This marks the point at which we added the triggers, so we can avoid
- double counting events that are already accounted for in the population
- step.
-
- Returns:
- The latest event `stream_ordering` in the `events` table when the triggers
- were added or `None` if the `events` table is empty.
- """
-
- # Each time an event is inserted into the `events` table, update the stats.
- #
- # We're using `AFTER` triggers as we want to count successful inserts/deletes and
- # not the ones that could potentially fail.
- if isinstance(txn.database_engine, Sqlite3Engine):
- txn.execute(
- """
- CREATE TRIGGER IF NOT EXISTS event_stats_events_insert_trigger
- AFTER INSERT ON events
- BEGIN
- -- Always increment total_event_count
- UPDATE event_stats SET total_event_count = total_event_count + 1;
-
- -- Increment unencrypted_message_count for m.room.message events
- UPDATE event_stats
- SET unencrypted_message_count = unencrypted_message_count + 1
- WHERE NEW.type = 'm.room.message' AND NEW.state_key IS NULL;
-
- -- Increment e2ee_event_count for m.room.encrypted events
- UPDATE event_stats
- SET e2ee_event_count = e2ee_event_count + 1
- WHERE NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL;
- END;
- """
- )
-
- txn.execute(
- """
- CREATE TRIGGER IF NOT EXISTS event_stats_events_delete_trigger
- AFTER DELETE ON events
- BEGIN
- -- Always decrement total_event_count
- UPDATE event_stats SET total_event_count = total_event_count - 1;
-
- -- Decrement unencrypted_message_count for m.room.message events
- UPDATE event_stats
- SET unencrypted_message_count = unencrypted_message_count - 1
- WHERE OLD.type = 'm.room.message' AND OLD.state_key IS NULL;
-
- -- Decrement e2ee_event_count for m.room.encrypted events
- UPDATE event_stats
- SET e2ee_event_count = e2ee_event_count - 1
- WHERE OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL;
- END;
- """
- )
- elif isinstance(txn.database_engine, PostgresEngine):
- txn.execute(
- """
- CREATE OR REPLACE FUNCTION event_stats_increment_counts() RETURNS trigger AS $BODY$
- BEGIN
- IF TG_OP = 'INSERT' THEN
- -- Always increment total_event_count
- UPDATE event_stats SET total_event_count = total_event_count + 1;
-
- -- Increment unencrypted_message_count for m.room.message events
- IF NEW.type = 'm.room.message' AND NEW.state_key IS NULL THEN
- UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count + 1;
- END IF;
-
- -- Increment e2ee_event_count for m.room.encrypted events
- IF NEW.type = 'm.room.encrypted' AND NEW.state_key IS NULL THEN
- UPDATE event_stats SET e2ee_event_count = e2ee_event_count + 1;
- END IF;
-
- -- We're not modifying the row being inserted/deleted, so we return it unchanged.
- RETURN NEW;
-
- ELSIF TG_OP = 'DELETE' THEN
- -- Always decrement total_event_count
- UPDATE event_stats SET total_event_count = total_event_count - 1;
-
- -- Decrement unencrypted_message_count for m.room.message events
- IF OLD.type = 'm.room.message' AND OLD.state_key IS NULL THEN
- UPDATE event_stats SET unencrypted_message_count = unencrypted_message_count - 1;
- END IF;
-
- -- Decrement e2ee_event_count for m.room.encrypted events
- IF OLD.type = 'm.room.encrypted' AND OLD.state_key IS NULL THEN
- UPDATE event_stats SET e2ee_event_count = e2ee_event_count - 1;
- END IF;
-
- -- "The usual idiom in DELETE triggers is to return OLD."
- -- (https://www.postgresql.org/docs/current/plpgsql-trigger.html)
- RETURN OLD;
- END IF;
-
- RAISE EXCEPTION 'update_event_stats() was run with unexpected operation (%%). '
- 'This indicates a trigger misconfiguration as this function should only'
- 'run with INSERT/DELETE operations.', TG_OP;
- END;
- $BODY$ LANGUAGE plpgsql;
- """
- )
-
- # We could use `CREATE OR REPLACE TRIGGER` but that's only available in Postgres
- # 14 (https://www.postgresql.org/docs/14/sql-createtrigger.html)
- txn.execute(
- """
- DO
- $$BEGIN
- CREATE TRIGGER event_stats_increment_counts_trigger
- AFTER INSERT OR DELETE ON events
- FOR EACH ROW
- EXECUTE PROCEDURE event_stats_increment_counts();
- EXCEPTION
- -- This acts as a "CREATE TRIGGER IF NOT EXISTS" for Postgres
- WHEN duplicate_object THEN
- NULL;
- END;$$;
- """
- )
- else:
- raise NotImplementedError("Unknown database engine")
-
- # Find the latest `stream_ordering` in the `events` table. We need to do
- # this in the same transaction as where we add the triggers so we don't miss
- # any events.
- txn.execute(
- """
- SELECT stream_ordering
- FROM events
- ORDER BY stream_ordering DESC
- LIMIT 1
- """
- )
- row = cast(Optional[Tuple[int]], txn.fetchone())
-
- # Update the progress
- if row is not None:
- (max_stream_ordering,) = row
- self.db_pool.updates._background_update_progress_txn(
- txn,
- _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
- {"stop_event_stream_ordering": max_stream_ordering},
- )
- return max_stream_ordering
-
- return None
-
- # First, add the triggers to keep the `event_stats` values up-to-date.
- #
- # If we don't have a `stop_event_stream_ordering` yet, we need to add the
- # triggers to the `events` table and set the stopping point so we don't
- # double count `events` later.
- if stop_event_stream_ordering is None:
- stop_event_stream_ordering = await self.db_pool.runInteraction(
- "_event_stats_populate_counts_bg_update_add_triggers",
- _add_triggers_txn,
- )
-
- # If there is no `stop_event_stream_ordering`, then there are no events
- # in the `events` table and we can end the background update altogether.
- if stop_event_stream_ordering is None:
- await self.db_pool.updates._end_background_update(
- _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
- )
- return batch_size
-
- def _populate_txn(
- txn: LoggingTransaction,
- ) -> int:
- """
- Updates the `event_stats` table from this batch of events.
- """
-
- # Increment the counts based on the events present in this batch.
- txn.execute(
- """
- WITH event_batch AS (
- SELECT *
- FROM events
- WHERE stream_ordering > ? AND stream_ordering <= ?
- ORDER BY stream_ordering ASC
- LIMIT ?
- ),
- batch_stats AS (
- SELECT
- MAX(stream_ordering) AS max_stream_ordering,
- COALESCE(COUNT(*), 0) AS total_event_count,
- COALESCE(SUM(CASE WHEN type = 'm.room.message' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS unencrypted_message_count,
- COALESCE(SUM(CASE WHEN type = 'm.room.encrypted' AND state_key IS NULL THEN 1 ELSE 0 END), 0) AS e2ee_event_count
- FROM event_batch
-
- UNION ALL
-
- SELECT null, 0, 0, 0
- WHERE NOT EXISTS (SELECT 1 FROM event_batch)
- LIMIT 1
- )
- UPDATE event_stats
- SET
- total_event_count = total_event_count + (SELECT total_event_count FROM batch_stats),
- unencrypted_message_count = unencrypted_message_count + (SELECT unencrypted_message_count FROM batch_stats),
- e2ee_event_count = e2ee_event_count + (SELECT e2ee_event_count FROM batch_stats)
- RETURNING
- (SELECT total_event_count FROM batch_stats) AS total_event_count,
- (SELECT max_stream_ordering FROM batch_stats) AS max_stream_ordering
- """,
- (last_event_stream_ordering, stop_event_stream_ordering, batch_size),
- )
-
- # Get the results of the update
- (total_event_count, max_stream_ordering) = cast(
- Tuple[int, Optional[int]], txn.fetchone()
- )
-
- # Update the progress
- self.db_pool.updates._background_update_progress_txn(
- txn,
- _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE,
- {
- "last_event_stream_ordering": max_stream_ordering,
- "stop_event_stream_ordering": stop_event_stream_ordering,
- },
- )
-
- return total_event_count
-
- num_rows_processed = await self.db_pool.runInteraction(
- "_event_stats_populate_counts_bg_update",
- _populate_txn,
- )
-
- # No more rows to process, so our background update is complete.
- if not num_rows_processed:
- await self.db_pool.updates._end_background_update(
- _BackgroundUpdates.EVENT_STATS_POPULATE_COUNTS_BG_UPDATE
- )
-
- return batch_size
-
def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index a9cecc4bc1..9ce1100b5c 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -126,44 +126,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
return await self.db_pool.runInteraction("count_e2ee_messages", _count_messages)
- async def count_total_events(self) -> int:
- """
- Returns the total number of events present on the server.
- """
-
- return await self.db_pool.simple_select_one_onecol(
- table="event_stats",
- keyvalues={},
- retcol="total_event_count",
- desc="count_total_events",
- )
-
- async def count_total_messages(self) -> int:
- """
- Returns the total number of `m.room.message` events present on the
- server.
- """
-
- return await self.db_pool.simple_select_one_onecol(
- table="event_stats",
- keyvalues={},
- retcol="unencrypted_message_count",
- desc="count_total_messages",
- )
-
- async def count_total_e2ee_events(self) -> int:
- """
- Returns the total number of `m.room.encrypted` events present on the
- server.
- """
-
- return await self.db_pool.simple_select_one_onecol(
- table="event_stats",
- keyvalues={},
- retcol="e2ee_event_count",
- desc="count_total_e2ee_events",
- )
-
async def count_daily_sent_e2ee_messages(self) -> int:
def _count_messages(txn: LoggingTransaction) -> int:
# This is good enough as if you have silly characters in your own
|