diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index cb6b1f8a0c..78367ea58d 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -135,6 +135,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
self._chain_cover_index,
)
+ self.db_pool.updates.register_background_update_handler(
+ "purged_chain_cover",
+ self._purged_chain_cover_index,
+ )
+
async def _background_reindex_fields_sender(self, progress, batch_size):
target_min_stream_id = progress["target_min_stream_id_inclusive"]
max_stream_id = progress["max_stream_id_exclusive"]
@@ -932,3 +937,77 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
processed_count=count,
finished_room_map=finished_rooms,
)
+
+ async def _purged_chain_cover_index(self, progress: dict, batch_size: int) -> int:
+ """
+ A background update that iterates over the chain cover and deletes the
+ chain cover for events that have been purged.
+
+ This may be due to fully purging a room or via setting a retention policy.
+
+ Each batch reads up to `batch_size` rows of `event_auth_chains` in
+ ascending `event_id` order, starting strictly after the `current_event_id`
+ stored in `progress`. Rows with no matching row in `events` are deleted
+ from `event_auth_chains`, along with the `event_auth_chain_links` rows
+ that have the same chain ID / sequence number as their origin.
+
+ Args:
+     progress: State saved by the previous batch. `current_event_id` is the
+         last event ID that batch examined; when absent, "" is used.
+     batch_size: Maximum number of `event_auth_chains` rows to examine.
+
+ Returns:
+     The number of rows examined in this batch (not the number deleted).
+     When this is 0, `_end_background_update` is called for this update.
+ """
+ # Resume point: the query below only looks at event IDs greater than this.
+ current_event_id = progress.get("current_event_id", "")
+
+ def purged_chain_cover_txn(txn) -> int:
+ # The event ID from events will be null if the chain ID / sequence
+ # number points to a purged event, so the fourth selected column is
+ # false exactly for the rows that should be cleaned up.
+ #
+ # Ordering by event ID together with "event_id > ?" lets each batch
+ # continue from where the previous one stopped.
+ sql = """
+ SELECT event_id, chain_id, sequence_number, e.event_id IS NOT NULL
+ FROM event_auth_chains
+ LEFT JOIN events AS e USING (event_id)
+ WHERE event_id > ? ORDER BY event_auth_chains.event_id ASC LIMIT ?
+ """
+ txn.execute(sql, (current_event_id, batch_size))
+
+ rows = txn.fetchall()
+ # No rows after the resume point: report 0 so the caller ends the update.
+ if not rows:
+ return 0
+
+ # The event IDs and chain IDs / sequence numbers where the event has
+ # been purged.
+ unreferenced_event_ids = []
+ unreferenced_chain_id_tuples = []
+ # Bound before the loop so it is defined for the progress dict below;
+ # rows is non-empty here, so the loop always rebinds it and it ends up
+ # holding the last event ID of the batch.
+ event_id = ""
+ for event_id, chain_id, sequence_number, has_event in rows:
+ if not has_event:
+ unreferenced_event_ids.append((event_id,))
+ unreferenced_chain_id_tuples.append((chain_id, sequence_number))
+
+ # Delete the unreferenced auth chains from event_auth_chain_links and
+ # event_auth_chains. Both parameter lists are empty when no event in
+ # this batch was purged.
+ txn.executemany(
+ """
+ DELETE FROM event_auth_chains WHERE event_id = ?
+ """,
+ unreferenced_event_ids,
+ )
+ # We should also delete matching target_*, but there is no index on
+ # target_chain_id. Hopefully any purged events are due to a room
+ # being fully purged and they will be removed from the origin_*
+ # searches.
+ txn.executemany(
+ """
+ DELETE FROM event_auth_chain_links WHERE
+ origin_chain_id = ? AND origin_sequence_number = ?
+ """,
+ unreferenced_chain_id_tuples,
+ )
+
+ # Record the last event ID examined (purged or not) as the next resume
+ # point. This `progress` is a new local of the inner function; it does
+ # not modify the `progress` argument of the enclosing method.
+ progress = {
+ "current_event_id": event_id,
+ }
+
+ # Passed on the same transaction as the deletes above.
+ self.db_pool.updates._background_update_progress_txn(
+ txn, "purged_chain_cover", progress
+ )
+
+ return len(rows)
+
+ # Run one batch through the database pool.
+ result = await self.db_pool.runInteraction(
+ "_purged_chain_cover_index",
+ purged_chain_cover_txn,
+ )
+
+ # A batch that examined no rows means the table has been fully walked.
+ if not result:
+ await self.db_pool.updates._end_background_update("purged_chain_cover")
+
+ return result
|