summary refs log tree commit diff
path: root/tests/storage/test_purge.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--tests/storage/test_purge.py329
1 files changed, 329 insertions, 0 deletions
diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py

index 080d5640a5..0aa14fd1f4 100644 --- a/tests/storage/test_purge.py +++ b/tests/storage/test_purge.py
@@ -23,6 +23,8 @@ from twisted.test.proto_helpers import MemoryReactor from synapse.api.errors import NotFoundError, SynapseError from synapse.rest.client import room from synapse.server import HomeServer +from synapse.types.state import StateFilter +from synapse.types.storage import _BackgroundUpdates from synapse.util import Clock from tests.unittest import HomeserverTestCase @@ -40,6 +42,8 @@ class PurgeTests(HomeserverTestCase): self.room_id = self.helper.create_room_as(self.user_id) self.store = hs.get_datastores().main + self.state_store = hs.get_datastores().state + self.state_deletion_store = hs.get_datastores().state_deletion self._storage_controllers = self.hs.get_storage_controllers() def test_purge_history(self) -> None: @@ -128,3 +132,328 @@ class PurgeTests(HomeserverTestCase): self.store._invalidate_local_get_event_cache(create_event.event_id) self.get_failure(self.store.get_event(create_event.event_id), NotFoundError) self.get_failure(self.store.get_event(first["event_id"]), NotFoundError) + + def test_purge_history_deletes_state_groups(self) -> None: + """Test that unreferenced state groups get cleaned up after purge""" + + # Send four state changes to the room. + first = self.helper.send_state( + self.room_id, event_type="m.foo", body={"test": 1} + ) + second = self.helper.send_state( + self.room_id, event_type="m.foo", body={"test": 2} + ) + third = self.helper.send_state( + self.room_id, event_type="m.foo", body={"test": 3} + ) + last = self.helper.send_state( + self.room_id, event_type="m.foo", body={"test": 4} + ) + + # Get references to the state groups + event_to_groups = self.get_success( + self.store._get_state_group_for_events( + [ + first["event_id"], + second["event_id"], + third["event_id"], + last["event_id"], + ] + ) + ) + + # Get the topological token + token = self.get_success( + self.store.get_topological_token_for_event(last["event_id"]) + ) + token_str = self.get_success(token.to_string(self.hs.get_datastores().main)) + + # Purge everything before this topological token + self.get_success( + self._storage_controllers.purge_events.purge_history( + self.room_id, token_str, True + ) + ) + + # Advance so that the background jobs to delete the state groups runs + self.reactor.advance( + 1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000 + ) + + # We expect all the state groups associated with events above, except + # the last one, should return no state. + state_groups = self.get_success( + self.state_store._get_state_groups_from_groups( + list(event_to_groups.values()), StateFilter.all() + ) + ) + first_state = state_groups[event_to_groups[first["event_id"]]] + second_state = state_groups[event_to_groups[second["event_id"]]] + third_state = state_groups[event_to_groups[third["event_id"]]] + last_state = state_groups[event_to_groups[last["event_id"]]] + + self.assertEqual(first_state, {}) + self.assertEqual(second_state, {}) + self.assertEqual(third_state, {}) + self.assertNotEqual(last_state, {}) + + def test_purge_unreferenced_state_group(self) -> None: + """Test that purging a room also gets rid of unreferenced state groups + it encounters during the purge. + + This is important, as otherwise these unreferenced state groups get + "de-deltaed" during the purge process, consuming lots of disk space. + """ + + self.helper.send(self.room_id, body="test1") + state1 = self.helper.send_state( + self.room_id, "org.matrix.test", body={"number": 2} + ) + state2 = self.helper.send_state( + self.room_id, "org.matrix.test", body={"number": 3} + ) + self.helper.send(self.room_id, body="test4") + last = self.helper.send(self.room_id, body="test5") + + # Create an unreferenced state group that has a prev group of one of the + # to-be-purged events. + prev_group = self.get_success( + self.store._get_state_group_for_event(state1["event_id"]) + ) + unreferenced_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=prev_group, + delta_ids={("org.matrix.test", ""): state2["event_id"]}, + current_state_ids=None, + ) + ) + + # Get the topological token + token = self.get_success( + self.store.get_topological_token_for_event(last["event_id"]) + ) + token_str = self.get_success(token.to_string(self.hs.get_datastores().main)) + + # Purge everything before this topological token + self.get_success( + self._storage_controllers.purge_events.purge_history( + self.room_id, token_str, True + ) + ) + + # Advance so that the background jobs to delete the state groups runs + self.reactor.advance( + 1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000 + ) + + # We expect that the unreferenced state group has been deleted from all tables. + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={"id": unreferenced_state_group}, + retcol="id", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups_state", + keyvalues={"state_group": unreferenced_state_group}, + retcol="state_group", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_group_edges", + keyvalues={"state_group": unreferenced_state_group}, + retcol="state_group", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups_pending_deletion", + keyvalues={"state_group": unreferenced_state_group}, + retcol="state_group", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + # We expect there to now only be one state group for the room, which is + # the state group of the last event (as the only outlier). + state_groups = self.get_success( + self.state_store.db_pool.simple_select_onecol( + table="state_groups", + keyvalues={"room_id": self.room_id}, + retcol="id", + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertEqual(len(state_groups), 1) + + def test_clear_unreferenced_state_groups(self) -> None: + """Test that any unreferenced state groups are automatically cleaned up.""" + + self.helper.send(self.room_id, body="test1") + state1 = self.helper.send_state( + self.room_id, "org.matrix.test", body={"number": 2} + ) + # Create enough state events to require multiple batches of + # mark_unreferenced_state_groups_for_deletion_bg_update to be run. + for i in range(200): + self.helper.send_state(self.room_id, "org.matrix.test", body={"number": i}) + self.helper.send(self.room_id, body="test4") + last = self.helper.send(self.room_id, body="test5") + + # Create an unreferenced state group that has no prev group. + unreferenced_free_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=None, + delta_ids={("org.matrix.test", ""): state1["event_id"]}, + current_state_ids={("org.matrix.test", ""): ""}, + ) + ) + + # Create some unreferenced state groups that have a prev group of one of the + # existing state groups. + prev_group = self.get_success( + self.store._get_state_group_for_event(state1["event_id"]) + ) + unreferenced_end_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=prev_group, + delta_ids={("org.matrix.test", ""): state1["event_id"]}, + current_state_ids=None, + ) + ) + another_unreferenced_end_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=unreferenced_end_state_group, + delta_ids={("org.matrix.test", ""): state1["event_id"]}, + current_state_ids=None, + ) + ) + + # Add some other unreferenced state groups which lead to a referenced state + # group. + # These state groups should not get deleted. + chain_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=None, + delta_ids={("org.matrix.test", ""): ""}, + current_state_ids={("org.matrix.test", ""): ""}, + ) + ) + chain_state_group_2 = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=chain_state_group, + delta_ids={("org.matrix.test", ""): ""}, + current_state_ids=None, + ) + ) + referenced_chain_state_group = self.get_success( + self.state_store.store_state_group( + event_id=last["event_id"], + room_id=self.room_id, + prev_group=chain_state_group_2, + delta_ids={("org.matrix.test", ""): ""}, + current_state_ids=None, + ) + ) + self.get_success( + self.store.db_pool.simple_insert( + "event_to_state_groups", + { + "event_id": "$new_event", + "state_group": referenced_chain_state_group, + }, + ) + ) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": _BackgroundUpdates.MARK_UNREFERENCED_STATE_GROUPS_FOR_DELETION_BG_UPDATE, + "progress_json": "{}", + }, + ) + ) + self.store.db_pool.updates._all_done = False + self.wait_for_background_updates() + + # Advance so that the background job to delete the state groups runs + self.reactor.advance( + 1 + self.state_deletion_store.DELAY_BEFORE_DELETION_MS / 1000 + ) + + # We expect that the unreferenced free state group has been deleted. + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={"id": unreferenced_free_state_group}, + retcol="id", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + # We expect that both unreferenced end state groups have been deleted. + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={"id": unreferenced_end_state_group}, + retcol="id", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + row = self.get_success( + self.state_store.db_pool.simple_select_one_onecol( + table="state_groups", + keyvalues={"id": another_unreferenced_end_state_group}, + retcol="id", + allow_none=True, + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertIsNone(row) + + # We expect there to now only be one state group for the room, which is + # the state group of the last event (as the only outlier). + state_groups = self.get_success( + self.state_store.db_pool.simple_select_onecol( + table="state_groups", + keyvalues={"room_id": self.room_id}, + retcol="id", + desc="test_purge_unreferenced_state_group", + ) + ) + self.assertEqual(len(state_groups), 210)