summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/databases/main/devices.py28
-rw-r--r--synapse/storage/schema/main/delta/85/05_add_instance_names_converted_pos.sql16
2 files changed, 35 insertions, 9 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 1c771e48f7..40187496e2 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -108,6 +108,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
                 ("device_lists_outbound_pokes", "instance_name", "stream_id"),
                 ("device_lists_changes_in_room", "instance_name", "stream_id"),
                 ("device_lists_remote_pending", "instance_name", "stream_id"),
+                (
+                    "device_lists_changes_converted_stream_position",
+                    "instance_name",
+                    "stream_id",
+                ),
             ],
             sequence_name="device_lists_sequence",
             writers=["master"],
@@ -2394,15 +2399,16 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         `FALSE` have not been converted.
         """
 
-        return cast(
-            Tuple[int, str],
-            await self.db_pool.simple_select_one(
-                table="device_lists_changes_converted_stream_position",
-                keyvalues={},
-                retcols=["stream_id", "room_id"],
-                desc="get_device_change_last_converted_pos",
-            ),
+        # There should be only one row in this table, though we want to
+        # future-proof ourselves for when we have multiple rows (one for each
+        # instance). So to handle that case we take the minimum of all rows.
+        rows = await self.db_pool.simple_select_list(
+            table="device_lists_changes_converted_stream_position",
+            keyvalues={},
+            retcols=["stream_id", "room_id"],
+            desc="get_device_change_last_converted_pos",
         )
+        return cast(Tuple[int, str], min(rows))
 
     async def set_device_change_last_converted_pos(
         self,
@@ -2417,6 +2423,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         await self.db_pool.simple_update_one(
             table="device_lists_changes_converted_stream_position",
             keyvalues={},
-            updatevalues={"stream_id": stream_id, "room_id": room_id},
+            updatevalues={
+                "stream_id": stream_id,
+                "instance_name": self._instance_name,
+                "room_id": room_id,
+            },
             desc="set_device_change_last_converted_pos",
         )
diff --git a/synapse/storage/schema/main/delta/85/05_add_instance_names_converted_pos.sql b/synapse/storage/schema/main/delta/85/05_add_instance_names_converted_pos.sql
new file mode 100644
index 0000000000..c3f2b6a1dd
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/05_add_instance_names_converted_pos.sql
@@ -0,0 +1,16 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- Add `instance_name` columns to stream tables to allow them to be used with
+-- `MultiWriterIdGenerator`
+ALTER TABLE device_lists_changes_converted_stream_position ADD COLUMN instance_name TEXT;