diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 6fdb1e292e..8b9b38c16d 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1817,20 +1817,18 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
1 day ago).
"""
- # We want to clear out anything that is older than a day that *has* already
- # been rotated.
+ # Fetch the stream ordering that we've rotated up to, to ensure we don't
+ # delete anything after.
rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol(
table="event_push_summary_stream_ordering",
keyvalues={},
retcol="stream_ordering",
)
- max_stream_ordering_to_delete = min(
- rotated_upto_stream_ordering, self.stream_ordering_day_ago
- )
-
def remove_old_push_actions_that_have_rotated_txn(
txn: LoggingTransaction,
+ highlight: bool,
+ max_stream_ordering: int,
) -> bool:
# We don't want to clear out too much at a time, so we bound our
# deletes.
@@ -1848,11 +1846,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn.execute(
"""
SELECT stream_ordering FROM event_push_actions
- WHERE stream_ordering <= ? AND highlight = 0
+ WHERE stream_ordering <= ? AND highlight = ?
ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
""",
(
- max_stream_ordering_to_delete,
+ max_stream_ordering,
+ 1 if highlight else 0,
batch_size,
),
)
@@ -1861,26 +1860,51 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
if stream_row:
(stream_ordering,) = stream_row
else:
- stream_ordering = max_stream_ordering_to_delete
+ stream_ordering = max_stream_ordering
# We need to use a inclusive bound here to handle the case where a
# single stream ordering has more than `batch_size` rows.
txn.execute(
"""
DELETE FROM event_push_actions
- WHERE stream_ordering <= ? AND highlight = 0
+ WHERE stream_ordering <= ? AND highlight = ?
""",
- (stream_ordering,),
+ (
+ stream_ordering,
+ 1 if highlight else 0,
+ ),
)
logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)
return txn.rowcount < batch_size
+ # First we delete all rotated highlights older than a month...
+ max_stream_ordering_to_delete = min(
+ rotated_upto_stream_ordering, self.stream_ordering_month_ago
+ )
+
+ while True:
+ done = await self.db_pool.runInteraction(
+ "_remove_old_push_actions_that_have_rotated_highlights",
+ remove_old_push_actions_that_have_rotated_txn,
+ True,
+ max_stream_ordering_to_delete,
+ )
+ if done:
+ break
+
+ # ... and then delete all rotated notifs older than a day
+ max_stream_ordering_to_delete = min(
+ rotated_upto_stream_ordering, self.stream_ordering_day_ago
+ )
+
while True:
done = await self.db_pool.runInteraction(
"_remove_old_push_actions_that_have_rotated",
remove_old_push_actions_that_have_rotated_txn,
+ False,
+ max_stream_ordering_to_delete,
)
if done:
break
|