diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ecfc9f20b1..0836e4af49 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -28,7 +28,10 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> Set[int]:
- """Deletes room history before a certain point
+ """Deletes room history before a certain point.
+
+ Note that only a single purge can occur at once, this is guaranteed via
+ a higher level (in the PaginationHandler).
Args:
room_id:
@@ -52,7 +55,9 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
delete_local_events,
)
- def _purge_history_txn(self, txn, room_id, token, delete_local_events):
+ def _purge_history_txn(
+ self, txn, room_id: str, token: RoomStreamToken, delete_local_events: bool
+ ) -> Set[int]:
# Tables that should be pruned:
# event_auth
# event_backward_extremities
@@ -103,7 +108,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
if max_depth < token.topological:
# We need to ensure we don't delete all the events from the database
# otherwise we wouldn't be able to send any events (due to not
- # having any backwards extremeties)
+ # having any backwards extremities)
raise SynapseError(
400, "topological_ordering is greater than forward extremeties"
)
@@ -154,7 +159,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
logger.info("[purge] Finding new backward extremities")
- # We calculate the new entries for the backward extremeties by finding
+ # We calculate the new entries for the backward extremities by finding
# events to be purged that are pointed to by events we're not going to
# purge.
txn.execute(
@@ -296,7 +301,7 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
"purge_room", self._purge_room_txn, room_id
)
- def _purge_room_txn(self, txn, room_id):
+ def _purge_room_txn(self, txn, room_id: str) -> List[int]:
# First we fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
@@ -310,6 +315,31 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
state_groups = [row[0] for row in txn]
+ # Get all the auth chains that are referenced by events that are to be
+ # deleted.
+ txn.execute(
+ """
+ SELECT chain_id, sequence_number FROM events
+ LEFT JOIN event_auth_chains USING (event_id)
+ WHERE room_id = ?
+ """,
+ (room_id,),
+ )
+ referenced_chain_id_tuples = list(txn)
+
+ logger.info("[purge] removing events from event_auth_chain_links")
+ txn.executemany(
+ """
+ DELETE FROM event_auth_chain_links WHERE
+ (origin_chain_id = ? AND origin_sequence_number = ?) OR
+ (target_chain_id = ? AND target_sequence_number = ?)
+ """,
+ (
+ (chain_id, seq_num, chain_id, seq_num)
+ for (chain_id, seq_num) in referenced_chain_id_tuples
+ ),
+ )
+
# Now we delete tables which lack an index on room_id but have one on event_id
for table in (
"event_auth",
@@ -319,6 +349,8 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
"event_reference_hashes",
"event_relations",
"event_to_state_groups",
+ "event_auth_chains",
+ "event_auth_chain_to_calculate",
"redactions",
"rejections",
"state_events",
|