diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py
index 07dbbc8e75..4853e5aa2f 100644
--- a/synapse/storage/databases/state/deletion.py
+++ b/synapse/storage/databases/state/deletion.py
@@ -20,6 +20,7 @@ from typing import (
AsyncIterator,
Collection,
Mapping,
+ Optional,
Set,
Tuple,
)
@@ -307,6 +308,17 @@ class StateDeletionDataStore:
desc="mark_state_groups_as_pending_deletion",
)
+ async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None:
+ """Mark the given state groups as now being referenced, by deleting their
+ rows from the `state_groups_pending_deletion` table."""
+ await self.db_pool.simple_delete_many(
+ table="state_groups_pending_deletion",
+ column="state_group",
+ iterable=state_groups,
+ keyvalues={},
+ desc="mark_state_groups_as_used",
+ )
+
async def get_pending_deletions(
self, state_groups: Collection[int]
) -> Mapping[int, int]:
@@ -444,3 +456,56 @@ class StateDeletionDataStore:
can_be_deleted.difference_update(state_group for (state_group,) in txn)
return can_be_deleted
+
+ async def get_next_state_group_collection_to_delete(
+ self,
+ ) -> Optional[Tuple[str, Mapping[int, int]]]:
+ """Get the next set of state groups (all from one room) to try and delete
+
+ Returns:
+ None if nothing is eligible, else (room_id, {state_group: sequence_number}).
+ """
+ return await self.db_pool.runInteraction(
+ "get_next_state_group_collection_to_delete",
+ self._get_next_state_group_collection_to_delete_txn,
+ )
+
+ def _get_next_state_group_collection_to_delete_txn(
+ self,
+ txn: LoggingTransaction,
+ ) -> Optional[Tuple[str, Mapping[int, int]]]:
+ """Implementation of `get_next_state_group_collection_to_delete`; returns None if no row qualifies"""
+
+ # Return chunks of state groups that were marked for deletion at the same
+ # time (not required, just more efficient): find the oldest insertion_ts,
+ # then fetch every row sharing that insertion_ts and room ID. Rows newer than
+ # DELAY_BEFORE_DELETION_MS or present in state_groups_persisting are skipped.
+ now = self._clock.time_msec()
+
+ sql = """
+ SELECT room_id, insertion_ts
+ FROM state_groups_pending_deletion AS sd
+ INNER JOIN state_groups AS sg ON (id = sd.state_group)
+ LEFT JOIN state_groups_persisting AS sp USING (state_group)
+ WHERE insertion_ts < ? AND sp.state_group IS NULL
+ ORDER BY insertion_ts
+ LIMIT 1
+ """
+ txn.execute(sql, (now - self.DELAY_BEFORE_DELETION_MS,))
+ row = txn.fetchone()
+ if not row:
+ return None
+
+ (room_id, insertion_ts) = row
+
+ sql = """
+ SELECT state_group, sequence_number
+ FROM state_groups_pending_deletion AS sd
+ INNER JOIN state_groups AS sg ON (id = sd.state_group)
+ LEFT JOIN state_groups_persisting AS sp USING (state_group)
+ WHERE room_id = ? AND insertion_ts = ? AND sp.state_group IS NULL
+ ORDER BY insertion_ts
+ """
+ txn.execute(sql, (room_id, insertion_ts))
+
+ return room_id, dict(txn)
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 7e986e0601..0f47642ae5 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -22,10 +22,10 @@
import logging
from typing import (
TYPE_CHECKING,
- Collection,
Dict,
Iterable,
List,
+ Mapping,
Optional,
Set,
Tuple,
@@ -735,8 +735,10 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
)
async def purge_unreferenced_state_groups(
- self, room_id: str, state_groups_to_delete: Collection[int]
- ) -> None:
+ self,
+ room_id: str,
+ state_groups_to_sequence_numbers: Mapping[int, int],
+ ) -> bool:
"""Deletes no longer referenced state groups and de-deltas any state
groups that reference them.
@@ -744,21 +746,31 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
room_id: The room the state groups belong to (must all be in the
same room).
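- state_groups_to_delete: Set of all state groups to delete.
+ state_groups_to_sequence_numbers: State groups to try to delete, mapped to their sequence numbers.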
+
+ Returns:
+ Whether any state groups were actually deleted.
"""
- await self.db_pool.runInteraction(
+ return await self.db_pool.runInteraction(
"purge_unreferenced_state_groups",
self._purge_unreferenced_state_groups,
room_id,
- state_groups_to_delete,
+ state_groups_to_sequence_numbers,
)
def _purge_unreferenced_state_groups(
self,
txn: LoggingTransaction,
room_id: str,
- state_groups_to_delete: Collection[int],
- ) -> None:
+ state_groups_to_sequence_numbers: Mapping[int, int],
+ ) -> bool:
+ state_groups_to_delete = self._state_deletion_store.get_state_groups_ready_for_potential_deletion_txn(
+ txn, state_groups_to_sequence_numbers
+ )
+
+ if not state_groups_to_delete:
+ return False
+
logger.info(
"[purge] found %i state groups to delete", len(state_groups_to_delete)
)
@@ -821,6 +833,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
[(sg,) for sg in state_groups_to_delete],
)
+ return True
+
@trace
@tag_args
async def get_previous_state_groups(
|