summary refs log tree commit diff
diff options
context:
space:
mode:
authorH. Shay <hillerys@element.io>2023-06-05 13:38:18 -0700
committerH. Shay <hillerys@element.io>2023-06-05 13:38:18 -0700
commit67f152b476707aea3d3e6e77771e650ab3f42d47 (patch)
tree7eaaf4b7efae5b59a8bd0e0c99bdca8e4d9c51ff
parent`N + 3`: Read from column `full_user_id` rather than `user_id` of tables `pro... (diff)
downloadsynapse-67f152b476707aea3d3e6e77771e650ab3f42d47.tar.xz
add `event_stream_ordering` columns in background job and make it depend on `replace_stream_ordering_column` job
-rw-r--r--synapse/storage/databases/state/bg_updates.py103
-rw-r--r--synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql18
-rw-r--r--synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres29
-rw-r--r--synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite23
4 files changed, 120 insertions, 53 deletions
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py
index 5b8ba436d4..8d3a40020f 100644
--- a/synapse/storage/databases/state/bg_updates.py
+++ b/synapse/storage/databases/state/bg_updates.py
@@ -22,7 +22,7 @@ from synapse.storage.database import (
     LoggingDatabaseConnection,
     LoggingTransaction,
 )
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
 from synapse.types import MutableStateMap, StateMap
 from synapse.types.state import StateFilter
 from synapse.util.caches import intern_string
@@ -328,6 +328,10 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
             columns=["event_stream_ordering"],
         )
 
+        self.db_pool.updates.register_background_update_handler(
+            "add_event_stream_ordering",
+            self._add_event_stream_ordering,
+        )
     async def _background_deduplicate_state(
         self, progress: dict, batch_size: int
     ) -> int:
@@ -504,3 +508,100 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
         )
 
         return 1
+
+    async def _add_event_stream_ordering(self, progress: dict, batch_size: int) -> int:
+        """
+        Add denormalised copies of `stream_ordering` from the corresponding row in `events`
+        to the tables current_state_events, local_current_membership, and room_memberships.
+        This is done to improve database performance by reduring JOINs.
+
+        """
+        tables = [
+            "current_state_events",
+            "local_current_membership",
+            "room_memberships",
+        ]
+
+        if isinstance(self.database_engine, PostgresEngine):
+
+            def check_pg_column(txn: LoggingTransaction, table: str) -> list:
+                """
+                check if the column event_stream_ordering already exists
+                """
+                check_sql = f"""
+                    SELECT column_name FROM information_schema.columns 
+                    WHERE table_name = '{table}' and column_name = 'event_stream_ordering';
+                    """
+                txn.execute(check_sql)
+                column = txn.fetchall()
+                return column
+
+            def add_pg_column(txn: LoggingTransaction, table: str) -> None:
+                """
+                Add column event_stream_ordering to A given table
+                """
+                add_column_sql = f"""
+                ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT;
+                """
+                txn.execute(add_column_sql)
+
+                add_fk_sql = f"""
+                ALTER TABLE {table} ADD CONSTRAINT event_stream_ordering_fkey
+                FOREIGN KEY(event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
+                """
+                txn.execute(add_fk_sql)
+
+            for table in tables:
+                res = await self.db_pool.runInteraction(
+                    "check_column", check_pg_column, table
+                )
+                # if the column exists do nothing
+                if not res:
+                    await self.db_pool.runInteraction(
+                        "add_event_stream_ordering",
+                        add_pg_column,
+                        table,
+                    )
+            await self.db_pool.updates._end_background_update(
+                "add_event_stream_ordering"
+            )
+            return 1
+
+        elif isinstance(self.database_engine, Sqlite3Engine):
+
+            def check_sqlite_column(txn: LoggingTransaction, table: str) -> List[tuple]:
+                """
+                Get table info (to see if column event_stream_ordering exists)
+                """
+                check_sql = f"""
+                PRAGMA table_info({table})
+                """
+                txn.execute(check_sql)
+                res = txn.fetchall()
+                return res
+
+            def add_sqlite_column(txn: LoggingTransaction, table: str) -> None:
+                """
+                Add column event_stream_ordering to given table
+                """
+                add_column_sql = f"""
+                ALTER TABLE {table} ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
+                """
+                txn.execute(add_column_sql)
+
+            for table in tables:
+                res = await self.db_pool.runInteraction(
+                    "check_column", check_sqlite_column, table
+                )
+                columns = [tup[1] for tup in res]
+
+                # if the column exists do nothing
+                if "event_stream_ordering" not in columns:
+                    await self.db_pool.runInteraction(
+                        "add_event_stream_ordering", add_sqlite_column, table
+                    )
+
+            await self.db_pool.updates._end_background_update(
+                "add_event_stream_ordering"
+            )
+        return 1
diff --git a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql
new file mode 100644
index 0000000000..991d0f6f8b
--- /dev/null
+++ b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql
@@ -0,0 +1,18 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ INSERT INTO background_updates (ordering, update_name, progress_json, depends_on)
+    VALUES
+        (7403, 'add_event_stream_ordering', '{}', 'replace_stream_ordering_column');
diff --git a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres
deleted file mode 100644
index ceb750a9fa..0000000000
--- a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.postgres
+++ /dev/null
@@ -1,29 +0,0 @@
-/* Copyright 2022 Beeper
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
--- we use to improve database performance by reduring JOINs.
-
--- NOTE: these are set to NOT VALID to prevent locks while adding the column on large existing tables,
--- which will be validated in a later migration. For all new/updated rows the FKEY will be checked.
-
-ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT;
-ALTER TABLE current_state_events ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
-
-ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT;
-ALTER TABLE local_current_membership ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
-
-ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT;
-ALTER TABLE room_memberships ADD CONSTRAINT event_stream_ordering_fkey FOREIGN KEY (event_stream_ordering) REFERENCES events(stream_ordering) NOT VALID;
diff --git a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite b/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite
deleted file mode 100644
index 6f6283fdb7..0000000000
--- a/synapse/storage/schema/main/delta/74/03_membership_tables_event_stream_ordering.sql.sqlite
+++ /dev/null
@@ -1,23 +0,0 @@
-/* Copyright 2022 Beeper
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
--- Each of these are denormalised copies of `stream_ordering` from the corresponding row in` events` which
--- we use to improve database performance by reduring JOINs.
-
--- NOTE: sqlite does not support ADD CONSTRAINT so we add the new columns with FK constraint as-is
-
-ALTER TABLE current_state_events ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
-ALTER TABLE local_current_membership ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);
-ALTER TABLE room_memberships ADD COLUMN event_stream_ordering BIGINT REFERENCES events(stream_ordering);