diff options
author | Erik Johnston <erikj@element.io> | 2024-05-30 12:07:32 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-30 11:07:32 +0000 |
commit | d16910ca021320f0fa09c6cf82a802ee97e22a0c (patch) | |
tree | eea35c87fecdccec394c8afee0ad23182d4c783d /synapse/_scripts | |
parent | Clean out invalid destinations from outbox (#17242) (diff) | |
download | synapse-d16910ca021320f0fa09c6cf82a802ee97e22a0c.tar.xz |
Replaces all usages of `StreamIdGenerator` with `MultiWriterIdGenerator` (#17229)
Replaces all usages of `StreamIdGenerator` with `MultiWriterIdGenerator`, which is safer.
Diffstat (limited to 'synapse/_scripts')
-rwxr-xr-x | synapse/_scripts/synapse_port_db.py | 71 |
1 files changed, 61 insertions, 10 deletions
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py index 1e56f46911..3bb4a34938 100755 --- a/synapse/_scripts/synapse_port_db.py +++ b/synapse/_scripts/synapse_port_db.py @@ -777,22 +777,74 @@ class Porter: await self._setup_events_stream_seqs() await self._setup_sequence( "un_partial_stated_event_stream_sequence", - ("un_partial_stated_event_stream",), + [("un_partial_stated_event_stream", "stream_id")], ) await self._setup_sequence( - "device_inbox_sequence", ("device_inbox", "device_federation_outbox") + "device_inbox_sequence", + [ + ("device_inbox", "stream_id"), + ("device_federation_outbox", "stream_id"), + ], ) await self._setup_sequence( "account_data_sequence", - ("room_account_data", "room_tags_revisions", "account_data"), + [ + ("room_account_data", "stream_id"), + ("room_tags_revisions", "stream_id"), + ("account_data", "stream_id"), + ], + ) + await self._setup_sequence( + "receipts_sequence", + [ + ("receipts_linearized", "stream_id"), + ], + ) + await self._setup_sequence( + "presence_stream_sequence", + [ + ("presence_stream", "stream_id"), + ], ) - await self._setup_sequence("receipts_sequence", ("receipts_linearized",)) - await self._setup_sequence("presence_stream_sequence", ("presence_stream",)) await self._setup_auth_chain_sequence() await self._setup_sequence( "application_services_txn_id_seq", - ("application_services_txns",), - "txn_id", + [ + ( + "application_services_txns", + "txn_id", + ) + ], + ) + await self._setup_sequence( + "device_lists_sequence", + [ + ("device_lists_stream", "stream_id"), + ("user_signature_stream", "stream_id"), + ("device_lists_outbound_pokes", "stream_id"), + ("device_lists_changes_in_room", "stream_id"), + ("device_lists_remote_pending", "stream_id"), + ("device_lists_changes_converted_stream_position", "stream_id"), + ], + ) + await self._setup_sequence( + "e2e_cross_signing_keys_sequence", + [ + ("e2e_cross_signing_keys", "stream_id"), + ], + ) + await self._setup_sequence( + "push_rules_stream_sequence", + [ + ("push_rules_stream", "stream_id"), + ], + ) + await self._setup_sequence( + "pushers_sequence", + [ + ("pushers", "id"), + ("deleted_pushers", "stream_id"), + ], ) # Step 3. Get tables. @@ -1101,12 +1153,11 @@ class Porter: async def _setup_sequence( self, sequence_name: str, - stream_id_tables: Iterable[str], - column_name: str = "stream_id", + stream_id_tables: Iterable[Tuple[str, str]], ) -> None: """Set a sequence to the correct value.""" current_stream_ids = [] - for stream_id_table in stream_id_tables: + for stream_id_table, column_name in stream_id_tables: max_stream_id = cast( int, await self.sqlite_store.db_pool.simple_select_one_onecol( |