summary refs log tree commit diff
path: root/synapse/_scripts
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-05-30 12:07:32 +0100
committerGitHub <noreply@github.com>2024-05-30 11:07:32 +0000
commitd16910ca021320f0fa09c6cf82a802ee97e22a0c (patch)
treeeea35c87fecdccec394c8afee0ad23182d4c783d /synapse/_scripts
parentClean out invalid destinations from outbox (#17242) (diff)
downloadsynapse-d16910ca021320f0fa09c6cf82a802ee97e22a0c.tar.xz
Replaces all usages of `StreamIdGenerator` with `MultiWriterIdGenerator` (#17229)
Replaces all usages of `StreamIdGenerator` with `MultiWriterIdGenerator`, which is safer.
Diffstat (limited to 'synapse/_scripts')
-rwxr-xr-xsynapse/_scripts/synapse_port_db.py71
1 files changed, 61 insertions, 10 deletions
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 1e56f46911..3bb4a34938 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -777,22 +777,74 @@ class Porter:
             await self._setup_events_stream_seqs()
             await self._setup_sequence(
                 "un_partial_stated_event_stream_sequence",
-                ("un_partial_stated_event_stream",),
+                [("un_partial_stated_event_stream", "stream_id")],
             )
             await self._setup_sequence(
-                "device_inbox_sequence", ("device_inbox", "device_federation_outbox")
+                "device_inbox_sequence",
+                [
+                    ("device_inbox", "stream_id"),
+                    ("device_federation_outbox", "stream_id"),
+                ],
             )
             await self._setup_sequence(
                 "account_data_sequence",
-                ("room_account_data", "room_tags_revisions", "account_data"),
+                [
+                    ("room_account_data", "stream_id"),
+                    ("room_tags_revisions", "stream_id"),
+                    ("account_data", "stream_id"),
+                ],
+            )
+            await self._setup_sequence(
+                "receipts_sequence",
+                [
+                    ("receipts_linearized", "stream_id"),
+                ],
+            )
+            await self._setup_sequence(
+                "presence_stream_sequence",
+                [
+                    ("presence_stream", "stream_id"),
+                ],
             )
-            await self._setup_sequence("receipts_sequence", ("receipts_linearized",))
-            await self._setup_sequence("presence_stream_sequence", ("presence_stream",))
             await self._setup_auth_chain_sequence()
             await self._setup_sequence(
                 "application_services_txn_id_seq",
-                ("application_services_txns",),
-                "txn_id",
+                [
+                    (
+                        "application_services_txns",
+                        "txn_id",
+                    )
+                ],
+            )
+            await self._setup_sequence(
+                "device_lists_sequence",
+                [
+                    ("device_lists_stream", "stream_id"),
+                    ("user_signature_stream", "stream_id"),
+                    ("device_lists_outbound_pokes", "stream_id"),
+                    ("device_lists_changes_in_room", "stream_id"),
+                    ("device_lists_remote_pending", "stream_id"),
+                    ("device_lists_changes_converted_stream_position", "stream_id"),
+                ],
+            )
+            await self._setup_sequence(
+                "e2e_cross_signing_keys_sequence",
+                [
+                    ("e2e_cross_signing_keys", "stream_id"),
+                ],
+            )
+            await self._setup_sequence(
+                "push_rules_stream_sequence",
+                [
+                    ("push_rules_stream", "stream_id"),
+                ],
+            )
+            await self._setup_sequence(
+                "pushers_sequence",
+                [
+                    ("pushers", "id"),
+                    ("deleted_pushers", "stream_id"),
+                ],
             )
 
             # Step 3. Get tables.
@@ -1101,12 +1153,11 @@ class Porter:
     async def _setup_sequence(
         self,
         sequence_name: str,
-        stream_id_tables: Iterable[str],
-        column_name: str = "stream_id",
+        stream_id_tables: Iterable[Tuple[str, str]],
     ) -> None:
         """Set a sequence to the correct value."""
         current_stream_ids = []
-        for stream_id_table in stream_id_tables:
+        for stream_id_table, column_name in stream_id_tables:
             max_stream_id = cast(
                 int,
                 await self.sqlite_store.db_pool.simple_select_one_onecol(