From e9e2904eb2c0b73eb4154faf41bd360e6168cc92 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Sep 2023 14:56:07 +0100 Subject: Speed up deleting to-device messages task (#16318) --- changelog.d/16318.misc | 1 + synapse/handlers/device.py | 27 ++++++++++++++------------- 2 files changed, 15 insertions(+), 13 deletions(-) create mode 100644 changelog.d/16318.misc diff --git a/changelog.d/16318.misc b/changelog.d/16318.misc new file mode 100644 index 0000000000..1433a2f246 --- /dev/null +++ b/changelog.d/16318.misc @@ -0,0 +1 @@ +Speed up task to delete to-device messages. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0d3d5ebc86..86ad96d030 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -388,7 +388,8 @@ class DeviceWorkerHandler: "Trying handling device list state for partial join: not supported on workers." ) - DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000 + DEVICE_MSGS_DELETE_SLEEP_MS = 1000 async def _delete_device_messages( self, @@ -400,19 +401,19 @@ class DeviceWorkerHandler: device_id = task.params["device_id"] up_to_stream_id = task.params["up_to_stream_id"] - res = await self.store.delete_messages_for_device( - user_id=user_id, - device_id=device_id, - up_to_stream_id=up_to_stream_id, - limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, - ) + # Delete the messages in batches to avoid too much DB load. + while True: + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) - if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: - return TaskStatus.COMPLETE, None, None - else: - # There is probably still device messages to be deleted, let's keep the task active and it will be run - # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). - return TaskStatus.ACTIVE, None, None + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + + await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0) class DeviceHandler(DeviceWorkerHandler): -- cgit 1.4.1