From 65a9eb899479e4c9d7de8b86780f68d478696908 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 23 Feb 2021 07:33:24 -0500 Subject: Include newly added sequences in the port DB script. (#9449) And ensure the consistency of `event_auth_chain_id`. --- scripts/synapse_port_db | 65 +++++++++++++++++++++++++++++++------------------ 1 file changed, 41 insertions(+), 24 deletions(-) (limited to 'scripts') diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 69bf9110a6..d2aaea08f5 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -22,7 +22,7 @@ import logging import sys import time import traceback -from typing import Dict, Optional, Set +from typing import Dict, Iterable, Optional, Set import yaml @@ -629,7 +629,13 @@ class Porter(object): await self._setup_state_group_id_seq() await self._setup_user_id_seq() await self._setup_events_stream_seqs() - await self._setup_device_inbox_seq() + await self._setup_sequence( + "device_inbox_sequence", ("device_inbox", "device_federation_outbox") + ) + await self._setup_sequence( + "account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data")) + await self._setup_sequence("receipts_sequence", ("receipts_linearized", )) + await self._setup_auth_chain_sequence() # Step 3. Get tables. self.progress.set_state("Fetching tables") @@ -854,7 +860,7 @@ class Porter(object): return done, remaining + done - async def _setup_state_group_id_seq(self): + async def _setup_state_group_id_seq(self) -> None: curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol( table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True ) @@ -868,7 +874,7 @@ class Porter(object): await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r) - async def _setup_user_id_seq(self): + async def _setup_user_id_seq(self) -> None: curr_id = await self.sqlite_store.db_pool.runInteraction( "setup_user_id_seq", find_max_generated_user_id_localpart ) @@ -877,9 +883,9 @@ class Porter(object): next_id = curr_id + 1 txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,)) - return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r) + await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r) - async def _setup_events_stream_seqs(self): + async def _setup_events_stream_seqs(self) -> None: """Set the event stream sequences to the correct values. """ @@ -908,35 +914,46 @@ class Porter(object): (curr_backward_id + 1,), ) - return await self.postgres_store.db_pool.runInteraction( + await self.postgres_store.db_pool.runInteraction( "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos, ) - async def _setup_device_inbox_seq(self): - """Set the device inbox sequence to the correct value. + async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None: + """Set a sequence to the correct value. """ - curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol( - table="device_inbox", - keyvalues={}, - retcol="COALESCE(MAX(stream_id), 1)", - allow_none=True, - ) + current_stream_ids = [] + for stream_id_table in stream_id_tables: + max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table=stream_id_table, + keyvalues={}, + retcol="COALESCE(MAX(stream_id), 1)", + allow_none=True, + ) + current_stream_ids.append(max_stream_id) - curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol( - table="device_federation_outbox", - keyvalues={}, - retcol="COALESCE(MAX(stream_id), 1)", - allow_none=True, - ) + next_id = max(current_stream_ids) + 1 + + def r(txn): + sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, ) + txn.execute(sql + " %s", (next_id, )) - next_id = max(curr_local_id, curr_federation_id) + 1 + await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r) + + async def _setup_auth_chain_sequence(self) -> None: + curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol( + table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True + ) def r(txn): txn.execute( - "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,) + "ALTER SEQUENCE event_auth_chain_id RESTART WITH %s", + (curr_chain_id,), ) - return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r) + await self.postgres_store.db_pool.runInteraction( + "_setup_event_auth_chain_id", r, + ) + ############################################## -- cgit 1.4.1