summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/device.py30
-rw-r--r--synapse/storage/database.py13
-rw-r--r--synapse/storage/databases/main/devices.py107
-rw-r--r--synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql53
4 files changed, 156 insertions, 47 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c597639a7f..da3ddafeae 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -682,13 +682,33 @@ class DeviceHandler(DeviceWorkerHandler):
         hosts_already_sent_to: Set[str] = set()
 
         try:
+            stream_id, room_id = await self.store.get_device_change_last_converted_pos()
+
             while True:
                 self._handle_new_device_update_new_data = False
-                rows = await self.store.get_uncoverted_outbound_room_pokes()
+                max_stream_id = self.store.get_device_stream_token()
+                rows = await self.store.get_uncoverted_outbound_room_pokes(
+                    stream_id, room_id
+                )
                 if not rows:
                     # If the DB returned nothing then there is nothing left to
                     # do, *unless* a new device list update happened during the
                     # DB query.
+
+                    # Advance `(stream_id, room_id)`.
+                    # `max_stream_id` comes from *before* the query for unconverted
+                    # rows, which means that any unconverted rows must have a larger
+                    # stream ID.
+                    if max_stream_id > stream_id:
+                        stream_id, room_id = max_stream_id, ""
+                        await self.store.set_device_change_last_converted_pos(
+                            stream_id, room_id
+                        )
+                    else:
+                        assert max_stream_id == stream_id
+                        # Avoid moving `room_id` backwards.
+                        pass
+
                     if self._handle_new_device_update_new_data:
                         continue
                     else:
@@ -718,7 +738,6 @@ class DeviceHandler(DeviceWorkerHandler):
                         user_id=user_id,
                         device_id=device_id,
                         room_id=room_id,
-                        stream_id=stream_id,
                         hosts=hosts,
                         context=opentracing_context,
                     )
@@ -752,6 +771,12 @@ class DeviceHandler(DeviceWorkerHandler):
                     hosts_already_sent_to.update(hosts)
                     current_stream_id = stream_id
 
+                # Advance `(stream_id, room_id)`.
+                _, _, room_id, stream_id, _ = rows[-1]
+                await self.store.set_device_change_last_converted_pos(
+                    stream_id, room_id
+                )
+
         finally:
             self._handle_new_device_update_is_processing = False
 
@@ -834,7 +859,6 @@ class DeviceHandler(DeviceWorkerHandler):
                 user_id=user_id,
                 device_id=device_id,
                 room_id=room_id,
-                stream_id=None,
                 hosts=potentially_changed_hosts,
                 context=None,
             )
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 0dc44b246c..a14b13aec8 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -2075,13 +2075,14 @@ class DatabasePool:
         retcols: Collection[str],
         allow_none: bool = False,
     ) -> Optional[Dict[str, Any]]:
-        select_sql = "SELECT %s FROM %s WHERE %s" % (
-            ", ".join(retcols),
-            table,
-            " AND ".join("%s = ?" % (k,) for k in keyvalues),
-        )
+        select_sql = "SELECT %s FROM %s" % (", ".join(retcols), table)
+
+        if keyvalues:
+            select_sql += " WHERE %s" % (" AND ".join("%s = ?" % k for k in keyvalues),)
+            txn.execute(select_sql, list(keyvalues.values()))
+        else:
+            txn.execute(select_sql)
 
