diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 1c771e48f7..40187496e2 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -108,6 +108,11 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
("device_lists_outbound_pokes", "instance_name", "stream_id"),
("device_lists_changes_in_room", "instance_name", "stream_id"),
("device_lists_remote_pending", "instance_name", "stream_id"),
+ (
+ "device_lists_changes_converted_stream_position",
+ "instance_name",
+ "stream_id",
+ ),
],
sequence_name="device_lists_sequence",
writers=["master"],
@@ -2394,15 +2399,16 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
`FALSE` have not been converted.
"""
- return cast(
- Tuple[int, str],
- await self.db_pool.simple_select_one(
- table="device_lists_changes_converted_stream_position",
- keyvalues={},
- retcols=["stream_id", "room_id"],
- desc="get_device_change_last_converted_pos",
- ),
+ # There should be only one row in this table, though we want to
+ # future-proof ourselves for when we have multiple rows (one for each
+ # instance). So to handle that case we take the minimum of all rows.
+ rows = await self.db_pool.simple_select_list(
+ table="device_lists_changes_converted_stream_position",
+ keyvalues={},
+ retcols=["stream_id", "room_id"],
+ desc="get_device_change_last_converted_pos",
)
+ return cast(Tuple[int, str], min(rows))
async def set_device_change_last_converted_pos(
self,
@@ -2417,6 +2423,10 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
await self.db_pool.simple_update_one(
table="device_lists_changes_converted_stream_position",
keyvalues={},
- updatevalues={"stream_id": stream_id, "room_id": room_id},
+ updatevalues={
+ "stream_id": stream_id,
+ "instance_name": self._instance_name,
+ "room_id": room_id,
+ },
desc="set_device_change_last_converted_pos",
)
|