diff options
Diffstat (limited to 'synapse/storage/state.py')
-rw-r--r-- | synapse/storage/state.py | 112 |
1 files changed, 71 insertions, 41 deletions
diff --git a/synapse/storage/state.py b/synapse/storage/state.py index f7cf5c86c9..0f86311ed4 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -1041,55 +1041,85 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): return count - def _is_state_group_referenced(self, txn, state_group): - """Checks if a given state group is referenced, or is safe to delete. + def _find_unreferenced_groups(self, txn, state_groups): + """Used when purging history to figure out which state groups can be + deleted and which need to be de-delta'ed (due to one of its prev groups + being scheduled for deletion). - A state group is referenced if it or any of its descendants are - pointed at by an event. (A descendant is a state_group whose chain of - prev_groups includes the given state_group.) - """ - - # We check this by doing a depth first search to look for any - # descendant referenced by `event_to_state_groups`. - - # State groups we need to check, contains state groups that are - # descendants of `state_group` - state_groups_to_search = [state_group] - - # Set of state groups we've already checked - state_groups_searched = set() - - while state_groups_to_search: - state_group = state_groups_to_search.pop() # Next state group to check + Args: + txn + state_groups (set[int]): Set of state groups referenced by events + that are going to be deleted. - is_referenced = self._simple_select_one_onecol_txn( + Returns: + tuple[set[int], set[int]]: The set of state groups that can be + deleted and the set of state groups that need to be de-delta'ed + """ + # Graph of state group -> previous group + graph = {} + + # Set of events that we have found to be referenced by events + referenced_groups = set() + + # Set of state groups we've already seen + state_groups_seen = set(state_groups) + + # Set of state groups to handle next. + next_to_search = set(state_groups) + while next_to_search: + # We bound size of groups we're looking up at once, to stop the + # SQL query getting too big + if len(next_to_search) < 100: + current_search = next_to_search + next_to_search = set() + else: + lst = list(next_to_search) + current_search = set(lst[:100]) + next_to_search = set(lst[100:]) + + # Check if state groups are referenced + sql = """ + SELECT state_group, count(*) FROM event_to_state_groups + LEFT JOIN events_to_purge AS ep USING (event_id) + WHERE state_group IN (%s) AND ep.event_id IS NULL + GROUP BY state_group + """ % (",".join("?" for _ in current_search),) + txn.execute(sql, list(current_search)) + + referenced = set(sg for sg, cnt in txn if cnt > 0) + referenced_groups |= referenced + + # We don't continue iterating up the state group graphs for state + # groups that are referenced. + current_search -= referenced + + rows = self._simple_select_many_txn( txn, - table="event_to_state_groups", - keyvalues={"state_group": state_group}, - retcol="event_id", - allow_none=True, + table="state_group_edges", + column="prev_state_group", + iterable=current_search, + keyvalues={}, + retcols=("prev_state_group", "state_group",), ) - if is_referenced: - # A descendant is referenced by event_to_state_groups, so - # original state group is referenced. - return True - state_groups_searched.add(state_group) + next_to_search.update(row["state_group"] for row in rows) + # We don't bother re-handling groups we've already seen + next_to_search -= state_groups_seen + state_groups_seen |= next_to_search - # Find all children of current state group and add to search - references = self._simple_select_onecol_txn( - txn, - table="state_group_edges", - keyvalues={"prev_state_group": state_group}, - retcol="state_group", - ) - state_groups_to_search.extend(references) + for row in rows: + # Note: Each state group can have at most one prev group + graph[row["state_group"]] = row["prev_state_group"] + + to_delete = state_groups_seen - referenced_groups - # Lets be paranoid and check for cycles - if state_groups_searched.intersection(references): - raise Exception("State group %s has cyclic dependency", state_group) + to_dedelta = set() + for sg in referenced_groups: + prev_sg = graph.get(sg) + if prev_sg and prev_sg in to_delete: + to_dedelta.add(sg) - return False + return to_delete, to_dedelta class StateStore(StateGroupWorkerStore, BackgroundUpdateStore): |