-        txn.execute(select_sql, list(keyvalues.values()))
         row = txn.fetchone()
 
         if not row:
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 57230df5ae..37629115ab 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -2008,27 +2008,48 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         )
 
     async def get_uncoverted_outbound_room_pokes(
-        self, limit: int = 10
+        self, start_stream_id: int, start_room_id: str, limit: int = 10
     ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
         """Get device list changes by room that have not yet been handled and
         written to `device_lists_outbound_pokes`.
 
+        Args:
+            start_stream_id: Together with `start_room_id`, indicates the position after
+                which to return device list changes.
+            start_room_id: Together with `start_stream_id`, indicates the position after
+                which to return device list changes.
+            limit: The maximum number of device list changes to return.
+
         Returns:
-            A list of user ID, device ID, room ID, stream ID and optional opentracing context.
+            A list of user ID, device ID, room ID, stream ID and optional opentracing
+            context, in order of ascending (stream ID, room ID).
         """
 
         sql = """
             SELECT user_id, device_id, room_id, stream_id, opentracing_context
             FROM device_lists_changes_in_room
-            WHERE NOT converted_to_destinations
-            ORDER BY stream_id
+            WHERE
+                (stream_id, room_id) > (?, ?) AND
+                stream_id <= ? AND
+                NOT converted_to_destinations
+            ORDER BY stream_id ASC, room_id ASC
             LIMIT ?
         """
 
         def get_uncoverted_outbound_room_pokes_txn(
             txn: LoggingTransaction,
         ) -> List[Tuple[str, str, str, int, Optional[Dict[str, str]]]]:
-            txn.execute(sql, (limit,))
+            txn.execute(
+                sql,
+                (
+                    start_stream_id,
+                    start_room_id,
+                    # Avoid returning rows if there may be uncommitted device list
+                    # changes with smaller stream IDs.
+                    self._device_list_id_gen.get_current_token(),
+                    limit,
+                ),
+            )
 
             return [
                 (
@@ -2050,49 +2071,25 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
         user_id: str,
         device_id: str,
         room_id: str,
-        stream_id: Optional[int],
         hosts: Collection[str],
         context: Optional[Dict[str, str]],
     ) -> None:
         """Queue the device update to be sent to the given set of hosts,
         calculated from the room ID.
-
-        Marks the associated row in `device_lists_changes_in_room` as handled,
-        if `stream_id` is provided.
         """
+        if not hosts:
+            return
 
         def add_device_list_outbound_pokes_txn(
             txn: LoggingTransaction, stream_ids: List[int]
         ) -> None:
-            if hosts:
-                self._add_device_outbound_poke_to_stream_txn(
-                    txn,
-                    user_id=user_id,
-                    device_id=device_id,
-                    hosts=hosts,
-                    stream_ids=stream_ids,
-                    context=context,
-                )
-
-            if stream_id:
-                self.db_pool.simple_update_txn(
-                    txn,
-                    table="device_lists_changes_in_room",
-                    keyvalues={
-                        "user_id": user_id,
-                        "device_id": device_id,
-                        "stream_id": stream_id,
-                        "room_id": room_id,
-                    },
-                    updatevalues={"converted_to_destinations": True},
-                )
-
-        if not hosts:
-            # If there are no hosts then we don't try and generate stream IDs.
-            return await self.db_pool.runInteraction(
-                "add_device_list_outbound_pokes",
-                add_device_list_outbound_pokes_txn,
-                [],
+            self._add_device_outbound_poke_to_stream_txn(
+                txn,
+                user_id=user_id,
+                device_id=device_id,
+                hosts=hosts,
+                stream_ids=stream_ids,
+                context=context,
             )
 
         async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
@@ -2156,3 +2153,37 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             "get_pending_remote_device_list_updates_for_room",
             get_pending_remote_device_list_updates_for_room_txn,
         )
+
+    async def get_device_change_last_converted_pos(self) -> Tuple[int, str]:
+        """
+        Get the position of the last row in `device_list_changes_in_room` that has been
+        converted to `device_lists_outbound_pokes`.
+
+        Rows with a strictly greater position where `converted_to_destinations` is
+        `FALSE` have not been converted.
+        """
+
+        row = await self.db_pool.simple_select_one(
+            table="device_lists_changes_converted_stream_position",
+            keyvalues={},
+            retcols=["stream_id", "room_id"],
+            desc="get_device_change_last_converted_pos",
+        )
+        return row["stream_id"], row["room_id"]
+
+    async def set_device_change_last_converted_pos(
+        self,
+        stream_id: int,
+        room_id: str,
+    ) -> None:
+        """
+        Set the position of the last row in `device_list_changes_in_room` that has been
+        converted to `device_lists_outbound_pokes`.
+        """
+
+        await self.db_pool.simple_update_one(
+            table="device_lists_changes_converted_stream_position",
+            keyvalues={},
+            updatevalues={"stream_id": stream_id, "room_id": room_id},
+            desc="set_device_change_last_converted_pos",
+        )
diff --git a/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql b/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql
new file mode 100644
index 0000000000..93d7fcb79b
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/12refactor_device_list_outbound_pokes.sql
@@ -0,0 +1,53 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Prior to this schema delta, we tracked the set of unconverted rows in
+-- `device_lists_changes_in_room` using the `converted_to_destinations` flag. When rows
+-- were converted to `device_lists_outbound_pokes`, the `converted_to_destinations` flag
+-- would be set.
+--
+-- After this schema delta, the `converted_to_destinations` is still populated like
+-- before, but the set of unconverted rows is determined by the `stream_id` in the new
+-- `device_lists_changes_converted_stream_position` table.
+--
+-- If rolled back, Synapse will re-send all device list changes that happened since the
+-- schema delta.
+
+CREATE TABLE IF NOT EXISTS device_lists_changes_converted_stream_position(
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    -- The (stream id, room id) of the last row in `device_lists_changes_in_room` that
+    -- has been converted to `device_lists_outbound_pokes`. Rows with a strictly larger
+    -- (stream id, room id) where `converted_to_destinations` is `FALSE` have not been
+    -- converted.
+    stream_id BIGINT NOT NULL,
+    -- `room_id` may be an empty string, which compares less than all valid room IDs.
+    room_id TEXT NOT NULL,
+    CHECK (Lock='X')
+);
+
+INSERT INTO device_lists_changes_converted_stream_position (stream_id, room_id) VALUES (
+    (
+        SELECT COALESCE(
+            -- The last converted stream id is the smallest unconverted stream id minus
+            -- one.
+            MIN(stream_id) - 1,
+            -- If there is no unconverted stream id, the last converted stream id is the
+            -- largest stream id.
+            -- Otherwise, pick 1, since stream ids start at 2.
+            (SELECT COALESCE(MAX(stream_id), 1) FROM device_lists_changes_in_room)
+        ) FROM device_lists_changes_in_room WHERE NOT converted_to_destinations
+    ),
+    ''
+);