diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c597639a7f..da3ddafeae 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -682,13 +682,33 @@ class DeviceHandler(DeviceWorkerHandler):
hosts_already_sent_to: Set[str] = set()
try:
+ stream_id, room_id = await self.store.get_device_change_last_converted_pos()
+
while True:
self._handle_new_device_update_new_data = False
- rows = await self.store.get_uncoverted_outbound_room_pokes()
+ max_stream_id = self.store.get_device_stream_token()
+ rows = await self.store.get_uncoverted_outbound_room_pokes(
+ stream_id, room_id
+ )
if not rows:
# If the DB returned nothing then there is nothing left to
# do, *unless* a new device list update happened during the
# DB query.
+
+ # Advance `(stream_id, room_id)`.
+ # `max_stream_id` comes from *before* the query for unconverted
+ # rows, which means that any unconverted rows must have a larger
+ # stream ID.
+ if max_stream_id > stream_id:
+ stream_id, room_id = max_stream_id, ""
+ await self.store.set_device_change_last_converted_pos(
+ stream_id, room_id
+ )
+ else:
+ assert max_stream_id == stream_id
+ # Avoid moving `room_id` backwards.
+ pass
+
if self._handle_new_device_update_new_data:
continue
else:
@@ -718,7 +738,6 @@ class DeviceHandler(DeviceWorkerHandler):
user_id=user_id,
device_id=device_id,
room_id=room_id,
- stream_id=stream_id,
hosts=hosts,
context=opentracing_context,
)
@@ -752,6 +771,12 @@ class DeviceHandler(DeviceWorkerHandler):
hosts_already_sent_to.update(hosts)
current_stream_id = stream_id
+ # Advance `(stream_id, room_id)`.
+ _, _, room_id, stream_id, _ = rows[-1]
+ await self.store.set_device_change_last_converted_pos(
+ stream_id, room_id
+ )
+
finally:
self._handle_new_device_update_is_processing = False
@@ -834,7 +859,6 @@ class DeviceHandler(DeviceWorkerHandler):
user_id=user_id,
device_id=device_id,
room_id=room_id,
- stream_id=None,
hosts=potentially_changed_hosts,
context=None,
)
|