diff options
-rw-r--r-- | synapse/storage/databases/main/event_push_actions.py | 46 |
1 files changed, 35 insertions, 11 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 6fdb1e292e..8b9b38c16d 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1817,20 +1817,18 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas 1 day ago). """ - # We want to clear out anything that is older than a day that *has* already - # been rotated. + # Fetch the stream ordering that we've rotated up to, to ensure we don't + # delete anything after. rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol( table="event_push_summary_stream_ordering", keyvalues={}, retcol="stream_ordering", ) - max_stream_ordering_to_delete = min( - rotated_upto_stream_ordering, self.stream_ordering_day_ago - ) - def remove_old_push_actions_that_have_rotated_txn( txn: LoggingTransaction, + highlight: bool, + max_stream_ordering: int, ) -> bool: # We don't want to clear out too much at a time, so we bound our # deletes. @@ -1848,11 +1846,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas txn.execute( """ SELECT stream_ordering FROM event_push_actions - WHERE stream_ordering <= ? AND highlight = 0 + WHERE stream_ordering <= ? AND highlight = ? ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? """, ( - max_stream_ordering_to_delete, + max_stream_ordering, + 1 if highlight else 0, batch_size, ), ) @@ -1861,26 +1860,51 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas if stream_row: (stream_ordering,) = stream_row else: - stream_ordering = max_stream_ordering_to_delete + stream_ordering = max_stream_ordering # We need to use a inclusive bound here to handle the case where a # single stream ordering has more than `batch_size` rows. txn.execute( """ DELETE FROM event_push_actions - WHERE stream_ordering <= ? AND highlight = 0 + WHERE stream_ordering <= ? AND highlight = ? """, - (stream_ordering,), + ( + stream_ordering, + 1 if highlight else 0, + ), ) logger.info("Rotating notifications, deleted %s push actions", txn.rowcount) return txn.rowcount < batch_size + # First we delete all rotated highlights older than a month... + max_stream_ordering_to_delete = min( + rotated_upto_stream_ordering, self.stream_ordering_month_ago + ) + + while True: + done = await self.db_pool.runInteraction( + "_remove_old_push_actions_that_have_rotated_highlights", + remove_old_push_actions_that_have_rotated_txn, + True, + max_stream_ordering_to_delete, + ) + if done: + break + + # ... and then delete all rotated notifs older than a day + max_stream_ordering_to_delete = min( + rotated_upto_stream_ordering, self.stream_ordering_day_ago + ) + while True: done = await self.db_pool.runInteraction( "_remove_old_push_actions_that_have_rotated", remove_old_push_actions_that_have_rotated_txn, + False, + max_stream_ordering_to_delete, ) if done: break |