diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 9084a62456..24141f97ba 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2114,17 +2114,42 @@ class EventsStore(EventsWorkerStore):
should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, )
- should_delete_params += (room_id, token.topological)
+ next_token = RoomStreamToken(
+ token.chunk,
+ token.topological - 1,
+ token.stream,
+ )
+ while True:
+ rows, next_token, _ = self._paginate_room_events_txn(
+ txn, room_id, next_token, direction='b', limit=1000,
+ )
+ next_token = RoomStreamToken.parse(next_token)
+
+ if len(rows) == 0:
+ break
+
+ txn.executemany(
+ """INSERT INTO events_to_purge
+ SELECT event_id, %s
+ FROM events
+ LEFT JOIN state_events USING (event_id)
+ WHERE event_id = ?
+ """ % (
+ should_delete_expr,
+ ),
+ (
+ should_delete_params + (row.event_id,)
+ for row in rows
+ ),
+ )
+
+ txn.execute("""
+ DELETE FROM events_to_purge
+ WHERE event_id IN (
+ SELECT event_id FROM event_forward_extremities
+ )
+ """)
- txn.execute(
- "INSERT INTO events_to_purge"
- " SELECT event_id, %s"
- " FROM events AS e LEFT JOIN state_events USING (event_id)"
- " WHERE e.room_id = ? AND topological_ordering < ?" % (
- should_delete_expr,
- ),
- should_delete_params,
- )
txn.execute(
"SELECT event_id, should_delete FROM events_to_purge"
)
@@ -2163,6 +2188,27 @@ class EventsStore(EventsWorkerStore):
]
)
+ txn.execute(
+ """DELETE FROM chunk_backwards_extremities
+ WHERE event_id IN (
+ SELECT event_id FROM events WHERE room_id = ?
+ )
+ """,
+ (room_id,)
+ )
+
+ txn.execute(
+ """
+ INSERT INTO chunk_backwards_extremities
+ SELECT DISTINCT ee.chunk_id, e.event_id
+ FROM events_to_purge AS e
+ INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id
+ INNER JOIN events AS ee ON ee.event_id = ed.event_id
+ LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id
+ WHERE ep2.event_id IS NULL
+ """,
+ )
+
logger.info("[purge] finding redundant state groups")
# Get all state groups that are only referenced by events that are
@@ -2315,7 +2361,7 @@ class EventsStore(EventsWorkerStore):
# Mark all state and own events as outliers
logger.info("[purge] marking remaining events as outliers")
txn.execute(
- "UPDATE events SET outlier = ?"
+ "UPDATE events SET outlier = ?, chunk_id = NULL"
" WHERE event_id IN ("
" SELECT event_id FROM events_to_purge "
" WHERE NOT should_delete"
|