summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/databases/main/event_push_actions.py46
1 files changed, 35 insertions, 11 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 6fdb1e292e..8b9b38c16d 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1817,20 +1817,18 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         1 day ago).
         """
 
-        # We want to clear out anything that is older than a day that *has* already
-        # been rotated.
+        # Fetch the stream ordering that we've rotated up to, to ensure we don't
+        # delete anything after.
         rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol(
             table="event_push_summary_stream_ordering",
             keyvalues={},
             retcol="stream_ordering",
         )
 
-        max_stream_ordering_to_delete = min(
-            rotated_upto_stream_ordering, self.stream_ordering_day_ago
-        )
-
         def remove_old_push_actions_that_have_rotated_txn(
             txn: LoggingTransaction,
+            highlight: bool,
+            max_stream_ordering: int,
         ) -> bool:
             # We don't want to clear out too much at a time, so we bound our
             # deletes.
@@ -1848,11 +1846,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             txn.execute(
                 """
                 SELECT stream_ordering FROM event_push_actions
-                WHERE stream_ordering <= ? AND highlight = 0
+                WHERE stream_ordering <= ? AND highlight = ?
                 ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
                 """,
                 (
-                    max_stream_ordering_to_delete,
+                    max_stream_ordering,
+                    1 if highlight else 0,
                     batch_size,
                 ),
             )
@@ -1861,26 +1860,51 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             if stream_row:
                 (stream_ordering,) = stream_row
             else:
-                stream_ordering = max_stream_ordering_to_delete
+                stream_ordering = max_stream_ordering
 
             # We need to use a inclusive bound here to handle the case where a
             # single stream ordering has more than `batch_size` rows.
             txn.execute(
                 """
                 DELETE FROM event_push_actions
-                WHERE stream_ordering <= ? AND highlight = 0
+                WHERE stream_ordering <= ? AND highlight = ?
                 """,
-                (stream_ordering,),
+                (
+                    stream_ordering,
+                    1 if highlight else 0,
+                ),
             )
 
             logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)
 
             return txn.rowcount < batch_size
 
+        # First we delete all rotated highlights older than a month...
+        max_stream_ordering_to_delete = min(
+            rotated_upto_stream_ordering, self.stream_ordering_month_ago
+        )
+
+        while True:
+            done = await self.db_pool.runInteraction(
+                "_remove_old_push_actions_that_have_rotated_highlights",
+                remove_old_push_actions_that_have_rotated_txn,
+                True,
+                max_stream_ordering_to_delete,
+            )
+            if done:
+                break
+
+        # ... and then delete all rotated notifs older than a day
+        max_stream_ordering_to_delete = min(
+            rotated_upto_stream_ordering, self.stream_ordering_day_ago
+        )
+
         while True:
             done = await self.db_pool.runInteraction(
                 "_remove_old_push_actions_that_have_rotated",
                 remove_old_push_actions_that_have_rotated_txn,
+                False,
+                max_stream_ordering_to_delete,
             )
             if done:
                 break