From 485d999c8a95f8fdc6425a00e906e86efc77a917 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 5 Sep 2016 14:49:08 +0100 Subject: Correctly delete old state groups in purge history API --- synapse/storage/events.py | 99 ++++++++++++++++++++++++------- synapse/storage/schema/delta/35/state.sql | 1 + 2 files changed, 80 insertions(+), 20 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7e9b351513..bec35ea68d 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -1578,26 +1578,85 @@ class EventsStore(SQLBaseStore): # Get all state groups that are only referenced by events that are # to be deleted. - # txn.execute( - # "SELECT state_group FROM event_to_state_groups" - # " INNER JOIN events USING (event_id)" - # " WHERE state_group IN (" - # " SELECT DISTINCT state_group FROM events" - # " INNER JOIN event_to_state_groups USING (event_id)" - # " WHERE room_id = ? AND topological_ordering < ?" - # " )" - # " GROUP BY state_group HAVING MAX(topological_ordering) < ?", - # (room_id, topological_ordering, topological_ordering) - # ) - # state_rows = txn.fetchall() - # txn.executemany( - # "DELETE FROM state_groups_state WHERE state_group = ?", - # state_rows - # ) - # txn.executemany( - # "DELETE FROM state_groups WHERE id = ?", - # state_rows - # ) + txn.execute( + "SELECT state_group FROM event_to_state_groups" + " INNER JOIN events USING (event_id)" + " WHERE state_group IN (" + " SELECT DISTINCT state_group FROM events" + " INNER JOIN event_to_state_groups USING (event_id)" + " WHERE room_id = ? AND topological_ordering < ?" + " )" + " GROUP BY state_group HAVING MAX(topological_ordering) < ?", + (room_id, topological_ordering, topological_ordering) + ) + + state_rows = txn.fetchall() + state_groups_to_delete = [sg for sg, in state_rows] + + # Now we get all the state groups that rely on these state groups + new_state_edges = [] + chunks = [ + state_groups_to_delete[i:i + 100] + for i in xrange(0, len(state_groups_to_delete), 100) + ] + for chunk in chunks: + rows = self._simple_select_many_txn( + txn, + table="state_group_edges", + column="prev_state_group", + iterable=chunk, + retcols=["state_group"], + keyvalues={}, + ) + new_state_edges.extend(row["state_group"] for row in rows) + + # Now we turn the state groups that reference to-be-deleted state groups + # to non delta versions. + for new_state_edge in new_state_edges: + curr_state = self._get_state_groups_from_groups_txn( + txn, [new_state_edge], types=None + ) + curr_state = curr_state.values()[0] + + self._simple_delete_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_delete_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": new_state_edge, + } + ) + + self._simple_insert_many_txn( + txn, + table="state_groups_state", + values=[ + { + "state_group": new_state_edge, + "room_id": room_id, + "type": key[0], + "state_key": key[1], + "event_id": state_id, + } + for key, state_id in curr_state.items() + ], + ) + + txn.executemany( + "DELETE FROM state_groups_state WHERE state_group = ?", + state_rows + ) + txn.executemany( + "DELETE FROM state_groups WHERE id = ?", + state_rows + ) # Delete all non-state txn.executemany( "DELETE FROM event_to_state_groups WHERE event_id = ?", diff --git a/synapse/storage/schema/delta/35/state.sql b/synapse/storage/schema/delta/35/state.sql index c4c244c169..0f1fa68a89 100644 --- a/synapse/storage/schema/delta/35/state.sql +++ b/synapse/storage/schema/delta/35/state.sql @@ -19,3 +19,4 @@ CREATE TABLE state_group_edges( ); CREATE INDEX state_group_edges_idx ON state_group_edges(state_group); +CREATE INDEX state_group_edges_prev_idx ON state_group_edges(prev_state_group); -- cgit 1.4.1