summary refs log tree commit diff
path: root/synapse/storage/databases/main/devices.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/devices.py')
-rw-r--r--synapse/storage/databases/main/devices.py107
1 files changed, 69 insertions, 38 deletions
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 57230df5ae..37629115ab 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -2008,27 +2008,48 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         )
 
     async def get_uncoverted_outbound_room_pokes(
-        self, limit: int = 10
+        self, start_stream_id: int, start_room_id: str, limit: int = 10
     ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
         """Get device list changes by room that have not yet been handled and
         written to `device_lists_outbound_pokes`.
 
+        Args:
+            start_stream_id: Together with `start_room_id`, indicates the position after
+                which to return device list changes.
+            start_room_id: Together with `start_stream_id`, indicates the position after
+                which to return device list changes.
+            limit: The maximum number of device list changes to return.
+
         Returns:
-            A list of user ID, device ID, room ID, stream ID and optional opentracing context.
+            A list of user ID, device ID, room ID, stream ID and optional opentracing
+            context, in order of ascending (stream ID, room ID).
         """
 
         sql = """
             SELECT user_id, device_id, room_id, stream_id, opentracing_context
             FROM device_lists_changes_in_room
-            WHERE NOT converted_to_destinations
-            ORDER BY stream_id
+            WHERE
+                (stream_id, room_id) > (?, ?) AND
+                stream_id <= ? AND
+                NOT converted_to_destinations
+            ORDER BY stream_id ASC, room_id ASC
             LIMIT ?
         """
 
         def get_uncoverted_outbound_room_pokes_txn(
             txn: LoggingTransaction,
         ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
-            txn.execute(sql, (limit,))
+            txn.execute(
+                sql,
+                (
+                    start_stream_id,
+                    start_room_id,
+                    # Avoid returning rows if there may be uncommitted device list
+                    # changes with smaller stream IDs.
+                    self._device_list_id_gen.get_current_token(),
+                    limit,
+                ),
+            )
 
             return [
                 (
@@ -2050,49 +2071,25 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         user_id: str,
         device_id: str,
         room_id: str,
-        stream_id: Optional[int],
         hosts: Collection[str],
         context: Optional[Dict[str, str]],
     ) -> None:
         """Queue the device update to be sent to the given set of hosts,
         calculated from the room ID.
-
-        Marks the associated row in `device_lists_changes_in_room` as handled,
-        if `stream_id` is provided.
         """
+        if not hosts:
+            return
 
         def add_device_list_outbound_pokes_txn(
             txn: LoggingTransaction, stream_ids: List[int]
         ) -> None:
-            if hosts:
-                self._add_device_outbound_poke_to_stream_txn(
-                    txn,
-                    user_id=user_id,
-                    device_id=device_id,
-                    hosts=hosts,
-                    stream_ids=stream_ids,
-                    context=context,
-                )
-
-            if stream_id:
-                self.db_pool.simple_update_txn(
-                    txn,
-                    table="device_lists_changes_in_room",
-                    keyvalues={
-                        "user_id": user_id,
-                        "device_id": device_id,
-                        "stream_id": stream_id,
-                        "room_id": room_id,
-                    },
-                    updatevalues={"converted_to_destinations": True},
-                )
-
-        if not hosts:
-            # If there are no hosts then we don't try and generate stream IDs.
-            return await self.db_pool.runInteraction(
-                "add_device_list_outbound_pokes",
-                add_device_list_outbound_pokes_txn,
-                [],
+            self._add_device_outbound_poke_to_stream_txn(
+                txn,
+                user_id=user_id,
+                device_id=device_id,
+                hosts=hosts,
+                stream_ids=stream_ids,
+                context=context,
             )
 
         async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
@@ -2156,3 +2153,37 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             "get_pending_remote_device_list_updates_for_room",
             get_pending_remote_device_list_updates_for_room_txn,
         )
+
+    async def get_device_change_last_converted_pos(self) -> Tuple[int, str]:
+        """
+        Get the position of the last row in `device_list_changes_in_room` that has been
+        converted to `device_lists_outbound_pokes`.
+
+        Rows with a strictly greater position where `converted_to_destinations` is
+        `FALSE` have not been converted.
+        """
+
+        row = await self.db_pool.simple_select_one(
+            table="device_lists_changes_converted_stream_position",
+            keyvalues={},
+            retcols=["stream_id", "room_id"],
+            desc="get_device_change_last_converted_pos",
+        )
+        return row["stream_id"], row["room_id"]
+
+    async def set_device_change_last_converted_pos(
+        self,
+        stream_id: int,
+        room_id: str,
+    ) -> None:
+        """
+        Set the position of the last row in `device_list_changes_in_room` that has been
+        converted to `device_lists_outbound_pokes`.
+        """
+
+        await self.db_pool.simple_update_one(
+            table="device_lists_changes_converted_stream_position",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id, "room_id": room_id},
+            desc="set_device_change_last_converted_pos",
+        )