summary refs log tree commit diff
path: root/scripts
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2021-02-23 07:33:24 -0500
committerGitHub <noreply@github.com>2021-02-23 07:33:24 -0500
commit65a9eb899479e4c9d7de8b86780f68d478696908 (patch)
tree35fd4fb23178acb83f44c4502b85d31c5657ab7f /scripts
parentFix deleting pushers when using sharded pushers. (#9465) (diff)
downloadsynapse-65a9eb899479e4c9d7de8b86780f68d478696908.tar.xz
Include newly added sequences in the port DB script. (#9449)
And ensure the consistency of `event_auth_chain_id`.
Diffstat (limited to 'scripts')
-rwxr-xr-xscripts/synapse_port_db65
1 files changed, 41 insertions, 24 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 69bf9110a6..d2aaea08f5 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -22,7 +22,7 @@ import logging
 import sys
 import time
 import traceback
-from typing import Dict, Optional, Set
+from typing import Dict, Iterable, Optional, Set
 
 import yaml
 
@@ -629,7 +629,13 @@ class Porter(object):
             await self._setup_state_group_id_seq()
             await self._setup_user_id_seq()
             await self._setup_events_stream_seqs()
-            await self._setup_device_inbox_seq()
+            await self._setup_sequence(
+                "device_inbox_sequence", ("device_inbox", "device_federation_outbox")
+            )
+            await self._setup_sequence(
+                "account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
+            await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
+            await self._setup_auth_chain_sequence()
 
             # Step 3. Get tables.
             self.progress.set_state("Fetching tables")
@@ -854,7 +860,7 @@ class Porter(object):
 
         return done, remaining + done
 
-    async def _setup_state_group_id_seq(self):
+    async def _setup_state_group_id_seq(self) -> None:
         curr_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
             table="state_groups", keyvalues={}, retcol="MAX(id)", allow_none=True
         )
@@ -868,7 +874,7 @@ class Porter(object):
 
         await self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
 
-    async def _setup_user_id_seq(self):
+    async def _setup_user_id_seq(self) -> None:
         curr_id = await self.sqlite_store.db_pool.runInteraction(
             "setup_user_id_seq", find_max_generated_user_id_localpart
         )
@@ -877,9 +883,9 @@ class Porter(object):
             next_id = curr_id + 1
             txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
 
-        return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
+        await self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
 
-    async def _setup_events_stream_seqs(self):
+    async def _setup_events_stream_seqs(self) -> None:
         """Set the event stream sequences to the correct values.
         """
 
@@ -908,35 +914,46 @@ class Porter(object):
                 (curr_backward_id + 1,),
             )
 
-        return await self.postgres_store.db_pool.runInteraction(
+        await self.postgres_store.db_pool.runInteraction(
             "_setup_events_stream_seqs", _setup_events_stream_seqs_set_pos,
         )
 
-    async def _setup_device_inbox_seq(self):
-        """Set the device inbox sequence to the correct value.
+    async def _setup_sequence(self, sequence_name: str, stream_id_tables: Iterable[str]) -> None:
+        """Set a sequence to the correct value.
         """
-        curr_local_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
-            table="device_inbox",
-            keyvalues={},
-            retcol="COALESCE(MAX(stream_id), 1)",
-            allow_none=True,
-        )
+        current_stream_ids = []
+        for stream_id_table in stream_id_tables:
+            max_stream_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+                table=stream_id_table,
+                keyvalues={},
+                retcol="COALESCE(MAX(stream_id), 1)",
+                allow_none=True,
+            )
+            current_stream_ids.append(max_stream_id)
 
-        curr_federation_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
-            table="device_federation_outbox",
-            keyvalues={},
-            retcol="COALESCE(MAX(stream_id), 1)",
-            allow_none=True,
-        )
+        next_id = max(current_stream_ids) + 1
+
+        def r(txn):
+            sql = "ALTER SEQUENCE %s RESTART WITH" % (sequence_name, )
+            txn.execute(sql + " %s", (next_id, ))
 
-        next_id = max(curr_local_id, curr_federation_id) + 1
+        await self.postgres_store.db_pool.runInteraction("_setup_%s" % (sequence_name,), r)
+
+    async def _setup_auth_chain_sequence(self) -> None:
+        curr_chain_id = await self.sqlite_store.db_pool.simple_select_one_onecol(
+            table="event_auth_chains", keyvalues={}, retcol="MAX(chain_id)", allow_none=True
+        )
 
         def r(txn):
             txn.execute(
-                "ALTER SEQUENCE device_inbox_sequence RESTART WITH %s", (next_id,)
+                "ALTER SEQUENCE event_auth_chain_id RESTART WITH %s",
+                (curr_chain_id,),
             )
 
-        return self.postgres_store.db_pool.runInteraction("_setup_device_inbox_seq", r)
+        await self.postgres_store.db_pool.runInteraction(
+            "_setup_event_auth_chain_id", r,
+        )
+
 
 
 ##############################################