summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/12893.misc1
-rw-r--r--synapse/storage/databases/main/events.py6
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py116
-rw-r--r--synapse/storage/databases/main/purge_events.py2
-rw-r--r--synapse/storage/schema/__init__.py12
-rw-r--r--synapse/storage/schema/main/delta/71/01rebuild_event_edges.sql.postgres43
-rw-r--r--synapse/storage/schema/main/delta/71/01rebuild_event_edges.sql.sqlite47
7 files changed, 216 insertions, 11 deletions
diff --git a/changelog.d/12893.misc b/changelog.d/12893.misc
new file mode 100644
index 0000000000..5705210303
--- /dev/null
+++ b/changelog.d/12893.misc
@@ -0,0 +1 @@
+Simplify the database schema for `event_edges`.
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a8773374be..a3e12f1e9b 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2296,11 +2296,9 @@ class PersistEventsStore:
         self.db_pool.simple_insert_many_txn(
             txn,
             table="event_edges",
-            keys=("event_id", "prev_event_id", "room_id", "is_state"),
+            keys=("event_id", "prev_event_id"),
             values=[
-                (ev.event_id, e_id, ev.room_id, False)
-                for ev in events
-                for e_id in ev.prev_event_ids()
+                (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids()
             ],
         )
 
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index bea34a4c4a..eeca85fc94 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1,4 +1,4 @@
-# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2022 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -64,6 +64,9 @@ class _BackgroundUpdates:
     INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
     REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
 
+    EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
+    EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
+
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class _CalculateChainCover:
@@ -235,6 +238,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
         ################################################################################
 
+        self.db_pool.updates.register_background_update_handler(
+            _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+            self._background_drop_invalid_event_edges_rows,
+        )
+
+        self.db_pool.updates.register_background_index_update(
+            _BackgroundUpdates.EVENT_EDGES_REPLACE_INDEX,
+            index_name="event_edges_event_id_prev_event_id_idx",
+            table="event_edges",
+            columns=["event_id", "prev_event_id"],
+            unique=True,
+            # the old index which just covered event_id is now redundant.
+            replaces_index="ev_edges_id",
+        )
+
     async def _background_reindex_fields_sender(
         self, progress: JsonDict, batch_size: int
     ) -> int:
@@ -1285,3 +1303,99 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
         )
 
         return 0
