summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2025-02-03 18:58:55 +0100
committerGitHub <noreply@github.com>2025-02-03 17:58:55 +0000
commit27dbb1b4290b9de64e24a11f892777378810b595 (patch)
treed69e68fa45963e7a42b39c7876f1c21af8d1fd85 /synapse/storage/databases
parentAdd locking to more safely delete state groups: Part 1 (#18107) (diff)
downloadsynapse-27dbb1b4290b9de64e24a11f892777378810b595.tar.xz
Add locking to more safely delete state groups: Part 2 (#18130)
This actually makes it so that deleting state groups goes via the new
mechanism.

c.f. #18107
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/state/deletion.py65
-rw-r--r--synapse/storage/databases/state/store.py28
2 files changed, 86 insertions, 7 deletions
diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py

index 07dbbc8e75..4853e5aa2f 100644 --- a/synapse/storage/databases/state/deletion.py +++ b/synapse/storage/databases/state/deletion.py
@@ -20,6 +20,7 @@ from typing import ( AsyncIterator, Collection, Mapping, + Optional, Set, Tuple, ) @@ -307,6 +308,17 @@ class StateDeletionDataStore: desc="mark_state_groups_as_pending_deletion", ) + async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None: + """Mark the given state groups as now being referenced""" + + await self.db_pool.simple_delete_many( + table="state_groups_pending_deletion", + column="state_group", + iterable=state_groups, + keyvalues={}, + desc="mark_state_groups_as_used", + ) + async def get_pending_deletions( self, state_groups: Collection[int] ) -> Mapping[int, int]: @@ -444,3 +456,56 @@ class StateDeletionDataStore: can_be_deleted.difference_update(state_group for (state_group,) in txn) return can_be_deleted + + async def get_next_state_group_collection_to_delete( + self, + ) -> Optional[Tuple[str, Mapping[int, int]]]: + """Get the next set of state groups to try and delete + + Returns: + 2-tuple of room_id and mapping of state groups to sequence number. + """ + return await self.db_pool.runInteraction( + "get_next_state_group_collection_to_delete", + self._get_next_state_group_collection_to_delete_txn, + ) + + def _get_next_state_group_collection_to_delete_txn( + self, + txn: LoggingTransaction, + ) -> Optional[Tuple[str, Mapping[int, int]]]: + """Implementation of `get_next_state_group_collection_to_delete`""" + + # We want to return chunks of state groups that were marked for deletion + # at the same time (this isn't necessary, just more efficient). We do + # this by looking for the oldest insertion_ts, and then pulling out all + # rows that have the same insertion_ts (and room ID). + now = self._clock.time_msec() + + sql = """ + SELECT room_id, insertion_ts + FROM state_groups_pending_deletion AS sd + INNER JOIN state_groups AS sg ON (id = sd.state_group) + LEFT JOIN state_groups_persisting AS sp USING (state_group) + WHERE insertion_ts < ? AND sp.state_group IS NULL + ORDER BY insertion_ts + LIMIT 1 + """ + txn.execute(sql, (now - self.DELAY_BEFORE_DELETION_MS,)) + row = txn.fetchone() + if not row: + return None + + (room_id, insertion_ts) = row + + sql = """ + SELECT state_group, sequence_number + FROM state_groups_pending_deletion AS sd + INNER JOIN state_groups AS sg ON (id = sd.state_group) + LEFT JOIN state_groups_persisting AS sp USING (state_group) + WHERE room_id = ? AND insertion_ts = ? AND sp.state_group IS NULL + ORDER BY insertion_ts + """ + txn.execute(sql, (room_id, insertion_ts)) + + return room_id, dict(txn) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 7e986e0601..0f47642ae5 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -22,10 +22,10 @@ import logging from typing import ( TYPE_CHECKING, - Collection, Dict, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -735,8 +735,10 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): ) async def purge_unreferenced_state_groups( - self, room_id: str, state_groups_to_delete: Collection[int] - ) -> None: + self, + room_id: str, + state_groups_to_sequence_numbers: Mapping[int, int], + ) -> bool: """Deletes no longer referenced state groups and de-deltas any state groups that reference them. @@ -744,21 +746,31 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): room_id: The room the state groups belong to (must all be in the same room). state_groups_to_delete: Set of all state groups to delete. + + Returns: + Whether any state groups were actually deleted. """ - await self.db_pool.runInteraction( + return await self.db_pool.runInteraction( "purge_unreferenced_state_groups", self._purge_unreferenced_state_groups, room_id, - state_groups_to_delete, + state_groups_to_sequence_numbers, ) def _purge_unreferenced_state_groups( self, txn: LoggingTransaction, room_id: str, - state_groups_to_delete: Collection[int], - ) -> None: + state_groups_to_sequence_numbers: Mapping[int, int], + ) -> bool: + state_groups_to_delete = self._state_deletion_store.get_state_groups_ready_for_potential_deletion_txn( + txn, state_groups_to_sequence_numbers + ) + + if not state_groups_to_delete: + return False + logger.info( "[purge] found %i state groups to delete", len(state_groups_to_delete) ) @@ -821,6 +833,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): [(sg,) for sg in state_groups_to_delete], ) + return True + @trace @tag_args async def get_previous_state_groups(