diff --git a/changelog.d/18254.feature b/changelog.d/18254.feature
new file mode 100644
index 0000000000..62e1b79a15
--- /dev/null
+++ b/changelog.d/18254.feature
@@ -0,0 +1 @@
+Add background job to clear unreferenced state groups.
diff --git a/docs/development/database_schema.md b/docs/development/database_schema.md
index 37a06acc12..620d1c16b0 100644
--- a/docs/development/database_schema.md
+++ b/docs/development/database_schema.md
@@ -162,7 +162,7 @@ by a unique name, the current status (stored in JSON), and some dependency infor
* Whether the update requires a previous update to be complete.
* A rough ordering for which to complete updates.
-A new background updates needs to be added to the `background_updates` table:
+A new background update needs to be added to the `background_updates` table:
```sql
INSERT INTO background_updates (ordering, update_name, depends_on, progress_json) VALUES
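The SQL row above only schedules the work; the update's logic is a handler registered in Python against the same `update_name` (see `register_background_update_handler` in `purge_events.py` below). As a toy sketch of the mechanism (not Synapse's actual scheduler; the helper name and numbers are hypothetical), progress is round-tripped as JSON between batches until the handler reports completion:

```python
import json
from typing import Callable, Dict, Tuple

# Handler contract: (progress, batch_size) -> (new_progress, done)
ProgressHandler = Callable[[Dict, int], Tuple[Dict, bool]]

def run_background_update(
    handler: ProgressHandler, progress_json: str, batch_size: int
) -> None:
    """Repeatedly invoke the handler, persisting progress between batches."""
    progress = json.loads(progress_json)
    done = False
    while not done:
        progress, done = handler(progress, batch_size)
        # In Synapse, this JSON lands in background_updates.progress_json.
        progress_json = json.dumps(progress)

def mark_unreferenced_groups(progress: Dict, batch_size: int) -> Tuple[Dict, bool]:
    # Walk backwards through hypothetical state group ids, one batch at a time.
    last = progress.get("last_checked_state_group", 1000)
    last -= batch_size
    if last <= 0:
        return {}, True  # final batch: the update row would now be removed
    return {"last_checked_state_group": last}, False

run_background_update(mark_unreferenced_groups, "{}", batch_size=100)
```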
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 1bb9940180..438b2ff8a0 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -192,6 +192,11 @@ APPEND_ONLY_TABLES = [
IGNORED_TABLES = {
+    # Porting the auto-generated sequence in this table is non-trivial.
+    # None of the rows in this table are mandatory for Synapse to keep working.
+    # If state group disk space is an issue after the port, the
+    # `mark_unreferenced_state_groups_for_deletion_bg_update` background update can be run again.
+ "state_groups_pending_deletion",
# We don't port these tables, as they're a faff and we can regenerate
# them anyway.
"user_directory",
@@ -217,6 +222,15 @@ IGNORED_TABLES = {
}
+# These background updates will not be run on the ported PostgreSQL database.
+IGNORED_BACKGROUND_UPDATES = {
+    # Reapplying this background update to the PostgreSQL database is
+    # unnecessary: the port has already waited for the SQLite database to
+    # complete all of its running background updates.
+ "mark_unreferenced_state_groups_for_deletion_bg_update",
+}
+
+
# Error returned by the run function. Used at the top-level part of the script to
# handle errors and return codes.
end_error: Optional[str] = None
@@ -688,6 +702,20 @@ class Porter:
# 0 means off. 1 means full. 2 means incremental.
return autovacuum_setting != 0
+    async def remove_ignored_background_updates_from_database(self) -> None:
+        """Remove ignored background updates so they are not re-run after the port."""
+
+        def _remove_ignored_background_updates_txn(
+            txn: LoggingTransaction,
+        ) -> None:
+            txn.execute(
+                "DELETE FROM background_updates WHERE update_name = ANY(?)",
+                (list(IGNORED_BACKGROUND_UPDATES),),
+            )
+
+        await self.postgres_store.db_pool.runInteraction(
+            "remove_ignored_background_updates",
+            _remove_ignored_background_updates_txn,
+        )
+
async def run(self) -> None:
"""Ports the SQLite database to a PostgreSQL database.
@@ -733,6 +761,8 @@ class Porter:
self.hs_config.database.get_single_database()
)
+ await self.remove_ignored_background_updates_from_database()
+
await self.run_background_updates_on_postgres()
self.progress.set_state("Creating port tables")
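For context on the `ANY(?)` form: the port script only ever runs this against PostgreSQL, where Synapse converts `?` placeholders for psycopg2 and psycopg2 adapts a Python list to an array, so one statement deletes every ignored row. A minimal stand-in using sqlite3 (a portable per-row delete rather than the real array comparison; `some_other_update` is made up):

```python
import sqlite3

IGNORED_BACKGROUND_UPDATES = {
    "mark_unreferenced_state_groups_for_deletion_bg_update",
}

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE background_updates (update_name TEXT PRIMARY KEY, progress_json TEXT)"
)
conn.executemany(
    "INSERT INTO background_updates VALUES (?, ?)",
    [
        ("mark_unreferenced_state_groups_for_deletion_bg_update", "{}"),
        ("some_other_update", "{}"),
    ],
)

# Same effect as: DELETE FROM background_updates WHERE update_name = ANY(?)
conn.executemany(
    "DELETE FROM background_updates WHERE update_name = ?",
    [(name,) for name in IGNORED_BACKGROUND_UPDATES],
)
print([r[0] for r in conn.execute("SELECT update_name FROM background_updates")])
# -> ['some_other_update']
```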
diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py
index 47cec8c469..c2d4bf8290 100644
--- a/synapse/storage/controllers/purge_events.py
+++ b/synapse/storage/controllers/purge_events.py
@@ -21,11 +21,19 @@
import itertools
import logging
-from typing import TYPE_CHECKING, Collection, Mapping, Set
+from typing import (
+ TYPE_CHECKING,
+ Collection,
+ Mapping,
+ Optional,
+ Set,
+)
from synapse.logging.context import nested_logging_context
from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.storage.database import LoggingTransaction
from synapse.storage.databases import Databases
+from synapse.types.storage import _BackgroundUpdates
if TYPE_CHECKING:
from synapse.server import HomeServer
@@ -44,6 +52,11 @@ class PurgeEventsStorageController:
self._delete_state_groups_loop, 60 * 1000
)
+ self.stores.state.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
+            self._background_delete_unreferenced_state_groups,
+ )
+
async def purge_room(self, room_id: str) -> None:
"""Deletes all record of a room"""
@@ -81,7 +94,8 @@ class PurgeEventsStorageController:
)
async def _find_unreferenced_groups(
- self, state_groups: Collection[int]
+ self,
+ state_groups: Collection[int],
) -> Set[int]:
"""Used when purging history to figure out which state groups can be
deleted.
@@ -203,3 +217,232 @@ class PurgeEventsStorageController:
room_id,
groups_to_sequences,
)
+
+    async def _background_delete_unreferenced_state_groups(
+        self, progress: dict, batch_size: int
+    ) -> int:
+        """Background update that slowly marks unreferenced state groups for deletion."""
+
+ last_checked_state_group = progress.get("last_checked_state_group")
+
+ if last_checked_state_group is None:
+ # This is the first run.
+ last_checked_state_group = (
+ await self.stores.state.db_pool.simple_select_one_onecol(
+ table="state_groups",
+ keyvalues={},
+ retcol="MAX(id)",
+ allow_none=True,
+ desc="get_max_state_group",
+ )
+ )
+ if last_checked_state_group is None:
+ # There are no state groups so the background process is finished.
+ await self.stores.state.db_pool.updates._end_background_update(
+ _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE
+ )
+ return batch_size
+        # Add one so that the `id < ?` bound in the batch query includes the
+        # maximum state group on the first iteration.
+        last_checked_state_group += 1
+
+ (
+ last_checked_state_group,
+ final_batch,
+ ) = await self._delete_unreferenced_state_groups_batch(
+ last_checked_state_group,
+ batch_size,
+ )
+
+ if not final_batch:
+ # There are more state groups to check.
+ progress = {
+ "last_checked_state_group": last_checked_state_group,
+ }
+ await self.stores.state.db_pool.updates._background_update_progress(
+ _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
+ progress,
+ )
+ else:
+ # This background process is finished.
+ await self.stores.state.db_pool.updates._end_background_update(
+ _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE
+ )
+
+ return batch_size
+
+ async def _delete_unreferenced_state_groups_batch(
+ self,
+ last_checked_state_group: int,
+ batch_size: int,
+ ) -> tuple[int, bool]:
+ """Looks for unreferenced state groups starting from the last state group
+ checked and marks them for deletion.
+
+ Args:
+ last_checked_state_group: The last state group that was checked.
+ batch_size: How many state groups to process in this iteration.
+
+ Returns:
+ (last_checked_state_group, final_batch)
+ """
+
+ # Find all state groups that can be deleted if any of the original set are deleted.
+ (
+ to_delete,
+ last_checked_state_group,
+ final_batch,
+ ) = await self._find_unreferenced_groups_for_background_deletion(
+ last_checked_state_group, batch_size
+ )
+
+ if len(to_delete) == 0:
+ return last_checked_state_group, final_batch
+
+ await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
+ to_delete
+ )
+
+ return last_checked_state_group, final_batch
+
+ async def _find_unreferenced_groups_for_background_deletion(
+ self,
+ last_checked_state_group: int,
+ batch_size: int,
+ ) -> tuple[Set[int], int, bool]:
+ """Used when deleting unreferenced state groups in the background to figure out
+ which state groups can be deleted.
+        To avoid increased DB usage due to de-deltaing state groups, this returns only
+        state groups which are free-standing (i.e. share no edges with referenced groups)
+        or whose edges do not lead to a future referenced group.
+
+ The following scenarios outline the possibilities based on state group data in
+ the DB.
+
+        Free-standing -> state groups 1-N would be returned:
+ SG_1
+ |
+ ...
+ |
+ SG_N
+
+        Previous reference -> state groups 2-N would be returned:
+ SG_1 <- referenced by event
+ |
+ SG_2
+ |
+ ...
+ |
+ SG_N
+
+        Future reference -> none of the following state groups would be returned:
+ SG_1
+ |
+ SG_2
+ |
+ ...
+ |
+ SG_N <- referenced by event
+
+ Args:
+ last_checked_state_group: The last state group that was checked.
+ batch_size: How many state groups to process in this iteration.
+
+ Returns:
+ (to_delete, last_checked_state_group, final_batch)
+ """
+
+ # If a state group's next edge is not pending deletion then we don't delete the state group.
+ # If there is no next edge or the next edges are all marked for deletion, then delete
+ # the state group.
+ # This holds since we walk backwards from the latest state groups, ensuring that
+ # we've already checked newer state groups for event references along the way.
+ def get_next_state_groups_marked_for_deletion_txn(
+ txn: LoggingTransaction,
+ ) -> tuple[dict[int, bool], dict[int, int]]:
+ state_group_sql = """
+ SELECT s.id, e.state_group, d.state_group
+ FROM (
+ SELECT id FROM state_groups
+ WHERE id < ? ORDER BY id DESC LIMIT ?
+                ) AS s
+ LEFT JOIN state_group_edges AS e ON (s.id = e.prev_state_group)
+ LEFT JOIN state_groups_pending_deletion AS d ON (e.state_group = d.state_group)
+ """
+ txn.execute(state_group_sql, (last_checked_state_group, batch_size))
+
+ # Mapping from state group to whether we should delete it.
+ state_groups_to_deletion: dict[int, bool] = {}
+
+ # Mapping from state group to prev state group.
+ state_groups_to_prev: dict[int, int] = {}
+
+ for row in txn:
+ state_group = row[0]
+ next_edge = row[1]
+ pending_deletion = row[2]
+
+ if next_edge is not None:
+ state_groups_to_prev[next_edge] = state_group
+
+ if next_edge is not None and not pending_deletion:
+ # We have found an edge not marked for deletion.
+ # Check previous results to see if this group is part of a chain
+ # within this batch that qualifies for deletion.
+                        # e.g. the batch contains:
+ # SG_1 -> SG_2 -> SG_3
+ # If SG_3 is a candidate for deletion, then SG_2 & SG_1 should also
+ # be, even though they have edges which may not be marked for
+ # deletion.
+ # This relies on SQL results being sorted in DESC order to work.
+                    # `get` returns None if the next group is outside this
+                    # batch; treat that the same as "not a deletion candidate".
+                    if not state_groups_to_deletion.get(next_edge):
+                        state_groups_to_deletion[state_group] = False
+ else:
+ state_groups_to_deletion.setdefault(state_group, True)
+ else:
+                    # This state group may be a candidate for deletion.
+ state_groups_to_deletion.setdefault(state_group, True)
+
+ return state_groups_to_deletion, state_groups_to_prev
+
+ (
+ state_groups_to_deletion,
+ state_group_edges,
+ ) = await self.stores.state.db_pool.runInteraction(
+ "get_next_state_groups_marked_for_deletion",
+ get_next_state_groups_marked_for_deletion_txn,
+ )
+ deletion_candidates = {
+ state_group
+ for state_group, deletion in state_groups_to_deletion.items()
+ if deletion
+ }
+
+ final_batch = False
+ state_groups = state_groups_to_deletion.keys()
+ if len(state_groups) < batch_size:
+ final_batch = True
+ else:
+ last_checked_state_group = min(state_groups)
+
+ if len(state_groups) == 0:
+ return set(), last_checked_state_group, final_batch
+
+        # Determine which of the deletion candidates are directly referenced by events.
+ referenced = await self.stores.main.get_referenced_state_groups(
+ deletion_candidates
+ )
+
+ # Remove state groups from deletion_candidates which are directly referenced or share a
+ # future edge with a referenced state group within this batch.
+ def filter_reference_chains(group: Optional[int]) -> None:
+ while group is not None:
+ deletion_candidates.discard(group)
+ group = state_group_edges.get(group)
+
+ for referenced_group in referenced:
+ filter_reference_chains(referenced_group)
+
+ return deletion_candidates, last_checked_state_group, final_batch
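The subtle step above is the final filter: a referenced state group must also rescue every earlier group in its chain within the batch, since deleting an ancestor would force de-deltaing the referenced group. A self-contained sketch of that walk on toy data (mirroring `filter_reference_chains`; `edges` maps a state group to its prev group, as `state_groups_to_prev` does):

```python
from typing import Dict, Optional, Set

def filter_reference_chains(
    referenced: Set[int],
    edges: Dict[int, int],  # state group -> prev state group
    candidates: Set[int],
) -> Set[int]:
    """Drop every candidate that a referenced group (transitively) builds on."""
    remaining = set(candidates)
    for group in referenced:
        g: Optional[int] = group
        while g is not None:
            remaining.discard(g)  # needed by a referenced descendant
            g = edges.get(g)
    return remaining

# Chain 1 <- 2 <- 3, where 3 is referenced by an event: none are deletable.
# Free-standing group 10 has no referenced descendant: it stays deletable.
print(filter_reference_chains({3}, {3: 2, 2: 1}, {1, 2, 3, 10}))  # -> {10}
```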
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index f7824cba0f..95fd0ae73a 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -20,7 +20,15 @@
#
import logging
-from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union
+from typing import (
+ TYPE_CHECKING,
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Tuple,
+ Union,
+)
from synapse.logging.opentracing import tag_args, trace
from synapse.storage._base import SQLBaseStore
diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py
index d4b1c20a45..f77c46f6ae 100644
--- a/synapse/storage/databases/state/deletion.py
+++ b/synapse/storage/databases/state/deletion.py
@@ -321,19 +321,43 @@ class StateDeletionDataStore:
async def mark_state_groups_as_pending_deletion(
self, state_groups: Collection[int]
) -> None:
- """Mark the given state groups as pending deletion"""
+ """Mark the given state groups as pending deletion.
- now = self._clock.time_msec()
+ If any of the state groups are already pending deletion, then those records are
+ left as is.
+ """
- await self.db_pool.simple_upsert_many(
- table="state_groups_pending_deletion",
- key_names=("state_group",),
- key_values=[(state_group,) for state_group in state_groups],
- value_names=("insertion_ts",),
- value_values=[(now,) for _ in state_groups],
- desc="mark_state_groups_as_pending_deletion",
+ await self.db_pool.runInteraction(
+ "mark_state_groups_as_pending_deletion",
+ self._mark_state_groups_as_pending_deletion_txn,
+ state_groups,
)
+ def _mark_state_groups_as_pending_deletion_txn(
+ self,
+ txn: LoggingTransaction,
+ state_groups: Collection[int],
+ ) -> None:
+ sql = """
+ INSERT INTO state_groups_pending_deletion (state_group, insertion_ts)
+ VALUES %s
+ ON CONFLICT (state_group)
+ DO NOTHING
+ """
+
+ now = self._clock.time_msec()
+ rows = [
+ (
+ state_group,
+ now,
+ )
+ for state_group in state_groups
+ ]
+ if isinstance(txn.database_engine, PostgresEngine):
+ txn.execute_values(sql % ("?",), rows, fetch=False)
+ else:
+ txn.execute_batch(sql % ("(?, ?)",), rows)
+
async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None:
"""Mark the given state groups as now being referenced"""
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index f87b1a4a0a..2160edb014 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -161,6 +161,7 @@ Changes in SCHEMA_VERSION = 89
Changes in SCHEMA_VERSION = 90
- Add a column `participant` to `room_memberships` table
+ - Add background update to delete unreferenced state groups.
"""
diff --git a/synapse/storage/schema/state/delta/90/02_delete_unreferenced_state_groups.sql b/synapse/storage/schema/state/delta/90/02_delete_unreferenced_state_groups.sql
new file mode 100644
index 0000000000..55a038e2b8
--- /dev/null
+++ b/synapse/storage/schema/state/delta/90/02_delete_unreferenced_state_groups.sql
@@ -0,0 +1,16 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2025 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- Add a background update to delete any unreferenced state groups
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (9002, 'mark_unreferenced_state_groups_for_deletion_bg_update', '{}');
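As an aside, the `ordering` value appears to follow the usual Synapse convention of schema version times 100 plus the delta number, so 9002 matches this `90/02_...` delta file.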
diff --git a/synapse/types/storage/__init__.py b/synapse/types/storage/__init__.py
index b5fa20a41a..e03ff7ffc8 100644
--- a/synapse/types/storage/__init__.py
+++ b/synapse/types/storage/__init__.py
@@ -48,3 +48,7 @@ class _BackgroundUpdates:
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE = (
"sliding_sync_membership_snapshots_fix_forgotten_column_bg_update"
)
+
+ MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE = (
+ "mark_unreferenced_state_groups_for_deletion_bg_update"
+ )
diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py
index 916e42e731..0aa14fd1f4 100644
--- a/tests/storage/test_purge.py
+++ b/tests/storage/test_purge.py
@@ -24,6 +24,7 @@ from synapse.api.errors import NotFoundError, SynapseError
from synapse.rest.client import room
from synapse.server import HomeServer
from synapse.types.state import StateFilter
+from synapse.types.storage import _BackgroundUpdates
from synapse.util import Clock
from tests.unittest import HomeserverTestCase
@@ -303,3 +304,156 @@ class PurgeTests(HomeserverTestCase):
)
)
self.assertEqual(len(state_groups), 1)
+
+ def test_clear_unreferenced_state_groups(self) -> None:
+ """Test that any unreferenced state groups are automatically cleaned up."""
+
+ self.helper.send(self.room_id, body="test1")
+ state1 = self.helper.send_state(
+ self.room_id, "org.matrix.test", body={"number": 2}
+ )
+ # Create enough state events to require multiple batches of
+ # mark_unreferenced_state_groups_for_deletion_bg_update to be run.
+ for i in range(200):
+ self.helper.send_state(self.room_id, "org.matrix.test", body={"number": i})
+ self.helper.send(self.room_id, body="test4")
+ last = self.helper.send(self.room_id, body="test5")
+
+ # Create an unreferenced state group that has no prev group.
+ unreferenced_free_state_group = self.get_success(
+ self.state_store.store_state_group(
+ event_id=last["event_id"],
+ room_id=self.room_id,
+ prev_group=None,
+ delta_ids={("org.matrix.test", ""): state1["event_id"]},
+ current_state_ids={("org.matrix.test", ""): ""},
+ )
+ )
+
+ # Create some unreferenced state groups that have a prev group of one of the
+ # existing state groups.
+ prev_group = self.get_success(
+ self.store._get_state_group_for_event(state1["event_id"])
+ )
+ unreferenced_end_state_group = self.get_success(
+ self.state_store.store_state_group(
+ event_id=last["event_id"],
+ room_id=self.room_id,
+ prev_group=prev_group,
+ delta_ids={("org.matrix.test", ""): state1["event_id"]},
+ current_state_ids=None,
+ )
+ )
+ another_unreferenced_end_state_group = self.get_success(
+ self.state_store.store_state_group(
+ event_id=last["event_id"],
+ room_id=self.room_id,
+ prev_group=unreferenced_end_state_group,
+ delta_ids={("org.matrix.test", ""): state1["event_id"]},
+ current_state_ids=None,
+ )
+ )
+
+ # Add some other unreferenced state groups which lead to a referenced state
+ # group.
+ # These state groups should not get deleted.
+ chain_state_group = self.get_success(
+ self.state_store.store_state_group(
+ event_id=last["event_id"],
+ room_id=self.room_id,
+ prev_group=None,
+ delta_ids={("org.matrix.test", ""): ""},
+ current_state_ids={("org.matrix.test", ""): ""},
+ )
+ )
+ chain_state_group_2 = self.get_success(
+ self.state_store.store_state_group(
+ event_id=last["event_id"],
+ room_id=self.room_id,
+ prev_group=chain_state_group,
+ delta_ids={("org.matrix.test", ""): ""},
+ current_state_ids=None,
+ )
+ )
+ referenced_chain_state_group = self.get_success(
+ self.state_store.store_state_group(
+ event_id=last["event_id"],
+ room_id=self.room_id,
+ prev_group=chain_state_group_2,
+ delta_ids={("org.matrix.test", ""): ""},
+ current_state_ids=None,
+ )
+ )
+ self.get_success(
+ self.store.db_pool.simple_insert(
+ "event_to_state_groups",
+ {
+ "event_id": "$new_event",
+ "state_group": referenced_chain_state_group,
+ },
+ )
+ )
+
+ # Insert and run the background update.
+ self.get_success(
+ self.store.db_pool.simple_insert(
+ "background_updates",
+ {
+ "update_name": _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
+ "progress_json": "{}",
+ },
+ )
+ )
+ self.store.db_pool.updates._all_done = False
+ self.wait_for_background_updates()
+
+ # Advance so that the background job to delete the state groups runs
+ self.reactor.advance(
+ 1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000
+ )
+
+ # We expect that the unreferenced free state group has been deleted.
+ row = self.get_success(
+ self.state_store.db_pool.simple_select_one_onecol(
+ table="state_groups",
+ keyvalues={"id": unreferenced_free_state_group},
+ retcol="id",
+ allow_none=True,
+ desc="test_purge_unreferenced_state_group",
+ )
+ )
+ self.assertIsNone(row)
+
+ # We expect that both unreferenced end state groups have been deleted.
+ row = self.get_success(
+ self.state_store.db_pool.simple_select_one_onecol(
+ table="state_groups",
+ keyvalues={"id": unreferenced_end_state_group},
+ retcol="id",
+ allow_none=True,
+ desc="test_purge_unreferenced_state_group",
+ )
+ )
+ self.assertIsNone(row)
+ row = self.get_success(
+ self.state_store.db_pool.simple_select_one_onecol(
+ table="state_groups",
+ keyvalues={"id": another_unreferenced_end_state_group},
+ retcol="id",
+ allow_none=True,
+ desc="test_purge_unreferenced_state_group",
+ )
+ )
+ self.assertIsNone(row)
+
+        # We expect the remaining state groups to be those which are still
+        # referenced, or which lead to a referenced state group.
+ state_groups = self.get_success(
+ self.state_store.db_pool.simple_select_onecol(
+ table="state_groups",
+ keyvalues={"room_id": self.room_id},
+ retcol="id",
+ desc="test_purge_unreferenced_state_group",
+ )
+ )
+ self.assertEqual(len(state_groups), 210)
|