diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index a117e61586..c982ca9350 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -22,7 +22,7 @@ import logging
import sys
import time
import traceback
-from typing import Optional
+from typing import Dict, Optional, Set
import yaml
@@ -294,6 +294,34 @@ class Porter(object):
return table, already_ported, total_to_port, forward_chunk, backward_chunk
+ async def get_table_constraints(self) -> Dict[str, Set[str]]:
+        """Returns a map from each table that has foreign key constraints to
+        the set of tables its constraints reference.
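+
+        For example, if `event_edges` had foreign keys referencing `events`
+        and `rooms`, the result would include an entry mapping "event_edges"
+        to {"events", "rooms"} (illustrative; the exact entries depend on the
+        schema).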
+ """
+
+ def _get_constraints(txn):
+ # We can pull the information about foreign key constraints out from
+ # the postgres schema tables.
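+            #
+            # For FOREIGN KEY constraints, `constraint_column_usage` lists
+            # the columns of the *referenced* table, so joining it against
+            # `table_constraints` on schema and constraint name yields
+            # (referencing table, referenced table) pairs.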
+ sql = """
+ SELECT DISTINCT
+ tc.table_name,
+ ccu.table_name AS foreign_table_name
+ FROM
+ information_schema.table_constraints AS tc
+ INNER JOIN information_schema.constraint_column_usage AS ccu
+ USING (table_schema, constraint_name)
+ WHERE tc.constraint_type = 'FOREIGN KEY';
+ """
+ txn.execute(sql)
+
+ results = {}
+ for table, foreign_table in txn:
+ results.setdefault(table, set()).add(foreign_table)
+ return results
+
+ return await self.postgres_store.db_pool.runInteraction(
+ "get_table_constraints", _get_constraints
+ )
+
async def handle_table(
self, table, postgres_size, table_size, forward_chunk, backward_chunk
):
@@ -593,7 +621,18 @@ class Porter(object):
"create_port_table", create_port_table
)
- # Step 2. Get tables.
+ # Step 2. Set up sequences
+ #
+        # We do this before porting the tables so that even if we fail half
+        # way through, the postgres DB always has sequences that are greater
+        # than the maximum IDs in their respective tables. If we didn't,
+        # creating the `DataStore` object would fail due to the
+        # inconsistency.
+ self.progress.set_state("Setting up sequence generators")
+ await self._setup_state_group_id_seq()
+ await self._setup_user_id_seq()
+ await self._setup_events_stream_seqs()
+
+ # Step 3. Get tables.
self.progress.set_state("Fetching tables")
sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
@@ -608,7 +647,7 @@ class Porter(object):
tables = set(sqlite_tables) & set(postgres_tables)
logger.info("Found %d tables", len(tables))
- # Step 3. Figure out what still needs copying
+ # Step 4. Figure out what still needs copying
self.progress.set_state("Checking on port progress")
setup_res = await make_deferred_yieldable(
defer.gatherResults(
@@ -621,21 +660,43 @@ class Porter(object):
consumeErrors=True,
)
)
-
- # Step 4. Do the copying.
+ # Map from table name to args passed to `handle_table`, i.e. a tuple
+ # of: `postgres_size`, `table_size`, `forward_chunk`, `backward_chunk`.
+ tables_to_port_info_map = {r[0]: r[1:] for r in setup_res}
+
+ # Step 5. Do the copying.
+ #
+ # This is slightly convoluted as we need to ensure tables are ported
+ # in the correct order due to foreign key constraints.
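+        #
+        # In effect this is a batched topological sort of the dependency
+        # graph: each pass of the loop below ports every table whose
+        # dependencies have all already been ported.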
self.progress.set_state("Copying to postgres")
- await make_deferred_yieldable(
- defer.gatherResults(
- [run_in_background(self.handle_table, *res) for res in setup_res],
- consumeErrors=True,
+
+ constraints = await self.get_table_constraints()
+ tables_ported = set() # type: Set[str]
+
+ while tables_to_port_info_map:
+            # Pull out all tables that are still to be ported and which
+            # depend only on tables that have already been ported (if any).
+ tables_to_port = [
+ table
+ for table in tables_to_port_info_map
+ if not constraints.get(table, set()) - tables_ported
+ ]
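+
+            # If no table is portable then the constraint graph must contain
+            # a cycle (e.g. a self-referential foreign key); bail out rather
+            # than looping forever.
+            if not tables_to_port:
+                raise Exception("Circular foreign key dependency between tables")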
+
+ await make_deferred_yieldable(
+ defer.gatherResults(
+ [
+ run_in_background(
+ self.handle_table,
+ table,
+ *tables_to_port_info_map.pop(table),
+ )
+ for table in tables_to_port
+ ],
+ consumeErrors=True,
+ )
)
- )
- # Step 5. Set up sequences
- self.progress.set_state("Setting up sequence generators")
- await self._setup_state_group_id_seq()
- await self._setup_user_id_seq()
- await self._setup_events_stream_seqs()
+ tables_ported.update(tables_to_port)
self.progress.done()
except Exception as e:
@@ -794,45 +855,62 @@ class Porter(object):
return done, remaining + done
- def _setup_state_group_id_seq(self):
+ async def _setup_state_group_id_seq(self):
+ curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+ table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
+ )
+
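+        # If the SQLite database has no state groups there is nothing to do:
+        # the sequence can be left at its default starting value.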
+ if not curr_id:
+ return
+
def r(txn):
- txn.execute("SELECT MAX(id) FROM state_groups")
- curr_id = txn.fetchone()[0]
- if not curr_id:
- return
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
- return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
+ await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
+
+ async def _setup_user_id_seq(self):
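+        # As above, the users table hasn't been ported yet, so read the
+        # current maximum generated user ID from the SQLite store.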
+ curr_id = await self.sqlite_store.db_pool.runInteraction(
+ "setup_user_id_seq", find_max_generated_user_id_localpart
+ )
- def _setup_user_id_seq(self):
def r(txn):
- next_id = find_max_generated_user_id_localpart(txn) + 1
+ next_id = curr_id + 1
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
-        return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
+        return await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
- def _setup_events_stream_seqs(self):
- def r(txn):
- txn.execute("SELECT MAX(stream_ordering) FROM events")
- curr_id = txn.fetchone()[0]
- if curr_id:
- next_id = curr_id + 1
- txn.execute(
- "ALTER SEQUENCE events_stream_seq RESTART WITH %s", (next_id,)
- )
+ async def _setup_events_stream_seqs(self):
+ """Set the event stream sequences to the correct values.
+ """
+
+ # We get called before we've ported the events table, so we need to
+ # fetch the current positions from the SQLite store.
+ curr_forward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+ table="events", keyvalues={}, retcol="MAX(stream_ordering)", allow_none=True
+ )
- txn.execute("SELECT -MIN(stream_ordering) FROM events")
- curr_id = txn.fetchone()[0]
- if curr_id:
- next_id = curr_id + 1
+ curr_backward_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+ table="events",
+ keyvalues={},
+ retcol="MAX(-MIN(stream_ordering), 1)",
+ allow_none=True,
+ )
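+        # Backfilled events are assigned negative stream orderings, so the
+        # current position of the backwards stream is the magnitude of the
+        # most negative stream ordering, clamped to at least 1.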
+
+ def _setup_events_stream_seqs_set_pos(txn):
+ if curr_forward_id:
txn.execute(
- "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
- (next_id,),
+ "ALTER SEQUENCE events_stream_seq RESTART WITH %s",
+ (curr_forward_id + 1,),
)
- return self.postgres_store.db_pool.runInteraction(
- "_setup_events_stream_seqs", r
+
+            if curr_backward_id:
+                txn.execute(
+                    "ALTER SEQUENCE events_backfill_stream_seq RESTART WITH %s",
+                    (curr_backward_id + 1,),
+                )
+
+ return await self.postgres_store.db_pool.runInteraction(
+ "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
)