summary refs log tree commit diff
path: root/scripts
diff options
context:
space:
mode:
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/synapse_port_db158
1 files changed, 118 insertions, 40 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db

index a117e61586..c982ca9350 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db
@@ -22,7 +22,7 @@ import logging import sys import time import traceback -from typing import Optional +from typing import Dict, Optional, Set import yaml @@ -294,6 +294,34 @@ class Porter(object): return table, already_ported, total_to_port, forward_chunk, backward_chunk + async def get_table_constraints(self) -> Dict[str, Set[str]]: + """Returns a map of tables that have foreign key constraints to tables they depend on. + """ + + def _get_constraints(txn): + # We can pull the information about foreign key constraints out from + # the postgres schema tables. + sql = """ + SELECT DISTINCT + tc.table_name, + ccu.table_name AS foreign_table_name + FROM + information_schema.table_constraints AS tc + INNER JOIN information_schema.constraint_column_usage AS ccu + USING (table_schema, constraint_name) + WHERE tc.constraint_type = 'FOREIGN KEY'; + """ + txn.execute(sql) + + results = {} + for table, foreign_table in txn: + results.setdefault(table, set()).add(foreign_table) + return results + + return await self.postgres_store.db_pool.runInteraction( + "get_table_constraints", _get_constraints + ) + async def handle_table( self, table, postgres_size, table_size, forward_chunk, backward_chunk ): @@ -593,7 +621,18 @@ class Porter(object): "create_port_table", create_port_table ) - # Step 2. Get tables. + # Step 2. Set up sequences + # + # We do this before porting the tables so that event if we fail half + # way through the postgres DB always have sequences that are greater + # than their respective tables. If we don't then creating the + # `DataStore` object will fail due to the inconsistency. + self.progress.set_state("Setting up sequence generators") + await self._setup_state_group_id_seq() + await self._setup_user_id_seq() + await self._setup_events_stream_seqs() + + # Step 3. Get tables. self.progress.set_state("Fetching tables") sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol( table="sqlite_master", keyvalues={"type": "table"}, retcol="name" @@ -608,7 +647,7 @@ class Porter(object): tables = set(sqlite_tables) & set(postgres_tables) logger.info("Found %d tables", len(tables)) - # Step 3. Figure out what still needs copying + # Step 4. Figure out what still needs copying self.progress.set_state("Checking on port progress") setup_res = await make_deferred_yieldable( defer.gatherResults( @@ -621,21 +660,43 @@ class Porter(object): consumeErrors=True, ) ) - - # Step 4. Do the copying. + # Map from table name to args passed to `handle_table`, i.e. a tuple + # of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`. + tables_to_port_info_map = {r[0]: r[1:] for r in setup_res} + + # Step 5. Do the copying. + # + # This is slightly convoluted as we need to ensure tables are ported + # in the correct order due to foreign key constraints. self.progress.set_state("Copying to postgres") - await make_deferred_yieldable( - defer.gatherResults( - [run_in_background(self.handle_table, *res) for res in setup_res], - consumeErrors=True, + + constraints = await self.get_table_constraints() + tables_ported = set() # type: Set[str] + + while tables_to_port_info_map: + # Pulls out all tables that are still to be ported and which + # only depend on tables that are already ported (if any). + tables_to_port = [ + table + for table in tables_to_port_info_map + if not constraints.get(table, set()) - tables_ported + ] + + await make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background( + self.handle_table, + table, + *tables_to_port_info_map.pop(table), + ) + for table in tables_to_port + ], + consumeErrors=True, + ) ) - ) - # Step 5. Set up sequences - self.progress.set_state("Setting up sequence generators") - await self._setup_state_group_id_seq() - await self._setup_user_id_seq() - await self._setup_events_stream_seqs() + tables_ported.update(tables_to_port) self.progress.done() except Exception as e: @@ -794,45 +855,62 @@ class Porter(object): return done, remaining + done - def _setup_state_group_id_seq(self): + async def _setup_state_group_id_seq(self): + curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True + ) + + if not curr_id: + return + def r(txn): - txn.execute("SELECT MAX(id) FROM state_groups") - curr_id = txn.fetchone()[0] - if not curr_id: - return next_id = curr_id + 1 txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,)) - return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r) + await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r) + + async def _setup_user_id_seq(self): + curr_id = await self.sqlite_store.db_pool.runInteraction( + "setup_user_id_seq", find_max_generated_user_id_localpart + ) - def _setup_user_id_seq(self): def r(txn): - next_id = find_max_generated_user_id_localpart(txn) + 1 + next_id = curr_id + 1 txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,)) return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r) - def _setup_events_stream_seqs(self): - def r(txn): - txn.execute("SELECT MAX(stream_ordering) FROM events") - curr_id = txn.fetchone()[0] - if curr_id: - next_id = curr_id + 1 - txn.execute( - "ALTER SEQUENCE events_stream_seq RESTART WITH %s", (next_id,) - ) + async def _setup_events_stream_seqs(self): + """Set the event stream sequences to the correct values. + """ + + # We get called before we've ported the events table, so we need to + # fetch the current positions from the SQLite store. + curr_forward_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="events", keyvalues={}, retcol="MAX(stream_ordering)", allow_none=True + ) - txn.execute("SELECT -MIN(stream_ordering) FROM events") - curr_id = txn.fetchone()[0] - if curr_id: - next_id = curr_id + 1 + curr_backward_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="events", + keyvalues={}, + retcol="MAX(-MIN(stream_ordering), 1)", + allow_none=True, + ) + + def _setup_events_stream_seqs_set_pos(txn): + if curr_forward_id: txn.execute( - "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s", - (next_id,), + "ALTER SEQUENCE events_stream_seq RESTART WITH %s", + (curr_forward_id + 1,), ) - return self.postgres_store.db_pool.runInteraction( - "_setup_events_stream_seqs", r + txn.execute( + "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s", + (curr_backward_id + 1,), + ) + + return await self.postgres_store.db_pool.runInteraction( + "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos, )