diff options
author | Erik Johnston <erikj@matrix.org> | 2023-09-14 14:56:07 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-09-14 14:56:07 +0100 |
commit | e9e2904eb2c0b73eb4154faf41bd360e6168cc92 (patch) | |
tree | 6ad3c8b9870b7a90497bf25b081ed274bf9395d1 | |
parent | docs: Link to the Alpine Linux community package for Synapse (#16304) (diff) | |
download | synapse-e9e2904eb2c0b73eb4154faf41bd360e6168cc92.tar.xz |
Speed up deleting to-device messages task (#16318)
-rw-r--r-- | changelog.d/16318.misc | 1 | ||||
-rw-r--r-- | synapse/handlers/device.py | 27 |
2 files changed, 15 insertions, 13 deletions
diff --git a/changelog.d/16318.misc b/changelog.d/16318.misc new file mode 100644 index 0000000000..1433a2f246 --- /dev/null +++ b/changelog.d/16318.misc @@ -0,0 +1 @@ +Speed up task to delete to-device messages. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 0d3d5ebc86..86ad96d030 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -388,7 +388,8 @@ class DeviceWorkerHandler: "Trying handling device list state for partial join: not supported on workers." ) - DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + DEVICE_MSGS_DELETE_BATCH_LIMIT = 1000 + DEVICE_MSGS_DELETE_SLEEP_MS = 1000 async def _delete_device_messages( self, @@ -400,19 +401,19 @@ class DeviceWorkerHandler: device_id = task.params["device_id"] up_to_stream_id = task.params["up_to_stream_id"] - res = await self.store.delete_messages_for_device( - user_id=user_id, - device_id=device_id, - up_to_stream_id=up_to_stream_id, - limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, - ) + # Delete the messages in batches to avoid too much DB load. + while True: + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) - if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: - return TaskStatus.COMPLETE, None, None - else: - # There is probably still device messages to be deleted, let's keep the task active and it will be run - # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). - return TaskStatus.ACTIVE, None, None + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + + await self.clock.sleep(DeviceHandler.DEVICE_MSGS_DELETE_SLEEP_MS / 1000.0) class DeviceHandler(DeviceWorkerHandler): |