summary refs log tree commit diff
path: root/synapse/storage/databases/main/events_bg_updates.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events_bg_updates.py')
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py79
1 files changed, 79 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py

index cb6b1f8a0c..78367ea58d 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -135,6 +135,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._chain_cover_index, ) + self.db_pool.updates.register_background_update_handler( + "purged_chain_cover", + self._purged_chain_cover_index, + ) + async def _background_reindex_fields_sender(self, progress, batch_size): target_min_stream_id = progress["target_min_stream_id_inclusive"] max_stream_id = progress["max_stream_id_exclusive"] @@ -932,3 +937,77 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): processed_count=count, finished_room_map=finished_rooms, ) + + async def _purged_chain_cover_index(self, progress: dict, batch_size: int) -> int: + """ + A background updates that iterates over the chain cover and deletes the + chain cover for events that have been purged. + + This may be due to fully purging a room or via setting a retention policy. + """ + current_event_id = progress.get("current_event_id", "") + + def purged_chain_cover_txn(txn) -> int: + # The event ID from events will be null if the chain ID / sequence + # number points to a purged event. + sql = """ + SELECT event_id, chain_id, sequence_number, e.event_id IS NOT NULL + FROM event_auth_chains + LEFT JOIN events AS e USING (event_id) + WHERE event_id > ? ORDER BY event_auth_chains.event_id ASC LIMIT ? + """ + txn.execute(sql, (current_event_id, batch_size)) + + rows = txn.fetchall() + if not rows: + return 0 + + # The event IDs and chain IDs / sequence numbers where the event has + # been purged. + unreferenced_event_ids = [] + unreferenced_chain_id_tuples = [] + event_id = "" + for event_id, chain_id, sequence_number, has_event in rows: + if not has_event: + unreferenced_event_ids.append((event_id,)) + unreferenced_chain_id_tuples.append((chain_id, sequence_number)) + + # Delete the unreferenced auth chains from event_auth_chain_links and + # event_auth_chains. + txn.executemany( + """ + DELETE FROM event_auth_chains WHERE event_id = ? + """, + unreferenced_event_ids, + ) + # We should also delete matching target_*, but there is no index on + # target_chain_id. Hopefully any purged events are due to a room + # being fully purged and they will be removed from the origin_* + # searches. + txn.executemany( + """ + DELETE FROM event_auth_chain_links WHERE + origin_chain_id = ? AND origin_sequence_number = ? + """, + unreferenced_chain_id_tuples, + ) + + progress = { + "current_event_id": event_id, + } + + self.db_pool.updates._background_update_progress_txn( + txn, "purged_chain_cover", progress + ) + + return len(rows) + + result = await self.db_pool.runInteraction( + "_purged_chain_cover_index", + purged_chain_cover_txn, + ) + + if not result: + await self.db_pool.updates._end_background_update("purged_chain_cover") + + return result