-rw-r--r--  changelog.d/14020.misc                                                  1
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py                   58
-rw-r--r--  synapse/storage/schema/__init__.py                                      1
-rw-r--r--  synapse/storage/schema/main/delta/73/05old_push_actions.sql.postgres   22
-rw-r--r--  synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite     24
5 files changed, 105 insertions, 1 deletion
diff --git a/changelog.d/14020.misc b/changelog.d/14020.misc
new file mode 100644
index 0000000000..85550b307d
--- /dev/null
+++ b/changelog.d/14020.misc
@@ -0,0 +1 @@
+Clear out stale entries in `event_push_actions_staging` table.
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 3fdf128d9e..cdc9ee5a37 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -205,6 +205,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
     ):
         super().__init__(database, db_conn, hs)
 
+        # Track when the process started.
+        self._started_ts = self._clock.time_msec()
+
         # These get correctly set by _find_stream_orderings_for_times_txn
         self.stream_ordering_month_ago: Optional[int] = None
         self.stream_ordering_day_ago: Optional[int] = None
@@ -224,6 +227,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 self._rotate_notifs, 30 * 1000
             )
 
+            self._clear_old_staging_loop = self._clock.looping_call(
+                self._clear_old_push_actions_staging, 30 * 60 * 1000
+            )
+
         self.db_pool.updates.register_background_index_update(
             "event_push_summary_unique_index",
             index_name="event_push_summary_unique_index",
@@ -791,7 +798,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         # can be used to insert into the `event_push_actions_staging` table.
         def _gen_entry(
             user_id: str, actions: Collection[Union[Mapping, str]]
-        ) -> Tuple[str, str, str, int, int, int, str]:
+        ) -> Tuple[str, str, str, int, int, int, str, int]:
             is_highlight = 1 if _action_has_highlight(actions) else 0
             notif = 1 if "notify" in actions else 0
             return (
@@ -802,6 +809,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 is_highlight,  # highlight column
                 int(count_as_unread),  # unread column
                 thread_id,  # thread_id column
+                self._clock.time_msec(),  # inserted_ts column
             )
 
         await self.db_pool.simple_insert_many(
@@ -814,6 +822,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 "highlight",
                 "unread",
                 "thread_id",
+                "inserted_ts",
             ),
             values=[
                 _gen_entry(user_id, actions)
@@ -1340,6 +1349,53 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             if done:
                 break
 
+    @wrap_as_background_process("_clear_old_push_actions_staging")
+    async def _clear_old_push_actions_staging(self) -> None:
+        """Clear out any old event push actions from the staging table for
+        events that we failed to persist.
+        """
+
+        # We delete anything more than an hour old, on the assumption that we'll
+        # never take more than an hour to persist an event.
+        delete_before_ts = self._clock.time_msec() - 60 * 60 * 1000
+
+        if self._started_ts > delete_before_ts:
+            # We need to have been running for at least an hour before we start
+            # deleting, so that we know it's safe to delete rows with NULL
+            # `inserted_ts`.
+            return
+
+        # We don't have an index on `inserted_ts`; instead we assume that the
+        # number of "live" rows in `event_push_actions_staging` is small enough
+        # that an infrequent periodic scan won't cause a problem.
+        #
+        # Note: we also delete any rows with NULL `inserted_ts`. This is safe,
+        # as new rows get a default value, so any NULL rows must predate the
+        # schema change and therefore be at least an hour old.
+        limit = 1000
+        sql = """
+            DELETE FROM event_push_actions_staging WHERE event_id IN (
+                SELECT event_id FROM event_push_actions_staging WHERE
+                inserted_ts < ? OR inserted_ts IS NULL
+                LIMIT ?
+            )
+        """
+
+        def _clear_old_push_actions_staging_txn(txn: LoggingTransaction) -> bool:
+            txn.execute(sql, (delete_before_ts, limit))
+            return txn.rowcount >= limit
+
+        while True:
+            # Returns true if we have more stuff to delete from the table.
+            deleted = await self.db_pool.runInteraction(
+                "_clear_old_push_actions_staging", _clear_old_push_actions_staging_txn
+            )
+
+            if not deleted:
+                return
+
+            # We sleep to ensure that we don't overwhelm the DB.
+            await self._clock.sleep(1.0)
+
 
 class EventPushActionsStore(EventPushActionsWorkerStore):
     EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
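The new `_clear_old_push_actions_staging` loop above deletes stale rows in capped batches, pausing between transactions, and stops once a batch deletes fewer rows than the limit. A minimal, self-contained sketch of that pattern (illustrative only, not Synapse code: it uses Python's sqlite3 module, a made-up `staging_demo` table, and toy timestamps):

# Sketch of the batched-deletion pattern: delete at most `LIMIT` stale rows per
# transaction, back off between batches, stop when a batch comes up short.
import sqlite3
import time

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE staging_demo (event_id TEXT, inserted_ts BIGINT)")
conn.executemany(
    "INSERT INTO staging_demo VALUES (?, ?)",
    [("$event%d" % i, None if i % 2 else 0) for i in range(2500)],
)

DELETE_BEFORE_TS = 1  # rows older than this (or with NULL inserted_ts) are stale
LIMIT = 1000

def clear_batch(db: sqlite3.Connection) -> bool:
    """Delete one batch of stale rows; return True if there may be more left."""
    cur = db.execute(
        """
        DELETE FROM staging_demo WHERE event_id IN (
            SELECT event_id FROM staging_demo
            WHERE inserted_ts < ? OR inserted_ts IS NULL
            LIMIT ?
        )
        """,
        (DELETE_BEFORE_TS, LIMIT),
    )
    db.commit()
    return cur.rowcount >= LIMIT

while clear_batch(conn):
    time.sleep(0.01)  # back off between batches so the demo doesn't hammer the DB

print(conn.execute("SELECT COUNT(*) FROM staging_demo").fetchone()[0])  # -> 0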
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index f29424d17a..4a5c947699 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -85,6 +85,7 @@ Changes in SCHEMA_VERSION = 73;
       events over federation.
     - Add indexes to various tables (`event_failed_pull_attempts`, `insertion_events`,
       `batch_events`) to make it easy to delete all associated rows when purging a room.
+    - Add an `inserted_ts` column to the `event_push_actions_staging` table.
 """
 
 
diff --git a/synapse/storage/schema/main/delta/73/05old_push_actions.sql.postgres b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.postgres
new file mode 100644
index 0000000000..4af1a8470b
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.postgres
@@ -0,0 +1,22 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a column so that we know when a push action was inserted, to make it
+-- easier to clear out old ones.
+ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT;
+
+-- We now add a default for *new* rows. We don't do it in the statement above as
+-- we don't want to have to rewrite every existing row with the new default.
+ALTER TABLE event_push_actions_staging ALTER COLUMN inserted_ts SET DEFAULT extract(epoch from now()) * 1000;
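Because the default is attached in a separate ALTER after the column is added, existing rows keep NULL and only rows inserted after the migration pick up a timestamp. A small sanity sketch (illustrative only, plain Python, not part of the change) that the SQL default `extract(epoch from now()) * 1000` and the Python-side `self._clock.time_msec()` value written by `_gen_entry` use the same unit, which the cleanup query's `inserted_ts < ?` comparison relies on:

# Both values should be milliseconds since the Unix epoch.
import time
from datetime import datetime, timezone

# Roughly what self._clock.time_msec() produces on the Python side.
python_side_ms = int(time.time() * 1000)

# Roughly what `extract(epoch from now()) * 1000` evaluates to inside Postgres.
epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
sql_default_ms = (datetime.now(timezone.utc) - epoch).total_seconds() * 1000

assert abs(python_side_ms - sql_default_ms) < 1000  # same unit, same wall clock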
diff --git a/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite
new file mode 100644
index 0000000000..7482dabba2
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/05old_push_actions.sql.sqlite
@@ -0,0 +1,24 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- On SQLite we must be running in monolith mode and updating the database from
+-- Synapse, so it's safe to assume that `event_push_actions_staging` should be
+-- empty (across a restart an event must either have been fully persisted or we
+-- will recalculate its push actions).
+DELETE FROM event_push_actions_staging;
+
+-- Add a column so that we know when a push action was inserted, to make it
+-- easier to clear out old ones.
+ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT;
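Unlike the Postgres migration, this one adds `inserted_ts` with no default. One likely reason is that SQLite's ALTER TABLE refuses to add a column whose default is a non-constant expression such as the current time; emptying the table first makes the lack of a default harmless. A short sketch demonstrating that restriction (illustrative only, run directly against sqlite3):

# SQLite rejects ADD COLUMN with a non-constant default such as the current time.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_push_actions_staging (event_id TEXT)")

try:
    conn.execute(
        "ALTER TABLE event_push_actions_staging "
        "ADD COLUMN inserted_ts BIGINT DEFAULT (strftime('%s','now') * 1000)"
    )
except sqlite3.OperationalError as e:
    print(e)  # "Cannot add a column with non-constant default"

# The migration therefore adds a plain nullable column; since the table has just
# been emptied, no pre-existing row is left behind with a NULL timestamp.
conn.execute("ALTER TABLE event_push_actions_staging ADD COLUMN inserted_ts BIGINT")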