diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py
index 15c04ffef8..2d6f80f770 100644
--- a/synapse/storage/controllers/purge_events.py
+++ b/synapse/storage/controllers/purge_events.py
@@ -21,9 +21,10 @@
import itertools
import logging
-from typing import TYPE_CHECKING, Set
+from typing import TYPE_CHECKING, Collection, Mapping, Set
from synapse.logging.context import nested_logging_context
+from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage.databases import Databases
if TYPE_CHECKING:
@@ -38,6 +39,11 @@ class PurgeEventsStorageController:
def __init__(self, hs: "HomeServer", stores: Databases):
self.stores = stores
+ if hs.config.worker.run_background_tasks:
+ self._delete_state_loop_call = hs.get_clock().looping_call(
+ self._delete_state_groups_loop, 60 * 1000
+ )
+
async def purge_room(self, room_id: str) -> None:
"""Deletes all record of a room"""
@@ -68,11 +74,15 @@ class PurgeEventsStorageController:
logger.info("[purge] finding state groups that can be deleted")
sg_to_delete = await self._find_unreferenced_groups(state_groups)
- await self.stores.state.purge_unreferenced_state_groups(
- room_id, sg_to_delete
+ # Mark these state groups as pending deletion, they will actually
+ # get deleted automatically later.
+ await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
+ sg_to_delete
)
- async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
+ async def _find_unreferenced_groups(
+ self, state_groups: Collection[int]
+ ) -> Set[int]:
"""Used when purging history to figure out which state groups can be
deleted.
@@ -121,3 +131,65 @@ class PurgeEventsStorageController:
to_delete = state_groups_seen - referenced_groups
return to_delete
+
+ @wrap_as_background_process("_delete_state_groups_loop")
+ async def _delete_state_groups_loop(self) -> None:
+ """Background task that deletes any state groups that may be pending
+ deletion."""
+
+ while True:
+ next_to_delete = await self.stores.state_deletion.get_next_state_group_collection_to_delete()
+ if next_to_delete is None:
+ break
+
+ (room_id, groups_to_sequences) = next_to_delete
+ made_progress = await self._delete_state_groups(
+ room_id, groups_to_sequences
+ )
+
+ # If no progress was made in deleting the state groups, then we
+ # break to allow a pause before trying again next time we get
+ # called.
+ if not made_progress:
+ break
+
+ async def _delete_state_groups(
+ self, room_id: str, groups_to_sequences: Mapping[int, int]
+ ) -> bool:
+ """Tries to delete the given state groups.
+
+ Returns:
+ Whether we made progress in deleting the state groups (or marking
+ them as referenced).
+ """
+
+ # We double check if any of the state groups have become referenced.
+ # This shouldn't happen, as any usages should cause the state group to
+ # be removed as pending deletion.
+ referenced_state_groups = await self.stores.main.get_referenced_state_groups(
+ groups_to_sequences
+ )
+
+ if referenced_state_groups:
+ # We mark any state groups that have become referenced as being
+ # used.
+ await self.stores.state_deletion.mark_state_groups_as_used(
+ referenced_state_groups
+ )
+
+ # Update list of state groups to remove referenced ones
+ groups_to_sequences = {
+ state_group: sequence_number
+ for state_group, sequence_number in groups_to_sequences.items()
+ if state_group not in referenced_state_groups
+ }
+
+ if not groups_to_sequences:
+ # We made progress here as long as we marked some state groups as
+ # now referenced.
+ return len(referenced_state_groups) > 0
+
+ return await self.stores.state.purge_unreferenced_state_groups(
+ room_id,
+ groups_to_sequences,
+ )
diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py
index 07dbbc8e75..4853e5aa2f 100644
--- a/synapse/storage/databases/state/deletion.py
+++ b/synapse/storage/databases/state/deletion.py
@@ -20,6 +20,7 @@ from typing import (
AsyncIterator,
Collection,
Mapping,
+ Optional,
Set,
Tuple,
)
@@ -307,6 +308,17 @@ class StateDeletionDataStore:
desc="mark_state_groups_as_pending_deletion",
)
+ async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None:
+ """Mark the given state groups as now being referenced"""
+
+ await self.db_pool.simple_delete_many(
+ table="state_groups_pending_deletion",
+ column="state_group",
+ iterable=state_groups,
+ keyvalues={},
+ desc="mark_state_groups_as_used",
+ )
+
async def get_pending_deletions(
self, state_groups: Collection[int]
) -> Mapping[int, int]:
@@ -444,3 +456,56 @@ class StateDeletionDataStore:
can_be_deleted.difference_update(state_group for (state_group,) in txn)
return can_be_deleted
+
+ async def get_next_state_group_collection_to_delete(
+ self,
+ ) -> Optional[Tuple[str, Mapping[int, int]]]:
+ """Get the next set of state groups to try and delete
+
+ Returns:
+ 2-tuple of room_id and mapping of state groups to sequence number.
+ """
+ return await self.db_pool.runInteraction(
+ "get_next_state_group_collection_to_delete",
+ self._get_next_state_group_collection_to_delete_txn,
+ )
+
+ def _get_next_state_group_collection_to_delete_txn(
+ self,
+ txn: LoggingTransaction,
+ ) -> Optional[Tuple[str, Mapping[int, int]]]:
+ """Implementation of `get_next_state_group_collection_to_delete`"""
+
+ # We want to return chunks of state groups that were marked for deletion
+ # at the same time (this isn't necessary, just more efficient). We do
+ # this by looking for the oldest insertion_ts, and then pulling out all
+ # rows that have the same insertion_ts (and room ID).
+ now = self._clock.time_msec()
+
+ sql = """
+ SELECT room_id, insertion_ts
+ FROM state_groups_pending_deletion AS sd
+ INNER JOIN state_groups AS sg ON (id = sd.state_group)
+ LEFT JOIN state_groups_persisting AS sp USING (state_group)
+ WHERE insertion_ts < ? AND sp.state_group IS NULL
+ ORDER BY insertion_ts
+ LIMIT 1
+ """
+ txn.execute(sql, (now - self.DELAY_BEFORE_DELETION_MS,))
+ row = txn.fetchone()
+ if not row:
+ return None
+
+ (room_id, insertion_ts) = row
+
+ sql = """
+ SELECT state_group, sequence_number
+ FROM state_groups_pending_deletion AS sd
+ INNER JOIN state_groups AS sg ON (id = sd.state_group)
+ LEFT JOIN state_groups_persisting AS sp USING (state_group)
+ WHERE room_id = ? AND insertion_ts = ? AND sp.state_group IS NULL
+ ORDER BY insertion_ts
+ """
+ txn.execute(sql, (room_id, insertion_ts))
+
+ return room_id, dict(txn)
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 7e986e0601..0f47642ae5 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -22,10 +22,10 @@
import logging
from typing import (
TYPE_CHECKING,
- Collection,
Dict,
Iterable,
List,
+ Mapping,
Optional,
Set,
Tuple,
@@ -735,8 +735,10 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
)
async def purge_unreferenced_state_groups(
- self, room_id: str, state_groups_to_delete: Collection[int]
- ) -> None:
+ self,
+ room_id: str,
+ state_groups_to_sequence_numbers: Mapping[int, int],
+ ) -> bool:
"""Deletes no longer referenced state groups and de-deltas any state
groups that reference them.
@@ -744,21 +746,31 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
room_id: The room the state groups belong to (must all be in the
same room).
state_groups_to_delete: Set of all state groups to delete.
+
+ Returns:
+ Whether any state groups were actually deleted.
"""
- await self.db_pool.runInteraction(
+ return await self.db_pool.runInteraction(
"purge_unreferenced_state_groups",
self._purge_unreferenced_state_groups,
room_id,
- state_groups_to_delete,
+ state_groups_to_sequence_numbers,
)
def _purge_unreferenced_state_groups(
self,
txn: LoggingTransaction,
room_id: str,
- state_groups_to_delete: Collection[int],
- ) -> None:
+ state_groups_to_sequence_numbers: Mapping[int, int],
+ ) -> bool:
+ state_groups_to_delete = self._state_deletion_store.get_state_groups_ready_for_potential_deletion_txn(
+ txn, state_groups_to_sequence_numbers
+ )
+
+ if not state_groups_to_delete:
+ return False
+
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
@@ -821,6 +833,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
[(sg,) for sg in state_groups_to_delete],
)
+ return True
+
@trace
@tag_args
async def get_previous_state_groups(
|