diff options
-rw-r--r-- | synapse/storage/events.py | 69 |
1 files changed, 59 insertions, 10 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d38f65b4e6..d25b2597b7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2103,17 +2103,45 @@ class EventsStore(EventsWorkerStore): should_delete_expr += " AND event_id NOT LIKE ?" should_delete_params += ("%:" + self.hs.hostname, ) - should_delete_params += (room_id, token.topological) + # We insert into events_to_purge by paginating backwards from the + # current token. - txn.execute( - "INSERT INTO events_to_purge" - " SELECT event_id, %s" - " FROM events AS e LEFT JOIN state_events USING (event_id)" - " WHERE e.room_id = ? AND topological_ordering < ?" % ( - should_delete_expr, - ), - should_delete_params, + next_token = RoomStreamToken( + token.chunk, + token.topological - 1, + token.stream, ) + while True: + rows, next_token, _ = self._paginate_room_events_txn( + txn, room_id, next_token, direction='b', limit=1000, + ) + next_token = RoomStreamToken.parse(next_token) + + if len(rows) == 0: + break + + txn.executemany( + """INSERT INTO events_to_purge + SELECT event_id, %s + FROM events + LEFT JOIN state_events USING (event_id) + WHERE event_id = ? + """ % ( + should_delete_expr, + ), + ( + should_delete_params + (row.event_id,) + for row in rows + ), + ) + + txn.execute(""" + DELETE FROM events_to_purge + WHERE event_id IN ( + SELECT event_id FROM event_forward_extremities + ) + """) + txn.execute( "SELECT event_id, should_delete FROM events_to_purge" ) @@ -2152,6 +2180,27 @@ class EventsStore(EventsWorkerStore): ] ) + txn.execute( + """DELETE FROM chunk_backwards_extremities + WHERE event_id IN ( + SELECT event_id FROM events WHERE room_id = ? + ) + """, + (room_id,) + ) + + txn.execute( + """ + INSERT INTO chunk_backwards_extremities + SELECT DISTINCT ee.chunk_id, e.event_id + FROM events_to_purge AS e + INNER JOIN event_edges AS ed ON e.event_id = ed.prev_event_id + INNER JOIN events AS ee ON ee.event_id = ed.event_id + LEFT JOIN events_to_purge AS ep2 ON ed.event_id = ep2.event_id + WHERE ep2.event_id IS NULL + """, + ) + logger.info("[purge] finding redundant state groups") # Get all state groups that are only referenced by events that are @@ -2304,7 +2353,7 @@ class EventsStore(EventsWorkerStore): # Mark all state and own events as outliers logger.info("[purge] marking remaining events as outliers") txn.execute( - "UPDATE events SET outlier = ?" + "UPDATE events SET outlier = ?, chunk_id = NULL" " WHERE event_id IN (" " SELECT event_id FROM events_to_purge " " WHERE NOT should_delete" |