summary refs log tree commit diff
path: root/synapse/handlers/device.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/device.py')
-rw-r--r--synapse/handlers/device.py30
1 files changed, 27 insertions, 3 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c597639a7f..da3ddafeae 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -682,13 +682,33 @@ class DeviceHandler(DeviceWorkerHandler):
         hosts_already_sent_to: Set[str] = set()
 
         try:
+            stream_id, room_id = await self.store.get_device_change_last_converted_pos()
+
             while True:
                 self._handle_new_device_update_new_data = False
-                rows = await self.store.get_uncoverted_outbound_room_pokes()
+                max_stream_id = self.store.get_device_stream_token()
+                rows = await self.store.get_uncoverted_outbound_room_pokes(
+                    stream_id, room_id
+                )
                 if not rows:
                     # If the DB returned nothing then there is nothing left to
                     # do, *unless* a new device list update happened during the
                     # DB query.
+
+                    # Advance `(stream_id, room_id)`.
+                    # `max_stream_id` comes from *before* the query for unconverted
+                    # rows, which means that any unconverted rows must have a larger
+                    # stream ID.
+                    if max_stream_id > stream_id:
+                        stream_id, room_id = max_stream_id, ""
+                        await self.store.set_device_change_last_converted_pos(
+                            stream_id, room_id
+                        )
+                    else:
+                        assert max_stream_id == stream_id
+                        # Avoid moving `room_id` backwards.
+                        pass
+
                     if self._handle_new_device_update_new_data:
                         continue
                     else:
@@ -718,7 +738,6 @@ class DeviceHandler(DeviceWorkerHandler):
                         user_id=user_id,
                         device_id=device_id,
                         room_id=room_id,
-                        stream_id=stream_id,
                         hosts=hosts,
                         context=opentracing_context,
                     )
@@ -752,6 +771,12 @@ class DeviceHandler(DeviceWorkerHandler):
                     hosts_already_sent_to.update(hosts)
                     current_stream_id = stream_id
 
+                # Advance `(stream_id, room_id)`.
+                _, _, room_id, stream_id, _ = rows[-1]
+                await self.store.set_device_change_last_converted_pos(
+                    stream_id, room_id
+                )
+
         finally:
             self._handle_new_device_update_is_processing = False
 
@@ -834,7 +859,6 @@ class DeviceHandler(DeviceWorkerHandler):
                 user_id=user_id,
                 device_id=device_id,
                 room_id=room_id,
-                stream_id=None,
                 hosts=potentially_changed_hosts,
                 context=None,
             )