Diffstat:
-rw-r--r--  synapse/storage/controllers/purge_events.py | 337
1 file changed, 331 insertions(+), 6 deletions(-)
diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py
index e794b370c2..c2d4bf8290 100644
--- a/synapse/storage/controllers/purge_events.py
+++ b/synapse/storage/controllers/purge_events.py
@@ -21,10 +21,19 @@
 import itertools
 import logging
-from typing import TYPE_CHECKING, Set
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Mapping,
+    Optional,
+    Set,
+)
 
 from synapse.logging.context import nested_logging_context
+from synapse.metrics.background_process_metrics import wrap_as_background_process
+from synapse.storage.database import LoggingTransaction
 from synapse.storage.databases import Databases
+from synapse.types.storage import _BackgroundUpdates
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
@@ -38,12 +47,22 @@ class PurgeEventsStorageController:
     def __init__(self, hs: "HomeServer", stores: Databases):
         self.stores = stores
 
+        if hs.config.worker.run_background_tasks:
+            self._delete_state_loop_call = hs.get_clock().looping_call(
+                self._delete_state_groups_loop, 60 * 1000
+            )
+
+        self.stores.state.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
+            self._background_delete_unrefereneced_state_groups,
+        )
+
     async def purge_room(self, room_id: str) -> None:
         """Deletes all record of a room"""
 
         with nested_logging_context(room_id):
-            state_groups_to_delete = await self.stores.main.purge_room(room_id)
-            await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
+            await self.stores.main.purge_room(room_id)
+            await self.stores.state.purge_room_state(room_id)
 
     async def purge_history(
         self, room_id: str, token: str, delete_local_events: bool
@@ -68,11 +87,16 @@ class PurgeEventsStorageController:
             logger.info("[purge] finding state groups that can be deleted")
             sg_to_delete = await self._find_unreferenced_groups(state_groups)
 
-            await self.stores.state.purge_unreferenced_state_groups(
-                room_id, sg_to_delete
+            # Mark these state groups as pending deletion, they will actually
+            # get deleted automatically later.
+            await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
+                sg_to_delete
             )
 
-    async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
+    async def _find_unreferenced_groups(
+        self,
+        state_groups: Collection[int],
+    ) -> Set[int]:
         """Used when purging history to figure out which state groups can
         be deleted.
 
@@ -118,6 +142,307 @@ class PurgeEventsStorageController:
             next_to_search |= prevs
             state_groups_seen |= prevs
 
+            # We also check to see if anything referencing the state groups are
+            # also unreferenced. This helps ensure that we delete unreferenced
+            # state groups, if we don't then we will de-delta them when we
+            # delete the other state groups leading to increased DB usage.
+            next_edges = await self.stores.state.get_next_state_groups(current_search)
+            nexts = set(next_edges.keys())
+            nexts -= state_groups_seen
+            next_to_search |= nexts
+            state_groups_seen |= nexts
+
         to_delete = state_groups_seen - referenced_groups
 
         return to_delete
+
+    @wrap_as_background_process("_delete_state_groups_loop")
+    async def _delete_state_groups_loop(self) -> None:
+        """Background task that deletes any state groups that may be pending
+        deletion."""
+
+        while True:
+            next_to_delete = await self.stores.state_deletion.get_next_state_group_collection_to_delete()
+            if next_to_delete is None:
+                break
+
+            (room_id, groups_to_sequences) = next_to_delete
+            made_progress = await self._delete_state_groups(
+                room_id, groups_to_sequences
+            )
+
+            # If no progress was made in deleting the state groups, then we
+            # break to allow a pause before trying again next time we get
+            # called.
+            if not made_progress:
+                break
+
+    async def _delete_state_groups(
+        self, room_id: str, groups_to_sequences: Mapping[int, int]
+    ) -> bool:
+        """Tries to delete the given state groups.
+
+        Returns:
+            Whether we made progress in deleting the state groups (or marking
+            them as referenced).
+        """
+
+        # We double check if any of the state groups have become referenced.
+        # This shouldn't happen, as any usages should cause the state group to
+        # be removed as pending deletion.
+        referenced_state_groups = await self.stores.main.get_referenced_state_groups(
+            groups_to_sequences
+        )
+
+        if referenced_state_groups:
+            # We mark any state groups that have become referenced as being
+            # used.
+            await self.stores.state_deletion.mark_state_groups_as_used(
+                referenced_state_groups
+            )
+
+            # Update list of state groups to remove referenced ones
+            groups_to_sequences = {
+                state_group: sequence_number
+                for state_group, sequence_number in groups_to_sequences.items()
+                if state_group not in referenced_state_groups
+            }
+
+        if not groups_to_sequences:
+            # We made progress here as long as we marked some state groups as
+            # now referenced.
+            return len(referenced_state_groups) > 0
+
+        return await self.stores.state.purge_unreferenced_state_groups(
+            room_id,
+            groups_to_sequences,
+        )
+
+    async def _background_delete_unrefereneced_state_groups(
+        self, progress: dict, batch_size: int
+    ) -> int:
+        """This background update will slowly delete any unreferenced state groups"""
+
+        last_checked_state_group = progress.get("last_checked_state_group")
+
+        if last_checked_state_group is None:
+            # This is the first run.
+            last_checked_state_group = (
+                await self.stores.state.db_pool.simple_select_one_onecol(
+                    table="state_groups",
+                    keyvalues={},
+                    retcol="MAX(id)",
+                    allow_none=True,
+                    desc="get_max_state_group",
+                )
+            )
+            if last_checked_state_group is None:
+                # There are no state groups so the background process is finished.
+                await self.stores.state.db_pool.updates._end_background_update(
+                    _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE
+                )
+                return batch_size
+            last_checked_state_group += 1
+
+        (
+            last_checked_state_group,
+            final_batch,
+        ) = await self._delete_unreferenced_state_groups_batch(
+            last_checked_state_group,
+            batch_size,
+        )
+
+        if not final_batch:
+            # There are more state groups to check.
+            progress = {
+                "last_checked_state_group": last_checked_state_group,
+            }
+            await self.stores.state.db_pool.updates._background_update_progress(
+                _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE,
+                progress,
+            )
+        else:
+            # This background process is finished.
+            await self.stores.state.db_pool.updates._end_background_update(
+                _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE
+            )
+
+        return batch_size
+
+    async def _delete_unreferenced_state_groups_batch(
+        self,
+        last_checked_state_group: int,
+        batch_size: int,
+    ) -> tuple[int, bool]:
+        """Looks for unreferenced state groups starting from the last state group
+        checked and marks them for deletion.
+
+        Args:
+            last_checked_state_group: The last state group that was checked.
+            batch_size: How many state groups to process in this iteration.
+
+        Returns:
+            (last_checked_state_group, final_batch)
+        """
+
+        # Find all state groups that can be deleted if any of the original set are deleted.
+        (
+            to_delete,
+            last_checked_state_group,
+            final_batch,
+        ) = await self._find_unreferenced_groups_for_background_deletion(
+            last_checked_state_group, batch_size
+        )
+
+        if len(to_delete) == 0:
+            return last_checked_state_group, final_batch
+
+        await self.stores.state_deletion.mark_state_groups_as_pending_deletion(
+            to_delete
+        )
+
+        return last_checked_state_group, final_batch
+
+    async def _find_unreferenced_groups_for_background_deletion(
+        self,
+        last_checked_state_group: int,
+        batch_size: int,
+    ) -> tuple[Set[int], int, bool]:
+        """Used when deleting unreferenced state groups in the background to figure out
+        which state groups can be deleted.
+        To avoid increased DB usage due to de-deltaing state groups, this returns only
+        state groups which are free standing (ie. no shared edges with referenced groups) or
+        state groups which do not share edges which result in a future referenced group.
+
+        The following scenarios outline the possibilities based on state group data in
+        the DB.
+
+        ie. Free standing -> state groups 1-N would be returned:
+        SG_1
+        |
+        ...
+        |
+        SG_N
+
+        ie. Previous reference -> state groups 2-N would be returned:
+        SG_1 <- referenced by event
+        |
+        SG_2
+        |
+        ...
+        |
+        SG_N
+
+        ie. Future reference -> none of the following state groups would be returned:
+        SG_1
+        |
+        SG_2
+        |
+        ...
+        |
+        SG_N <- referenced by event
+
+        Args:
+            last_checked_state_group: The last state group that was checked.
+            batch_size: How many state groups to process in this iteration.
+
+        Returns:
+            (to_delete, last_checked_state_group, final_batch)
+        """
+
+        # If a state group's next edge is not pending deletion then we don't delete the state group.
+        # If there is no next edge or the next edges are all marked for deletion, then delete
+        # the state group.
+        # This holds since we walk backwards from the latest state groups, ensuring that
+        # we've already checked newer state groups for event references along the way.
+        def get_next_state_groups_marked_for_deletion_txn(
+            txn: LoggingTransaction,
+        ) -> tuple[dict[int, bool], dict[int, int]]:
+            state_group_sql = """
+                SELECT s.id, e.state_group, d.state_group
+                FROM (
+                    SELECT id FROM state_groups
+                    WHERE id < ? ORDER BY id DESC LIMIT ?
+                ) as s
+                LEFT JOIN state_group_edges AS e ON (s.id = e.prev_state_group)
+                LEFT JOIN state_groups_pending_deletion AS d ON (e.state_group = d.state_group)
+            """
+            txn.execute(state_group_sql, (last_checked_state_group, batch_size))
+
+            # Mapping from state group to whether we should delete it.
+            state_groups_to_deletion: dict[int, bool] = {}
+
+            # Mapping from state group to prev state group.
+            state_groups_to_prev: dict[int, int] = {}
+
+            for row in txn:
+                state_group = row[0]
+                next_edge = row[1]
+                pending_deletion = row[2]
+
+                if next_edge is not None:
+                    state_groups_to_prev[next_edge] = state_group
+
+                if next_edge is not None and not pending_deletion:
+                    # We have found an edge not marked for deletion.
+                    # Check previous results to see if this group is part of a chain
+                    # within this batch that qualifies for deletion.
+                    # ie. batch contains:
+                    # SG_1 -> SG_2 -> SG_3
+                    # If SG_3 is a candidate for deletion, then SG_2 & SG_1 should also
+                    # be, even though they have edges which may not be marked for
+                    # deletion.
+                    # This relies on SQL results being sorted in DESC order to work.
+                    next_is_deletion_candidate = state_groups_to_deletion.get(next_edge)
+                    if (
+                        next_is_deletion_candidate is None
+                        or not next_is_deletion_candidate
+                    ):
+                        state_groups_to_deletion[state_group] = False
+                    else:
+                        state_groups_to_deletion.setdefault(state_group, True)
+                else:
+                    # This state group may be a candidate for deletion
+                    state_groups_to_deletion.setdefault(state_group, True)
+
+            return state_groups_to_deletion, state_groups_to_prev
+
+        (
+            state_groups_to_deletion,
+            state_group_edges,
+        ) = await self.stores.state.db_pool.runInteraction(
+            "get_next_state_groups_marked_for_deletion",
+            get_next_state_groups_marked_for_deletion_txn,
+        )
+        deletion_candidates = {
+            state_group
+            for state_group, deletion in state_groups_to_deletion.items()
+            if deletion
+        }
+
+        final_batch = False
+        state_groups = state_groups_to_deletion.keys()
+        if len(state_groups) < batch_size:
+            final_batch = True
+        else:
+            last_checked_state_group = min(state_groups)
+
+        if len(state_groups) == 0:
+            return set(), last_checked_state_group, final_batch
+
+        # Determine if any of the remaining state groups are directly referenced.
+        referenced = await self.stores.main.get_referenced_state_groups(
+            deletion_candidates
+        )
+
+        # Remove state groups from deletion_candidates which are directly referenced or share a
+        # future edge with a referenced state group within this batch.
+        def filter_reference_chains(group: Optional[int]) -> None:
+            while group is not None:
+                deletion_candidates.discard(group)
+                group = state_group_edges.get(group)
+
+        for referenced_group in referenced:
+            filter_reference_chains(referenced_group)
+
+        return deletion_candidates, last_checked_state_group, final_batch
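The purge path above works on the state group graph: state_group_edges records which group each group is a delta over, and the new get_next_state_groups call walks those edges in the other direction so that groups built on top of a purged group are cleaned up too rather than being de-deltaed. Below is a rough, self-contained sketch of that traversal using plain dicts in place of the Synapse stores; the function and variable names are illustrative only, not Synapse APIs, and the batching done by the real _find_unreferenced_groups is ignored.

# Illustrative sketch only (not Synapse code): plain-dict stand-ins for the
# state store, ignoring the batching that the real method does.
from typing import Collection, Dict, Set


def find_unreferenced_groups(
    state_groups: Collection[int],
    prev_edges: Dict[int, int],  # group -> the group it is a delta over
    next_edges: Dict[int, Set[int]],  # group -> groups built on top of it
    referenced: Set[int],  # groups still referenced by events
) -> Set[int]:
    """Walk both edge directions from the purged groups and return every
    connected group that no event still references."""
    seen: Set[int] = set(state_groups)
    to_search = set(state_groups)
    referenced_seen: Set[int] = set()

    while to_search:
        group = to_search.pop()
        if group in referenced:
            # Don't keep walking through groups that are still in use.
            referenced_seen.add(group)
            continue

        # Walk backwards to the group this one is deltaed against...
        prev = prev_edges.get(group)
        if prev is not None and prev not in seen:
            seen.add(prev)
            to_search.add(prev)

        # ...and forwards, so we don't leave groups behind that would have to
        # be de-deltaed once their prev group is gone.
        for nxt in next_edges.get(group, set()):
            if nxt not in seen:
                seen.add(nxt)
                to_search.add(nxt)

    return seen - referenced_seen


# Toy chain 1 <- 2 <- 3, where only group 1 is still referenced by an event:
# purging 3 should delete 3 and 2 but leave 1 alone.
print(find_unreferenced_groups({3}, {2: 1, 3: 2}, {1: {2}, 2: {3}}, {1}))  # {2, 3}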
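The background update relies on scanning state groups newest-to-oldest: because a group's successor is classified before the group itself, a whole chain can be marked as deletion candidates in one pass, and filter_reference_chains then drops any chain that leads up to a referenced group. The toy, in-memory version below mirrors that classification under stated assumptions; the names and data structures stand in for the SQL query and store calls and are hypothetical, not Synapse APIs.

# Illustrative sketch only (not Synapse code): models the DESC-order batch scan
# with in-memory data instead of the SQL join over state_group_edges.
from typing import Dict, Iterable, Optional, Set


def deletion_candidates_for_batch(
    batch_desc: Iterable[int],  # state group ids in this batch, newest first
    next_edges: Dict[int, int],  # prev_state_group -> group built on top of it
    pending_deletion: Set[int],  # groups already marked as pending deletion
    referenced: Set[int],  # groups referenced by events
) -> Set[int]:
    is_candidate: Dict[int, bool] = {}
    prev_of: Dict[int, int] = {}  # group -> its prev state group

    # Newest-to-oldest, so a group's successor is classified before the group.
    for group in batch_desc:
        nxt = next_edges.get(group)
        if nxt is not None:
            prev_of[nxt] = group
        if nxt is not None and nxt not in pending_deletion:
            # Only a candidate if the group built on top of it already is.
            is_candidate[group] = bool(is_candidate.get(nxt))
        else:
            # No successor, or the successor is already pending deletion.
            is_candidate.setdefault(group, True)

    candidates = {group for group, ok in is_candidate.items() if ok}

    # Drop every chain that leads up to a referenced group, walking prev edges.
    for group in referenced:
        cur: Optional[int] = group
        while cur is not None:
            candidates.discard(cur)
            cur = prev_of.get(cur)

    return candidates


# SG_1 -> SG_2 -> SG_3 scanned as [3, 2, 1]; SG_3 is referenced by an event, so
# the whole chain is kept. Free-standing SG_4 remains a deletion candidate.
print(
    deletion_candidates_for_batch(
        [4, 3, 2, 1],
        next_edges={1: 2, 2: 3},
        pending_deletion=set(),
        referenced={3},
    )
)  # {4}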