From 5829872bec9b9986c741eafec36e47774e4d2b3e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Nov 2020 15:07:34 +0000 Subject: Fix port script to handle foreign key constraints (#8730) --- scripts/synapse_port_db | 68 ++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 62 insertions(+), 6 deletions(-) (limited to 'scripts/synapse_port_db') diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 13c0120bb4..7a638ea8e3 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -22,7 +22,7 @@ import logging import sys import time import traceback -from typing import Optional +from typing import Dict, Optional, Set import yaml @@ -292,6 +292,34 @@ class Porter(object): return table, already_ported, total_to_port, forward_chunk, backward_chunk + async def get_table_constraints(self) -> Dict[str, Set[str]]: + """Returns a map of tables that have foreign key constraints to tables they depend on. + """ + + def _get_constraints(txn): + # We can pull the information about foreign key constraints out from + # the postgres schema tables. + sql = """ + SELECT DISTINCT + tc.table_name, + ccu.table_name AS foreign_table_name + FROM + information_schema.table_constraints AS tc + INNER JOIN information_schema.constraint_column_usage AS ccu + USING (table_schema, constraint_name) + WHERE tc.constraint_type = 'FOREIGN KEY'; + """ + txn.execute(sql) + + results = {} + for table, foreign_table in txn: + results.setdefault(table, set()).add(foreign_table) + return results + + return await self.postgres_store.db_pool.runInteraction( + "get_table_constraints", _get_constraints + ) + async def handle_table( self, table, postgres_size, table_size, forward_chunk, backward_chunk ): @@ -619,15 +647,43 @@ class Porter(object): consumeErrors=True, ) ) + # Map from table name to args passed to `handle_table`, i.e. a tuple + # of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`. 
+ tables_to_port_info_map = {r[0]: r[1:] for r in setup_res} # Step 4. Do the copying. + # + # This is slightly convoluted as we need to ensure tables are ported + # in the correct order due to foreign key constraints. self.progress.set_state("Copying to postgres") - await make_deferred_yieldable( - defer.gatherResults( - [run_in_background(self.handle_table, *res) for res in setup_res], - consumeErrors=True, + + constraints = await self.get_table_constraints() + tables_ported = set() # type: Set[str] + + while tables_to_port_info_map: + # Pulls out all tables that are still to be ported and which + # only depend on tables that are already ported (if any). + tables_to_port = [ + table + for table in tables_to_port_info_map + if not constraints.get(table, set()) - tables_ported + ] + + await make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background( + self.handle_table, + table, + *tables_to_port_info_map.pop(table), + ) + for table in tables_to_port + ], + consumeErrors=True, + ) ) - ) + + tables_ported.update(tables_to_port) # Step 5. Set up sequences self.progress.set_state("Setting up sequence generators") -- cgit 1.5.1 From 41a389934e45dd2a9e96b0b465626adef18b25b8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 11 Nov 2020 15:08:03 +0000 Subject: Fix port script fails when DB has no backfilled events. (#8729) Fixes #8618 --- changelog.d/8729.bugfix | 1 + scripts/synapse_port_db | 12 +++++------- 2 files changed, 6 insertions(+), 7 deletions(-) create mode 100644 changelog.d/8729.bugfix (limited to 'scripts/synapse_port_db') diff --git a/changelog.d/8729.bugfix b/changelog.d/8729.bugfix new file mode 100644 index 0000000000..7f59a3b9e2 --- /dev/null +++ b/changelog.d/8729.bugfix @@ -0,0 +1 @@ +Fix port script fails when DB has no backfilled events. Broke in v1.21.0. 
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 7a638ea8e3..604b961bd2 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -876,14 +876,12 @@ class Porter(object): "ALTER SEQUENCE events_stream_seq RESTART WITH %s", (next_id,) ) - txn.execute("SELECT -MIN(stream_ordering) FROM events") + txn.execute("SELECT GREATEST(-MIN(stream_ordering), 1) FROM events") curr_id = txn.fetchone()[0] - if curr_id: - next_id = curr_id + 1 - txn.execute( - "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s", - (next_id,), - ) + next_id = curr_id + 1 + txn.execute( + "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s", (next_id,), + ) return self.postgres_store.db_pool.runInteraction( "_setup_events_stream_seqs", r -- cgit 1.5.1 From 1b15a3d92cbe1ee9475319ff81abe8760d6be19f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 13 Nov 2020 11:53:51 +0000 Subject: Fix port script so that it can be run again after failure. (#8755) If the script fails (or is CTRL-C'ed) between porting some of the events table and copying of the sequences then the port script will immediately die if run again due to the postgres DB having inconsistencies between sequences and tables. The fix is to move the porting of sequences to before porting the tables, so that there is never a period where the Postgres DB is inconsistent. To do that we need to change how we port the sequences so that it calculates the values from the SQLite DB rather than the Postgres DB. Fixes #8619 --- changelog.d/8755.bugfix | 1 + scripts/synapse_port_db | 84 +++++++++++++++++++++++++++++++------------------ 2 files changed, 55 insertions(+), 30 deletions(-) create mode 100644 changelog.d/8755.bugfix (limited to 'scripts/synapse_port_db') diff --git a/changelog.d/8755.bugfix b/changelog.d/8755.bugfix new file mode 100644 index 0000000000..42bbed3ac2 --- /dev/null +++ b/changelog.d/8755.bugfix @@ -0,0 +1 @@ +Fix port script so that it can be run again after a failure. 
Broke in v1.21.0. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 604b961bd2..5ad17aa90f 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -619,7 +619,18 @@ class Porter(object): "create_port_table", create_port_table ) - # Step 2. Get tables. + # Step 2. Set up sequences + # + # We do this before porting the tables so that even if we fail half + # way through the postgres DB always has sequences that are greater + # than their respective tables. If we don't then creating the + # `DataStore` object will fail due to the inconsistency. + self.progress.set_state("Setting up sequence generators") + await self._setup_state_group_id_seq() + await self._setup_user_id_seq() + await self._setup_events_stream_seqs() + + # Step 3. Get tables. self.progress.set_state("Fetching tables") sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol( table="sqlite_master", keyvalues={"type": "table"}, retcol="name" @@ -634,7 +645,7 @@ class Porter(object): tables = set(sqlite_tables) & set(postgres_tables) logger.info("Found %d tables", len(tables)) - # Step 3. Figure out what still needs copying + # Step 4. Figure out what still needs copying self.progress.set_state("Checking on port progress") setup_res = await make_deferred_yieldable( defer.gatherResults( @@ -651,7 +662,7 @@ class Porter(object): # of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`. tables_to_port_info_map = {r[0]: r[1:] for r in setup_res} - # Step 4. Do the copying. + # Step 5. Do the copying. # # This is slightly convoluted as we need to ensure tables are ported # in the correct order due to foreign key constraints. @@ -685,12 +696,6 @@ class Porter(object): tables_ported.update(tables_to_port) - # Step 5. 
Set up sequences - self.progress.set_state("Setting up sequence generators") - await self._setup_state_group_id_seq() - await self._setup_user_id_seq() - await self._setup_events_stream_seqs() - self.progress.done() except Exception as e: global end_error_exec_info @@ -848,43 +853,62 @@ class Porter(object): return done, remaining + done - def _setup_state_group_id_seq(self): + async def _setup_state_group_id_seq(self): + curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True + ) + + if not curr_id: + return + def r(txn): - txn.execute("SELECT MAX(id) FROM state_groups") - curr_id = txn.fetchone()[0] - if not curr_id: - return next_id = curr_id + 1 txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,)) - return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r) + await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r) + + async def _setup_user_id_seq(self): + curr_id = await self.sqlite_store.db_pool.runInteraction( + "setup_user_id_seq", find_max_generated_user_id_localpart + ) - def _setup_user_id_seq(self): def r(txn): - next_id = find_max_generated_user_id_localpart(txn) + 1 + next_id = curr_id + 1 txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,)) return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r) - def _setup_events_stream_seqs(self): - def r(txn): - txn.execute("SELECT MAX(stream_ordering) FROM events") - curr_id = txn.fetchone()[0] - if curr_id: - next_id = curr_id + 1 + async def _setup_events_stream_seqs(self): + """Set the event stream sequences to the correct values. + """ + + # We get called before we've ported the events table, so we need to + # fetch the current positions from the SQLite store. 
+ curr_forward_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="events", keyvalues={}, retcol="MAX(stream_ordering)", allow_none=True + ) + + curr_backward_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="events", + keyvalues={}, + retcol="MAX(-MIN(stream_ordering), 1)", + allow_none=True, + ) + + def _setup_events_stream_seqs_set_pos(txn): + if curr_forward_id: txn.execute( - "ALTER SEQUENCE events_stream_seq RESTART WITH %s", (next_id,) + "ALTER SEQUENCE events_stream_seq RESTART WITH %s", + (curr_forward_id + 1,), ) - txn.execute("SELECT GREATEST(-MIN(stream_ordering), 1) FROM events") - curr_id = txn.fetchone()[0] - next_id = curr_id + 1 txn.execute( - "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s", (next_id,), + "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s", + (curr_backward_id + 1,), ) - return self.postgres_store.db_pool.runInteraction( - "_setup_events_stream_seqs", r + return await self.postgres_store.db_pool.runInteraction( + "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos, ) -- cgit 1.5.1