summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/16312.misc1
-rw-r--r--synapse/handlers/sync.py37
2 files changed, 27 insertions, 11 deletions
diff --git a/changelog.d/16312.misc b/changelog.d/16312.misc
new file mode 100644
index 0000000000..4f266c1fb0
--- /dev/null
+++ b/changelog.d/16312.misc
@@ -0,0 +1 @@
+Delete device messages asynchronously and in staged batches using the task scheduler.
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0ccd7d250c..f1f19666d7 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -362,21 +362,36 @@ class SyncHandler:
         # (since we now know that the device has received them)
         if since_token is not None:
             since_stream_id = since_token.to_device_key
-            # Delete device messages asynchronously and in batches using the task scheduler
-            await self._task_scheduler.schedule_task(
-                DELETE_DEVICE_MSGS_TASK_NAME,
-                resource_id=sync_config.device_id,
-                params={
-                    "user_id": sync_config.user.to_string(),
-                    "device_id": sync_config.device_id,
-                    "up_to_stream_id": since_stream_id,
-                },
+            # Fast path: delete a limited number of to-device messages up front.
+            # We do this to avoid the overhead of scheduling a task for every
+            # sync.
+            device_deletion_limit = 100
+            deleted = await self.store.delete_messages_for_device(
+                sync_config.user.to_string(),
+                sync_config.device_id,
+                since_stream_id,
+                limit=device_deletion_limit,
             )
             logger.debug(
-                "Deletion of to-device messages up to %d scheduled",
-                since_stream_id,
+                "Deleted %d to-device messages up to %d", deleted, since_stream_id
             )
 
+            # If we hit the limit, schedule a background task to delete the rest.
+            if deleted >= device_deletion_limit:
+                await self._task_scheduler.schedule_task(
+                    DELETE_DEVICE_MSGS_TASK_NAME,
+                    resource_id=sync_config.device_id,
+                    params={
+                        "user_id": sync_config.user.to_string(),
+                        "device_id": sync_config.device_id,
+                        "up_to_stream_id": since_stream_id,
+                    },
+                )
+                logger.debug(
+                    "Deletion of to-device messages up to %d scheduled",
+                    since_stream_id,
+                )
+
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.