summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/9542.bugfix1
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py79
-rw-r--r--synapse/storage/databases/main/purge_events.py8
-rw-r--r--synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql17
4 files changed, 99 insertions, 6 deletions
diff --git a/changelog.d/9542.bugfix b/changelog.d/9542.bugfix
new file mode 100644
index 0000000000..51b1876f3b
--- /dev/null
+++ b/changelog.d/9542.bugfix
@@ -0,0 +1 @@
+Purge chain cover indexes for events that were purged prior to Synapse v1.29.0.
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index cb6b1f8a0c..73e69d4cb1 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -135,6 +135,11 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             self._chain_cover_index,
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "purged_chain_cover",
+            self._purged_chain_cover_index,
+        )
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
@@ -932,3 +937,77 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             processed_count=count,
             finished_room_map=finished_rooms,
         )
+
+    async def _purged_chain_cover_index(self, progress: dict, batch_size: int) -> int:
+        """
+        A background updates that iterates over the chain cover and deletes the
+        chain cover for events that have been purged.
+
+        This may be due to fully purging a room or via setting a retention policy.
+        """
+        current_event_id = progress.get("current_event_id", "")
+
+        def purged_chain_cover_txn(txn) -> int:
+            # The event ID from events will be null if the chain ID / sequence
+            # number points to a purged event.
+            sql = """
+                SELECT event_id, chain_id, sequence_number, e.event_id IS NOT NULL
+                FROM event_auth_chains
+                LEFT JOIN events AS e USING (event_id)
+                WHERE event_id > ? ORDER BY event_auth_chains.event_id ASC LIMIT ?
+            """
+            txn.execute(sql, (current_event_id, batch_size))
+
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            # The event IDs and chain IDs / sequence numbers where the event has
+            # been purged.
+            unreferenced_event_ids = []
+            unreferenced_chain_id_tuples = []
+            event_id = ""
+            for event_id, chain_id, sequence_number, has_event in rows:
+                if not has_event:
+                    unreferenced_event_ids.append(event_id)
+                    unreferenced_chain_id_tuples.append((chain_id, sequence_number))
+
+            # Delete the unreferenced auth chains from event_auth_chain_links and
+            # event_auth_chains.
+            txn.executemany(
+                """
+                DELETE FROM event_auth_chains WHERE event_id = ?
+                """,
+                unreferenced_event_ids,
+            )
+            # We should also delete matching target_*, but there is no index on
+            # target_chain_id. Hopefully any purged events are due to a room
+            # being fully purged and they will be removed from the origin_*
+            # searches.
+            txn.executemany(
+                """
+                DELETE FROM event_auth_chain_links WHERE
+                origin_chain_id = ? AND origin_sequence_number = ?
+                """,
+                unreferenced_chain_id_tuples,
+            )
+
+            progress = {
+                "current_event_id": event_id,
+            }
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "purged_chain_cover", progress
+            )
+
+            return len(rows)
+
+        result = await self.db_pool.runInteraction(
+            "_purged_chain_cover_index",
+            purged_chain_cover_txn,
+        )
+
+        if not result:
+            await self.db_pool.updates._end_background_update("purged_chain_cover")
+
+        return result
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 0836e4af49..41f4fe7f95 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -331,13 +331,9 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
         txn.executemany(
             """
             DELETE FROM event_auth_chain_links WHERE
-            (origin_chain_id = ? AND origin_sequence_number = ?) OR
-            (target_chain_id = ? AND target_sequence_number = ?)
+            origin_chain_id = ? AND origin_sequence_number = ?
             """,
-            (
-                (chain_id, seq_num, chain_id, seq_num)
-                for (chain_id, seq_num) in referenced_chain_id_tuples
-            ),
+            referenced_chain_id_tuples,
         )
 
         # Now we delete tables which lack an index on room_id but have one on event_id
diff --git a/synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql b/synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql
new file mode 100644
index 0000000000..87cb1f3cfd
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/10delete_purged_chain_cover.sql
@@ -0,0 +1,17 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (5910, 'purged_chain_cover', '{}');