summary refs log tree commit diff
path: root/synapse/storage/databases/main/devices.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/devices.py')
-rw-r--r--synapse/storage/databases/main/devices.py28
1 files changed, 19 insertions, 9 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 1c771e48f7..40187496e2 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -108,6 +108,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
                 ("device_lists_outbound_pokes", "instance_name", "stream_id"),
                 ("device_lists_changes_in_room", "instance_name", "stream_id"),
                 ("device_lists_remote_pending", "instance_name", "stream_id"),
+                (
+                    "device_lists_changes_converted_stream_position",
+                    "instance_name",
+                    "stream_id",
+                ),
             ],
             sequence_name="device_lists_sequence",
             writers=["master"],
@@ -2394,15 +2399,16 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         `FALSE` have not been converted.
         """
 
-        return cast(
-            Tuple[int, str],
-            await self.db_pool.simple_select_one(
-                table="device_lists_changes_converted_stream_position",
-                keyvalues={},
-                retcols=["stream_id", "room_id"],
-                desc="get_device_change_last_converted_pos",
-            ),
+        # There should be only one row in this table, though we want to
+        # future-proof ourselves for when we have multiple rows (one for each
+        # instance). So to handle that case we take the minimum of all rows.
+        rows = await self.db_pool.simple_select_list(
+            table="device_lists_changes_converted_stream_position",
+            keyvalues={},
+            retcols=["stream_id", "room_id"],
+            desc="get_device_change_last_converted_pos",
         )
+        return cast(Tuple[int, str], min(rows))
 
     async def set_device_change_last_converted_pos(
         self,
@@ -2417,6 +2423,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         await self.db_pool.simple_update_one(
             table="device_lists_changes_converted_stream_position",
             keyvalues={},
-            updatevalues={"stream_id": stream_id, "room_id": room_id},
+            updatevalues={
+                "stream_id": stream_id,
+                "instance_name": self._instance_name,
+                "room_id": room_id,
+            },
             desc="set_device_change_last_converted_pos",
         )