summary refs log tree commit diff
diff options
context:
space:
mode:
authorH. Shay <hillerys@element.io>2023-06-05 13:49:24 -0700
committerH. Shay <hillerys@element.io>2023-06-05 13:49:24 -0700
commit7bc2aefe92600ae8e96a7bce3555bb9eef09a554 (patch)
tree26eb7a1dbb346b6e73a8037d9aae0f60ffc36017
parentadd `event_stream_ordering` columns in background job and make it depend on `... (diff)
downloadsynapse-7bc2aefe92600ae8e96a7bce3555bb9eef09a554.tar.xz
add triggers to `events_stream_ordering` columns in bg job and make this job depend on the `add_stream_ordering` bg job
-rw-r--r--synapse/storage/databases/state/bg_updates.py80
-rw-r--r--synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py79
-rw-r--r--synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql22
3 files changed, 102 insertions, 79 deletions
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 8d3a40020f..ab4c180451 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -332,6 +332,11 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
             "add_event_stream_ordering",
             self._add_event_stream_ordering,
         )
+
+        self.db_pool.updates.register_background_update_handler(
+            "add_stream_ordering_triggers", self._add_triggers_in_bg
+        )
+
     async def _background_deduplicate_state(
         self, progress: dict, batch_size: int
     ) -> int:
@@ -605,3 +610,78 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
                 "add_event_stream_ordering"
             )
         return 1
+
+    async def _add_triggers_in_bg(self, progress: dict, batch_size: int) -> int:
+        """
+        Adds triggers to the room membership tables to enforce consistency
+        """
+        # Complain if the `event_stream_ordering` in membership tables doesn't match
+        # the `stream_ordering` row with the same `event_id` in `events`.
+        if isinstance(self.database_engine, Sqlite3Engine):
+
+            def add_sqlite_triggers(txn: LoggingTransaction) -> None:
+                for table in (
+                    "current_state_events",
+                    "local_current_membership",
+                    "room_memberships",
+                ):
+                    txn.execute(
+                        f"""
+                        CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
+                        BEFORE INSERT ON {table}
+                        FOR EACH ROW
+                        BEGIN
+                            SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
+                            WHERE EXISTS (
+                                SELECT 1 FROM events
+                                WHERE events.event_id = NEW.event_id
+                                   AND events.stream_ordering != NEW.event_stream_ordering
+                            );
+                        END;
+                        """
+                    )
+
+            await self.db_pool.runInteraction("add_sqlite_triggers", add_sqlite_triggers)
+        elif isinstance(self.database_engine, PostgresEngine):
+
+            def add_pg_triggers(txn: LoggingTransaction) -> None:
+                txn.execute(
+                    """
+                    CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
+                    BEGIN
+                        IF EXISTS (
+                            SELECT 1 FROM events
+                            WHERE events.event_id = NEW.event_id
+                               AND events.stream_ordering != NEW.event_stream_ordering
+                        ) THEN
+                            RAISE EXCEPTION 'Incorrect event_stream_ordering';
+                        END IF;
+                        RETURN NEW;
+                    END;
+                    $BODY$ LANGUAGE plpgsql;
+                    """
+                )
+
+                for table in (
+                    "current_state_events",
+                    "local_current_membership",
+                    "room_memberships",
+                ):
+                    txn.execute(
+                        f"""
+                        CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
+                        FOR EACH ROW
+                        EXECUTE PROCEDURE check_event_stream_ordering()
+                        """
+                    )
+
+            await self.db_pool.runInteraction(
+                "add_postgres_triggers", add_pg_triggers
+            )
+        else:
+            raise NotImplementedError("Unknown database engine")
+
+        await self.db_pool.updates._end_background_update(
+            "add_stream_ordering_triggers"
+        )
+        return 1
diff --git a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py
deleted file mode 100644
index 2ee2bc9422..0000000000
--- a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2022 Beeper
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"""
-This migration adds triggers to the room membership tables to enforce consistency.
-Triggers cannot be expressed in .sql files, so we have to use a separate file.
-"""
-from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
-
-
-def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
-    # Complain if the `event_stream_ordering` in membership tables doesn't match
-    # the `stream_ordering` row with the same `event_id` in `events`.
-    if isinstance(database_engine, Sqlite3Engine):
-        for table in (
-            "current_state_events",
-            "local_current_membership",
-            "room_memberships",
-        ):
-            cur.execute(
-                f"""
-                CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering
-                BEFORE INSERT ON {table}
-                FOR EACH ROW
-                BEGIN
-                    SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}')
-                    WHERE EXISTS (
-                        SELECT 1 FROM events
-                        WHERE events.event_id = NEW.event_id
-                           AND events.stream_ordering != NEW.event_stream_ordering
-                    );
-                END;
-                """
-            )
-    elif isinstance(database_engine, PostgresEngine):
-        cur.execute(
-            """
-            CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$
-            BEGIN
-                IF EXISTS (
-                    SELECT 1 FROM events
-                    WHERE events.event_id = NEW.event_id
-                       AND events.stream_ordering != NEW.event_stream_ordering
-                ) THEN
-                    RAISE EXCEPTION 'Incorrect event_stream_ordering';
-                END IF;
-                RETURN NEW;
-            END;
-            $BODY$ LANGUAGE plpgsql;
-            """
-        )
-
-        for table in (
-            "current_state_events",
-            "local_current_membership",
-            "room_memberships",
-        ):
-            cur.execute(
-                f"""
-                CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table}
-                FOR EACH ROW
-                EXECUTE PROCEDURE check_event_stream_ordering()
-                """
-            )
-    else:
-        raise NotImplementedError("Unknown database engine")
diff --git a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql
new file mode 100644
index 0000000000..27dd44d31e
--- /dev/null
+++ b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql
@@ -0,0 +1,22 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+-- This migration adds triggers to the room membership tables to enforce consistency.
+
+INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
+    VALUES
+        (7404, 'add_stream_ordering_triggers', '{}', 'add_event_stream_ordering');
\ No newline at end of file