diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 0d3d5ebc86..86ad96d030 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -388,7 +388,8 @@ class DeviceWorkerHandler:
"Trying handling device list state for partial join: not supported on workers."
)
- DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
+ DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000
+ DEVICE_MSGS_DELETE_SLEEP_MS = 1000
async def _delete_device_messages(
self,
@@ -400,19 +401,19 @@ class DeviceWorkerHandler:
         device_id = task.params["device_id"]
         up_to_stream_id = task.params["up_to_stream_id"]
 
-        res = await self.store.delete_messages_for_device(
-            user_id=user_id,
-            device_id=device_id,
-            up_to_stream_id=up_to_stream_id,
-            limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
-        )
+        # Delete the messages in batches to avoid too much DB load.
+        while True:
+            res = await self.store.delete_messages_for_device(
+                user_id=user_id,
+                device_id=device_id,
+                up_to_stream_id=up_to_stream_id,
+                limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
+            )
 
-        if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
-            return TaskStatus.COMPLETE, None, None
-        else:
-            # There is probably still device messages to be deleted, let's keep the task active and it will be run
-            # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
-            return TaskStatus.ACTIVE, None, None
+            if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+                return TaskStatus.COMPLETE, None, None
+
+            await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0)
 
 
 class DeviceHandler(DeviceWorkerHandler):
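For context, a minimal standalone sketch of the batch-and-sleep pattern this patch introduces. The names (delete_batch, pending_rows, delete_all_in_batches) and the in-memory "store" are illustrative, not the Synapse API; asyncio.sleep stands in for self.clock.sleep.

import asyncio

# Illustrative values mirroring the diff: up to 1000 rows per batch,
# with a 1s pause between full batches.
BATCH_LIMIT = 1000
SLEEP_MS = 1000

# Hypothetical in-memory stand-in for the rows the real store would delete.
pending_rows = list(range(2500))


async def delete_batch(limit: int) -> int:
    """Stand-in for store.delete_messages_for_device: deletes up to `limit`
    rows and returns the number actually deleted."""
    deleted = min(limit, len(pending_rows))
    del pending_rows[:deleted]
    return deleted


async def delete_all_in_batches() -> None:
    # Same shape as the patched task body: delete in batches, sleeping between
    # full batches so the database is not hit with one huge delete.
    while True:
        deleted = await delete_batch(BATCH_LIMIT)
        if deleted < BATCH_LIMIT:
            # A short batch means everything up to the target is gone.
            return
        await asyncio.sleep(SLEEP_MS / 1000.0)


if __name__ == "__main__":
    asyncio.run(delete_all_in_batches())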