summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/controllers/purge_events.py80
-rw-r--r--synapse/storage/databases/state/deletion.py65
-rw-r--r--synapse/storage/databases/state/store.py28
3 files changed, 162 insertions, 11 deletions
diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py

index 15c04ffef8..2d6f80f770 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py
@@ -21,9 +21,10 @@ import itertools import logging -from typing import TYPE_CHECKING, Set +from typing import TYPE_CHECKING, Collection, Mapping, Set from synapse.logging.context import nested_logging_context +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage.databases import Databases if TYPE_CHECKING: @@ -38,6 +39,11 @@ class PurgeEventsStorageController: def __init__(self, hs: "HomeServer", stores: Databases): self.stores = stores + if hs.config.worker.run_background_tasks: + self._delete_state_loop_call = hs.get_clock().looping_call( + self._delete_state_groups_loop, 60 * 1000 + ) + async def purge_room(self, room_id: str) -> None: """Deletes all record of a room""" @@ -68,11 +74,15 @@ class PurgeEventsStorageController: logger.info("[purge] finding state groups that can be deleted") sg_to_delete = await self._find_unreferenced_groups(state_groups) - await self.stores.state.purge_unreferenced_state_groups( - room_id, sg_to_delete + # Mark these state groups as pending deletion, they will actually + # get deleted automatically later. + await self.stores.state_deletion.mark_state_groups_as_pending_deletion( + sg_to_delete ) - async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]: + async def _find_unreferenced_groups( + self, state_groups: Collection[int] + ) -> Set[int]: """Used when purging history to figure out which state groups can be deleted. @@ -121,3 +131,65 @@ class PurgeEventsStorageController: to_delete = state_groups_seen - referenced_groups return to_delete + + @wrap_as_background_process("_delete_state_groups_loop") + async def _delete_state_groups_loop(self) -> None: + """Background task that deletes any state groups that may be pending + deletion.""" + + while True: + next_to_delete = await self.stores.state_deletion.get_next_state_group_collection_to_delete() + if next_to_delete is None: + break + + (room_id, groups_to_sequences) = next_to_delete + made_progress = await self._delete_state_groups( + room_id, groups_to_sequences + ) + + # If no progress was made in deleting the state groups, then we + # break to allow a pause before trying again next time we get + # called. + if not made_progress: + break + + async def _delete_state_groups( + self, room_id: str, groups_to_sequences: Mapping[int, int] + ) -> bool: + """Tries to delete the given state groups. + + Returns: + Whether we made progress in deleting the state groups (or marking + them as referenced). + """ + + # We double check if any of the state groups have become referenced. + # This shouldn't happen, as any usages should cause the state group to + # be removed as pending deletion. + referenced_state_groups = await self.stores.main.get_referenced_state_groups( + groups_to_sequences + ) + + if referenced_state_groups: + # We mark any state groups that have become referenced as being + # used. + await self.stores.state_deletion.mark_state_groups_as_used( + referenced_state_groups + ) + + # Update list of state groups to remove referenced ones + groups_to_sequences = { + state_group: sequence_number + for state_group, sequence_number in groups_to_sequences.items() + if state_group not in referenced_state_groups + } + + if not groups_to_sequences: + # We made progress here as long as we marked some state groups as + # now referenced. + return len(referenced_state_groups) > 0 + + return await self.stores.state.purge_unreferenced_state_groups( + room_id, + groups_to_sequences, + ) diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py
index 07dbbc8e75..4853e5aa2f 100644 --- a/synapse/storage/databases/state/deletion.py +++ b/synapse/storage/databases/state/deletion.py
@@ -20,6 +20,7 @@ from typing import ( AsyncIterator, Collection, Mapping, + Optional, Set, Tuple, ) @@ -307,6 +308,17 @@ class StateDeletionDataStore: desc="mark_state_groups_as_pending_deletion", ) + async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None: + """Mark the given state groups as now being referenced""" + + await self.db_pool.simple_delete_many( + table="state_groups_pending_deletion", + column="state_group", + iterable=state_groups, + keyvalues={}, + desc="mark_state_groups_as_used", + ) + async def get_pending_deletions( self, state_groups: Collection[int] ) -> Mapping[int, int]: @@ -444,3 +456,56 @@ class StateDeletionDataStore: can_be_deleted.difference_update(state_group for (state_group,) in txn) return can_be_deleted + + async def get_next_state_group_collection_to_delete( + self, + ) -> Optional[Tuple[str, Mapping[int, int]]]: + """Get the next set of state groups to try and delete + + Returns: + 2-tuple of room_id and mapping of state groups to sequence number. + """ + return await self.db_pool.runInteraction( + "get_next_state_group_collection_to_delete", + self._get_next_state_group_collection_to_delete_txn, + ) + + def _get_next_state_group_collection_to_delete_txn( + self, + txn: LoggingTransaction, + ) -> Optional[Tuple[str, Mapping[int, int]]]: + """Implementation of `get_next_state_group_collection_to_delete`""" + + # We want to return chunks of state groups that were marked for deletion + # at the same time (this isn't necessary, just more efficient). We do + # this by looking for the oldest insertion_ts, and then pulling out all + # rows that have the same insertion_ts (and room ID). + now = self._clock.time_msec() + + sql = """ + SELECT room_id, insertion_ts + FROM state_groups_pending_deletion AS sd + INNER JOIN state_groups AS sg ON (id = sd.state_group) + LEFT JOIN state_groups_persisting AS sp USING (state_group) + WHERE insertion_ts < ? AND sp.state_group IS NULL + ORDER BY insertion_ts + LIMIT 1 + """ + txn.execute(sql, (now - self.DELAY_BEFORE_DELETION_MS,)) + row = txn.fetchone() + if not row: + return None + + (room_id, insertion_ts) = row + + sql = """ + SELECT state_group, sequence_number + FROM state_groups_pending_deletion AS sd + INNER JOIN state_groups AS sg ON (id = sd.state_group) + LEFT JOIN state_groups_persisting AS sp USING (state_group) + WHERE room_id = ? AND insertion_ts = ? AND sp.state_group IS NULL + ORDER BY insertion_ts + """ + txn.execute(sql, (room_id, insertion_ts)) + + return room_id, dict(txn) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index 7e986e0601..0f47642ae5 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py
@@ -22,10 +22,10 @@ import logging from typing import ( TYPE_CHECKING, - Collection, Dict, Iterable, List, + Mapping, Optional, Set, Tuple, @@ -735,8 +735,10 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): ) async def purge_unreferenced_state_groups( - self, room_id: str, state_groups_to_delete: Collection[int] - ) -> None: + self, + room_id: str, + state_groups_to_sequence_numbers: Mapping[int, int], + ) -> bool: """Deletes no longer referenced state groups and de-deltas any state groups that reference them. @@ -744,21 +746,31 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): room_id: The room the state groups belong to (must all be in the same room). state_groups_to_delete: Set of all state groups to delete. + + Returns: + Whether any state groups were actually deleted. """ - await self.db_pool.runInteraction( + return await self.db_pool.runInteraction( "purge_unreferenced_state_groups", self._purge_unreferenced_state_groups, room_id, - state_groups_to_delete, + state_groups_to_sequence_numbers, ) def _purge_unreferenced_state_groups( self, txn: LoggingTransaction, room_id: str, - state_groups_to_delete: Collection[int], - ) -> None: + state_groups_to_sequence_numbers: Mapping[int, int], + ) -> bool: + state_groups_to_delete = self._state_deletion_store.get_state_groups_ready_for_potential_deletion_txn( + txn, state_groups_to_sequence_numbers + ) + + if not state_groups_to_delete: + return False + logger.info( "[purge] found %i state groups to delete", len(state_groups_to_delete) ) @@ -821,6 +833,8 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): [(sg,) for sg in state_groups_to_delete], ) + return True + @trace @tag_args async def get_previous_state_groups(