diff options
3 files changed, 102 insertions, 79 deletions
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 8d3a40020f..ab4c180451 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -332,6 +332,11 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): "add_event_stream_ordering", self._add_event_stream_ordering, ) + + self.db_pool.updates.register_background_update_handler( + "add_stream_ordering_triggers", self._add_triggers_in_bg + ) + async def _background_deduplicate_state( self, progress: dict, batch_size: int ) -> int: @@ -605,3 +610,78 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): "add_event_stream_ordering" ) return 1 + + async def _add_triggers_in_bg(self, progress: dict, batch_size: int) -> int: + """ + Adds triggers to the room membership tables to enforce consistency + """ + # Complain if the `event_stream_ordering` in membership tables doesn't match + # the `stream_ordering` row with the same `event_id` in `events`. + if isinstance(self.database_engine, Sqlite3Engine): + + def add_sqlite_triggers(txn: LoggingTransaction) -> None: + for table in ( + "current_state_events", + "local_current_membership", + "room_memberships", + ): + txn.execute( + f""" + CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering + BEFORE INSERT ON {table} + FOR EACH ROW + BEGIN + SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}') + WHERE EXISTS ( + SELECT 1 FROM events + WHERE events.event_id = NEW.event_id + AND events.stream_ordering != NEW.event_stream_ordering + ); + END; + """ + ) + + await self.db_pool.runInteraction("add_sqlite_triggers", add_sqlite_triggers) + elif isinstance(self.database_engine, PostgresEngine): + + def add_pg_triggers(txn: LoggingTransaction) -> None: + txn.execute( + """ + CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$ + BEGIN + IF EXISTS ( + SELECT 1 FROM events + WHERE events.event_id = NEW.event_id + AND events.stream_ordering != NEW.event_stream_ordering + ) THEN + RAISE EXCEPTION 'Incorrect event_stream_ordering'; + END IF; + RETURN NEW; + END; + $BODY$ LANGUAGE plpgsql; + """ + ) + + for table in ( + "current_state_events", + "local_current_membership", + "room_memberships", + ): + txn.execute( + f""" + CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table} + FOR EACH ROW + EXECUTE PROCEDURE check_event_stream_ordering() + """ + ) + + await self.db_pool.runInteraction( + "add_postgres_triggers", add_pg_triggers + ) + else: + raise NotImplementedError("Unknown database engine") + + await self.db_pool.updates._end_background_update( + "add_stream_ordering_triggers" + ) + return 1 diff --git a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py deleted file mode 100644 index 2ee2bc9422..0000000000 --- a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py +++ /dev/null @@ -1,79 +0,0 @@ -# Copyright 2022 Beeper -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -""" -This migration adds triggers to the room membership tables to enforce consistency. -Triggers cannot be expressed in .sql files, so we have to use a separate file. -""" -from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine - - -def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - # Complain if the `event_stream_ordering` in membership tables doesn't match - # the `stream_ordering` row with the same `event_id` in `events`. - if isinstance(database_engine, Sqlite3Engine): - for table in ( - "current_state_events", - "local_current_membership", - "room_memberships", - ): - cur.execute( - f""" - CREATE TRIGGER IF NOT EXISTS {table}_bad_event_stream_ordering - BEFORE INSERT ON {table} - FOR EACH ROW - BEGIN - SELECT RAISE(ABORT, 'Incorrect event_stream_ordering in {table}') - WHERE EXISTS ( - SELECT 1 FROM events - WHERE events.event_id = NEW.event_id - AND events.stream_ordering != NEW.event_stream_ordering - ); - END; - """ - ) - elif isinstance(database_engine, PostgresEngine): - cur.execute( - """ - CREATE OR REPLACE FUNCTION check_event_stream_ordering() RETURNS trigger AS $BODY$ - BEGIN - IF EXISTS ( - SELECT 1 FROM events - WHERE events.event_id = NEW.event_id - AND events.stream_ordering != NEW.event_stream_ordering - ) THEN - RAISE EXCEPTION 'Incorrect event_stream_ordering'; - END IF; - RETURN NEW; - END; - $BODY$ LANGUAGE plpgsql; - """ - ) - - for table in ( - "current_state_events", - "local_current_membership", - "room_memberships", - ): - cur.execute( - f""" - CREATE TRIGGER check_event_stream_ordering BEFORE INSERT OR UPDATE ON {table} - FOR EACH ROW - EXECUTE PROCEDURE check_event_stream_ordering() - """ - ) - else: - raise NotImplementedError("Unknown database engine") diff --git a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql new file mode 100644 index 0000000000..27dd44d31e --- /dev/null +++ b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.sql @@ -0,0 +1,22 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +-- This migration adds triggers to the room membership tables to enforce consistency. + +INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) + VALUES + (7404, 'add_stream_ordering_triggers', '{}', 'add_event_stream_ordering'); \ No newline at end of file |