summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/16318.misc1
-rw-r--r--synapse/handlers/device.py27
2 files changed, 15 insertions, 13 deletions
diff --git a/changelog.d/16318.misc b/changelog.d/16318.misc
new file mode 100644
index 0000000000..1433a2f246
--- /dev/null
+++ b/changelog.d/16318.misc
@@ -0,0 +1 @@
+Speed up task to delete to-device messages.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 0d3d5ebc86..86ad96d030 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -388,7 +388,8 @@ class DeviceWorkerHandler:
             "Trying handling device list state for partial join: not supported on workers."
         )
 
-    DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
+    DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
+    DEVICE_MSGS_DELETE_SLEEP_MS = 1000
 
     async def _delete_device_messages(
         self,
@@ -400,19 +401,19 @@ class DeviceWorkerHandler:
         device_id = task.params["device_id"]
         up_to_stream_id = task.params["up_to_stream_id"]
 
-        res = await self.store.delete_messages_for_device(
-            user_id=user_id,
-            device_id=device_id,
-            up_to_stream_id=up_to_stream_id,
-            limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
-        )
+        # Delete the messages in batches to avoid too much DB load.
+        while True:
+            res = await self.store.delete_messages_for_device(
+                user_id=user_id,
+                device_id=device_id,
+                up_to_stream_id=up_to_stream_id,
+                limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
+            )
 
-        if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
-            return TaskStatus.COMPLETE, None, None
-        else:
-            # There is probably still device messages to be deleted, let's keep the task active and it will be run
-            # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
-            return TaskStatus.ACTIVE, None, None
+            if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+                return TaskStatus.COMPLETE, None, None
+
+            await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
 
 
 class DeviceHandler(DeviceWorkerHandler):