+
+    async def _background_drop_invalid_event_edges_rows(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Drop invalid rows from event_edges
+
+        This only runs for postgres. For SQLite, it all happens synchronously.
+
+        Firstly, drop any rows with is_state=True. These may have been added a long time
+        ago, but they are no longer used.
+
+        We also drop rows that do not correspond to entries in `events`, and add a
+        foreign key.
+        """
+
+        last_event_id = progress.get("last_event_id", "")
+
+        def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool:
+            """Returns True if we're done."""
+
+            # first we need to find an endpoint.
+            txn.execute(
+                """
+                SELECT event_id FROM event_edges
+                WHERE event_id > ?
+                ORDER BY event_id
+                LIMIT 1 OFFSET ?
+                """,
+                (last_event_id, batch_size),
+            )
+
+            endpoint = None
+            row = txn.fetchone()
+
+            if row:
+                endpoint = row[0]
+
+            where_clause = "ee.event_id > ?"
+            args = [last_event_id]
+            if endpoint:
+                where_clause += " AND ee.event_id <= ?"
+                args.append(endpoint)
+
+            # now delete any that:
+            #   - have is_state=TRUE, or
+            #   - do not correspond to a row in `events`
+            txn.execute(
+                f"""
+                DELETE FROM event_edges
+                WHERE event_id IN (
+                   SELECT ee.event_id
+                   FROM event_edges ee
+                     LEFT JOIN events ev USING (event_id)
+                   WHERE ({where_clause}) AND
+                     (is_state OR ev.event_id IS NULL)
+                )""",
+                args,
+            )
+
+            logger.info(
+                "cleaned up event_edges up to %s: removed %i/%i rows",
+                endpoint,
+                txn.rowcount,
+                batch_size,
+            )
+
+            if endpoint is not None:
+                self.db_pool.updates._background_update_progress_txn(
+                    txn,
+                    _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+                    {"last_event_id": endpoint},
+                )
+                return False
+
+            # if that was the final batch, we validate the foreign key.
+            #
+            # The constraint should have been in place and enforced for new rows since
+            # before we started deleting invalid rows, so there's no chance for any
+            # invalid rows to have snuck in the meantime. In other words, this really
+            # ought to succeed.
+            logger.info("cleaned up event_edges; enabling foreign key")
+            txn.execute(
+                "ALTER TABLE event_edges VALIDATE CONSTRAINT event_edges_event_id_fkey"
+            )
+            return True
+
+        done = await self.db_pool.runInteraction(
+            desc="drop_invalid_event_edges", func=drop_invalid_event_edges_txn
+        )
+
+        if done:
+            await self.db_pool.updates._end_background_update(
+                _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS
+            )
+
+        return batch_size
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ba385f9fc4..87b0d09039 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -214,10 +214,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
 
         # Delete all remote non-state events
         for table in (
+            "event_edges",
             "events",
             "event_json",
             "event_auth",
-            "event_edges",
             "event_forward_extremities",
             "event_relations",
             "event_search",
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 5843fae605..dc237e3032 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-SCHEMA_VERSION = 71  # remember to update the list below when updating
+SCHEMA_VERSION = 72  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -71,14 +71,16 @@ Changes in SCHEMA_VERSION = 70:
 Changes in SCHEMA_VERSION = 71:
     - event_edges.room_id is no longer read from.
     - Tables related to groups are no longer accessed.
+
+Changes in SCHEMA_VERSION = 72:
+    - event_edges.(room_id, is_state) are no longer written to.
 """
 
 
 SCHEMA_COMPAT_VERSION = (
-    # We now assume that `device_lists_changes_in_room` has been filled out for
-    # recent device_list_updates.
-    # ... and that `application_services_state.last_txn` is not used.
-    69
+    # We no longer maintain `event_edges.room_id`, so synapses with SCHEMA_VERSION < 71
+    # will break.
+    71
 )
 """Limit on how far the synapse codebase can be rolled back without breaking db compat
 
diff --git a/synapse/storage/schema/main/delta/71/01rebuild_event_edges.sql.postgres b/synapse/storage/schema/main/delta/71/01rebuild_event_edges.sql.postgres
new file mode 100644
index 0000000000..f32f445858
--- /dev/null
+++ b/synapse/storage/schema/main/delta/71/01rebuild_event_edges.sql.postgres
@@ -0,0 +1,43 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- We're going to stop populating event_edges.room_id and event_edges.is_state,
+-- which means we now need to give them defaults.
+
+-- We also drop the exising unique constraint which spans all four columns. Franky
+-- it's not doing much, and there are other indexes on event_id and prev_event_id.
+-- Later on we introduce a proper unique constraint on (event_id, prev_event_id).
+--
+-- We also add a foreign key constraint (which will be enforced for new rows), but
+-- don't yet validate it for existing rows (since that's slow, and we haven't yet
+-- checked that all the rows are valid)
+
+ALTER TABLE event_edges
+   ALTER room_id DROP NOT NULL,
+   ALTER is_state SET DEFAULT FALSE,
+   DROP CONSTRAINT IF EXISTS event_edges_event_id_prev_event_id_room_id_is_state_key,
+   ADD CONSTRAINT event_edges_event_id_fkey FOREIGN KEY (event_id) REFERENCES events(event_id) NOT VALID;
+
+-- In the background, we drop any rows with is_state=True. These may have been
+-- added a long time ago, but they are no longer used.
+--
+-- We also drop rows that do not correspond to entries in `events`, and finally
+-- validate the foreign key.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7101, 'event_edges_drop_invalid_rows', '{}');
+
+-- We'll then create a new unique index on (event_id, prev_event_id).
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) VALUES
+  (7101, 'event_edges_replace_index', '{}', 'event_edges_drop_invalid_rows');
diff --git a/synapse/storage/schema/main/delta/71/01rebuild_event_edges.sql.sqlite b/synapse/storage/schema/main/delta/71/01rebuild_event_edges.sql.sqlite
new file mode 100644
index 0000000000..0bb86edd2a
--- /dev/null
+++ b/synapse/storage/schema/main/delta/71/01rebuild_event_edges.sql.sqlite
@@ -0,0 +1,47 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- We're going to stop populating event_edges.room_id and event_edges.is_state,
+-- which means we now need to give them defaults.
+--
+-- We also take the opportunity to:
+--   - drop any rows with is_state=True (these were populated a long time ago, but
+--     are no longer used.)
+--   - drop any rows which do not correspond to entries in `events`
+--   - tighten the unique index so that it applies just to (event_id, prev_event_id)
+--   - drop the "ev_edges_id" index, which is redundant to the above.
+--   - add a foreign key constraint from event_id to `events`
+
+CREATE TABLE new_event_edges (
+  event_id TEXT NOT NULL,
+  prev_event_id TEXT NOT NULL,
+  room_id TEXT NULL,
+  is_state BOOL NOT NULL DEFAULT 0,
+  FOREIGN KEY(event_id) REFERENCES events(event_id)
+);
+
+INSERT INTO new_event_edges
+    SELECT ee.event_id, ee.prev_event_id, ee.room_id, ee.is_state
+    FROM event_edges ee JOIN events ev USING (event_id)
+    WHERE NOT ee.is_state;
+
+DROP TABLE event_edges;
+
+ALTER TABLE new_event_edges RENAME TO event_edges;
+
+CREATE UNIQUE INDEX event_edges_event_id_prev_event_id_idx
+  ON event_edges (event_id, prev_event_id);
+
+CREATE INDEX ev_edges_prev_id ON event_edges (prev_event_id);