summary refs log tree commit diff
path: root/synapse/storage/databases/main
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-11-16 15:19:35 +0000
committerGitHub <noreply@github.com>2023-11-16 15:19:35 +0000
commit3e8531d3baf205733693f9ae8b43aa0b4c82b744 (patch)
treeb4a153c96b8a2dbea1e7dfa66fa9026afa3e9dad /synapse/storage/databases/main
parentSpeed up persisting large number of outliers (#16649) (diff)
downloadsynapse-3e8531d3baf205733693f9ae8b43aa0b4c82b744.tar.xz
Speed up deleting device messages (#16643)
Keeping track of a lower bound of stream ID where we've deleted everything below makes the queries much faster. Otherwise, every time we scan for rows to delete we'd re-scan across all the rows that have previously deleted (until the next table VACUUM).
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py106
1 files changed, 81 insertions, 25 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 3e7425d4a6..02dddd1da4 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -450,14 +450,12 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         user_id: str,
         device_id: Optional[str],
         up_to_stream_id: int,
-        limit: Optional[int] = None,
     ) -> int:
         """
         Args:
             user_id: The recipient user_id.
             device_id: The recipient device_id.
             up_to_stream_id: Where to delete messages up to.
-            limit: maximum number of messages to delete
 
         Returns:
             The number of messages deleted.
@@ -478,32 +476,22 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 log_kv({"message": "No changes in cache since last check"})
                 return 0
 
-        def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
-            limit_statement = "" if limit is None else f"LIMIT {limit}"
-            sql = f"""
-                DELETE FROM device_inbox WHERE user_id = ? AND device_id = ? AND stream_id <= (
-                  SELECT MAX(stream_id) FROM (
-                    SELECT stream_id FROM device_inbox
-                    WHERE user_id = ? AND device_id = ? AND stream_id <= ?
-                    ORDER BY stream_id
-                    {limit_statement}
-                  ) AS q1
-                )
-                """
-            txn.execute(sql, (user_id, device_id, user_id, device_id, up_to_stream_id))
-            return txn.rowcount
-
-        count = await self.db_pool.runInteraction(
-            "delete_messages_for_device", delete_messages_for_device_txn
-        )
+        from_stream_id = None
+        count = 0
+        while True:
+            from_stream_id, loop_count = await self.delete_messages_for_device_between(
+                user_id,
+                device_id,
+                from_stream_id=from_stream_id,
+                to_stream_id=up_to_stream_id,
+                limit=1000,
+            )
+            count += loop_count
+            if from_stream_id is None:
+                break
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
 
-        # In this case we don't know if we hit the limit or the delete is complete
-        # so let's not update the cache.
-        if count == limit:
-            return count
-
         # Update the cache, ensuring that we only ever increase the value
         updated_last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0
@@ -515,6 +503,74 @@ class DeviceInboxWorkerStore(SQLBaseStore):
         return count
 
     @trace
+    async def delete_messages_for_device_between(
+        self,
+        user_id: str,
+        device_id: Optional[str],
+        from_stream_id: Optional[int],
+        to_stream_id: int,
+        limit: int,
+    ) -> Tuple[Optional[int], int]:
+        """Delete N device messages between the stream IDs, returning the
+        highest stream ID deleted (or None if all messages in the range have
+        been deleted) and the number of messages deleted.
+
+        This is more efficient than `delete_messages_for_device` when calling in
+        a loop to batch delete messages.
+        """
+
+        # Keeping track of a lower bound of stream ID where we've deleted
+        # everything below makes the queries much faster. Otherwise, every time
+        # we scan for rows to delete we'd re-scan across all the rows that have
+        # previously deleted (until the next table VACUUM).
+
+        if from_stream_id is None:
+            # Minimum device stream ID is 1.
+            from_stream_id = 0
+
+        def delete_messages_for_device_between_txn(
+            txn: LoggingTransaction,
+        ) -> Tuple[Optional[int], int]:
+            txn.execute(
+                """
+                SELECT MAX(stream_id) FROM (
+                    SELECT stream_id FROM device_inbox
+                    WHERE user_id = ? AND device_id = ?
+                        AND ? < stream_id AND stream_id <= ?
+                    ORDER BY stream_id
+                    LIMIT ?
+                ) AS d
+                """,
+                (user_id, device_id, from_stream_id, to_stream_id, limit),
+            )
+            row = txn.fetchone()
+            if row is None or row[0] is None:
+                return None, 0
+
+            (max_stream_id,) = row
+
+            txn.execute(
+                """
+                DELETE FROM device_inbox
+                WHERE user_id = ? AND device_id = ?
+                AND ? < stream_id AND stream_id <= ?
+                """,
+                (user_id, device_id, from_stream_id, max_stream_id),
+            )
+
+            num_deleted = txn.rowcount
+            if num_deleted < limit:
+                return None, num_deleted
+
+            return max_stream_id, num_deleted
+
+        return await self.db_pool.runInteraction(
+            "delete_messages_for_device_between",
+            delete_messages_for_device_between_txn,
+            db_autocommit=True,  # We don't need to run in a transaction
+        )
+
+    @trace
     async def get_new_device_msgs_for_remote(
         self, destination: str, last_stream_id: int, current_stream_id: int, limit: int
     ) -> Tuple[List[JsonDict], int]: