diff options
author | H. Shay <hillerys@element.io> | 2023-06-05 13:38:18 -0700 |
---|---|---|
committer | H. Shay <hillerys@element.io> | 2023-06-05 13:38:18 -0700 |
commit | 67f152b476707aea3d3e6e77771e650ab3f42d47 (patch) | |
tree | 7eaaf4b7efae5b59a8bd0e0c99bdca8e4d9c51ff | |
parent | `N + 3`: Read from column `full_user_id` rather than `user_id` of tables `pro... (diff) | |
download | synapse-67f152b476707aea3d3e6e77771e650ab3f42d47.tar.xz |
add `event_stream_ordering` columns in background job and make it depend on `replace_stream_ordering_column` job
4 files changed, 120 insertions, 53 deletions
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index 5b8ba436d4..8d3a40020f 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -22,7 +22,7 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import MutableStateMap, StateMap from synapse.types.state import StateFilter from synapse.util.caches import intern_string @@ -328,6 +328,10 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): columns=["event_stream_ordering"], ) + self.db_pool.updates.register_background_update_handler( + "add_event_stream_ordering", + self._add_event_stream_ordering, + ) async def _background_deduplicate_state( self, progress: dict, batch_size: int ) -> int: @@ -504,3 +508,100 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): ) return 1 + + async def _add_event_stream_ordering(self, progress: dict, batch_size: int) -> int: + """ + Add denormalised copies of `stream_ordering` from the corresponding row in `events` + to the tables current_state_events, local_current_membership, and room_memberships. + This is done to improve database performance by reduring JOINs. + + """ + tables = [ + "current_state_events", + "local_current_membership", + "room_memberships", + ] + + if isinstance(self.database_engine, PostgresEngine): + + def check_pg_column(txn: LoggingTransaction, table: str) -> list: + """ + check if the column event_stream_ordering already exists + """ + check_sql = f""" + SELECT column_name FROM information_schema.columns + WHERE table_name = '{table}' and column_name = 'event_stream_ordering'; + """ + txn.execute(check_sql) + column = txn.fetchall() + return column + + def add_pg_column(txn: LoggingTransaction, table: str) -> None: + """ + Add column event_stream_ordering to A given table + """ + add_column_sql = f""" + ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT; + """ + txn.execute(add_column_sql) + + add_fk_sql = f""" + ALTER TABLE {table} ADD CONSTRAINT event_stream_ordering_fkey + FOREIGN KEY(event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID; + """ + txn.execute(add_fk_sql) + + for table in tables: + res = await self.db_pool.runInteraction( + "check_column", check_pg_column, table + ) + # if the column exists do nothing + if not res: + await self.db_pool.runInteraction( + "add_event_stream_ordering", + add_pg_column, + table, + ) + await self.db_pool.updates._end_background_update( + "add_event_stream_ordering" + ) + return 1 + + elif isinstance(self.database_engine, Sqlite3Engine): + + def check_sqlite_column(txn: LoggingTransaction, table: str) -> List[tuple]: + """ + Get table info (to see if column event_stream_ordering exists) + """ + check_sql = f""" + PRAGMA table_info({table}) + """ + txn.execute(check_sql) + res = txn.fetchall() + return res + + def add_sqlite_column(txn: LoggingTransaction, table: str) -> None: + """ + Add column event_stream_ordering to given table + """ + add_column_sql = f""" + ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); + """ + txn.execute(add_column_sql) + + for table in tables: + res = await self.db_pool.runInteraction( + "check_column", check_sqlite_column, table + ) + columns = [tup[1] for tup in res] + + # if the column exists do nothing + if "event_stream_ordering" not in columns: + await self.db_pool.runInteraction( + "add_event_stream_ordering", add_sqlite_column, table + ) + + await self.db_pool.updates._end_background_update( + "add_event_stream_ordering" + ) + return 1 diff --git a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql new file mode 100644 index 0000000000..991d0f6f8b --- /dev/null +++ b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql @@ -0,0 +1,18 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + INSERT INTO background_updates (ordering, update_name, progress_json, depends_on) + VALUES + (7403, 'add_event_stream_ordering', '{}', 'replace_stream_ordering_column'); diff --git a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres deleted file mode 100644 index ceb750a9fa..0000000000 --- a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres +++ /dev/null @@ -1,29 +0,0 @@ -/* Copyright 2022 Beeper - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which --- we use to improve database performance by reduring JOINs. - --- NOTE: these are set to NOT VALID to prevent locks while adding the column on large existing tables, --- which will be validated in a later migration. For all new/updated rows the FKEY will be checked. - -ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID; - -ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID; - -ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT; -ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID; diff --git a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite deleted file mode 100644 index 6f6283fdb7..0000000000 --- a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite +++ /dev/null @@ -1,23 +0,0 @@ -/* Copyright 2022 Beeper - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which --- we use to improve database performance by reduring JOINs. - --- NOTE: sqlite does not support ADD CONSTRAINT so we add the new columns with FK constraint as-is - -ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); -ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); -ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering); |