summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--	synapse/storage/databases/main/events.py	6
-rw-r--r--	synapse/storage/databases/main/events_bg_updates.py	116
-rw-r--r--	synapse/storage/databases/main/purge_events.py	2
3 files changed, 118 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a8773374be..a3e12f1e9b 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2296,11 +2296,9 @@ class PersistEventsStore:
         self.db_pool.simple_insert_many_txn(
             txn,
             table="event_edges",
-            keys=("event_id", "prev_event_id", "room_id", "is_state"),
+            keys=("event_id", "prev_event_id"),
             values=[
-                (ev.event_id, e_id, ev.room_id, False)
-                for ev in events
-                for e_id in ev.prev_event_ids()
+                (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids()
             ],
         )
 
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index bea34a4c4a..eeca85fc94 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1,4 +1,4 @@
-# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2022 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -64,6 +64,9 @@ class _BackgroundUpdates:
     INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
     REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
 
+    EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
+    EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
+
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _CalculateChainCover:
@@ -235,6 +238,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
         ################################################################################
 
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+            self._background_drop_invalid_event_edges_rows,
+        )
+
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.EVENT_EDGES_REPLACE_INDEX,
+            index_name="event_edges_event_id_prev_event_id_idx",
+            table="event_edges",
+            columns=["event_id", "prev_event_id"],
+            unique=True,
+            # the old index which just covered event_id is now redundant.
+            replaces_index="ev_edges_id",
+        )
+
     async def _background_reindex_fields_sender(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -1285,3 +1303,99 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         )
 
         return 0
+
+    async def _background_drop_invalid_event_edges_rows(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Drop invalid rows from event_edges
+
+        This only runs for postgres. For SQLite, it all happens synchronously.
+
+        Firstly, drop any rows with is_state=True. These may have been added a long time
+        ago, but they are no longer used.
+
+        We also drop rows that do not correspond to entries in `events`, and add a
+        foreign key.
+        """
+
+        last_event_id = progress.get("last_event_id", "")
+
+        def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool:
+            """Returns True if we're done."""
+
+            # first we need to find an endpoint.
+            txn.execute(
+                """
+                SELECT event_id FROM event_edges
+                WHERE event_id > ?
+                ORDER BY event_id
+                LIMIT 1 OFFSET ?
+                """,
+                (last_event_id, batch_size),
+            )
+
+            endpoint = None
+            row = txn.fetchone()
+
+            if row:
+                endpoint = row[0]
+
+            where_clause = "ee.event_id > ?"
+            args = [last_event_id]
+            if endpoint:
+                where_clause += " AND ee.event_id <= ?"
+                args.append(endpoint)
+
+            # now delete any that:
+            #   - have is_state=TRUE, or
+            #   - do not correspond to a row in `events`
+            txn.execute(
+                f"""
+                DELETE FROM event_edges
+                WHERE event_id IN (
+                   SELECT ee.event_id
+                   FROM event_edges ee
+                     LEFT JOIN events ev USING (event_id)
+                   WHERE ({where_clause}) AND
+                     (is_state OR ev.event_id IS NULL)
+                )""",
+                args,
+            )
+
+            logger.info(
+                "cleaned up event_edges up to %s: removed %i/%i rows",
+                endpoint,
+                txn.rowcount,
+                batch_size,
+            )
+
+            if endpoint is not None:
+                self.db_pool.updates._background_update_progress_txn(
+                    txn,
+                    _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+                    {"last_event_id": endpoint},
+                )
+                return False
+
+            # if that was the final batch, we validate the foreign key.
+            #
+            # The constraint should have been in place and enforced for new rows since
+            # before we started deleting invalid rows, so there's no chance for any
+            # invalid rows to have snuck in the meantime. In other words, this really
+            # ought to succeed.
+            logger.info("cleaned up event_edges; enabling foreign key")
+            txn.execute(
+                "ALTER TABLE event_edges VALIDATE CONSTRAINT event_edges_event_id_fkey"
+            )
+            return True
+
+        done = await self.db_pool.runInteraction(
+            desc="drop_invalid_event_edges", func=drop_invalid_event_edges_txn
+        )
+
+        if done:
+            await self.db_pool.updates._end_background_update(
+                _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS
+            )
+
+        return batch_size
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ba385f9fc4..87b0d09039 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -214,10 +214,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
 
         # Delete all remote non-state events
         for table in (
+            "event_edges",
             "events",
             "event_json",
             "event_auth",
-            "event_edges",
             "event_forward_extremities",
             "event_relations",
             "event_search",