summary refs log tree commit diff
path: root/synapse/storage/databases/main/deviceinbox.py
diff options
context:
space:
mode:
authorMathieu Velten <mathieuv@matrix.org>2023-09-06 09:30:53 +0200
committerGitHub <noreply@github.com>2023-09-06 09:30:53 +0200
commit4f1840a88ad3a93244fc23149c56245704eab824 (patch)
tree86b7f380757329ca7e6a38f47c13bd69df873283 /synapse/storage/databases/main/deviceinbox.py
parentFix appservices being unable to handle to_device messages for multiple users ... (diff)
downloadsynapse-4f1840a88ad3a93244fc23149c56245704eab824.tar.xz
Delete device messages asynchronously and in staged batches (#16240)
Diffstat (limited to 'synapse/storage/databases/main/deviceinbox.py')
-rw-r--r--synapse/storage/databases/main/deviceinbox.py26
1 files changed, 20 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 271cdf923c..744e98c6d0 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
     @trace
     async def delete_messages_for_device(
-        self, user_id: str, device_id: Optional[str], up_to_stream_id: int
+        self,
+        user_id: str,
+        device_id: Optional[str],
+        up_to_stream_id: int,
+        limit: int,
     ) -> int:
         """
         Args:
             user_id: The recipient user_id.
             device_id: The recipient device_id.
             up_to_stream_id: Where to delete messages up to.
+            limit: maximum number of messages to delete
 
         Returns:
             The number of messages deleted.
@@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
                 log_kv({"message": "No changes in cache since last check"})
                 return 0
 
+        ROW_ID_NAME = self.database_engine.row_id_name
+
         def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
-            sql = (
-                "DELETE FROM device_inbox"
-                " WHERE user_id = ? AND device_id = ?"
-                " AND stream_id <= ?"
-            )
+            sql = f"""
+                DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
+                  SELECT {ROW_ID_NAME} FROM device_inbox
+                  WHERE user_id = ? AND device_id = ? AND stream_id <= ?
+                  LIMIT {limit}
+                )
+                """
             txn.execute(sql, (user_id, device_id, up_to_stream_id))
             return txn.rowcount
 
@@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
 
         log_kv({"message": f"deleted {count} messages for device", "count": count})
 
+        # In this case we don't know if we hit the limit or the delete is complete
+        # so let's not update the cache.
+        if count == limit:
+            return count
+
         # Update the cache, ensuring that we only ever increase the value
         updated_last_deleted_stream_id = self._last_device_delete_cache.get(
             (user_id, device_id), 0