summary refs log tree commit diff
path: root/synapse/handlers/device.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/device.py')
-rw-r--r--synapse/handlers/device.py48
1 files changed, 48 insertions, 0 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 763f56dfc1..9e52af5f13 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -43,9 +43,12 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.types import (
     JsonDict,
+    JsonMapping,
+    ScheduledTask,
     StrCollection,
     StreamKeyType,
     StreamToken,
+    TaskStatus,
     UserID,
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
@@ -62,6 +65,7 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages"
 MAX_DEVICE_DISPLAY_NAME_LEN = 100
 DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
 
@@ -78,6 +82,7 @@ class DeviceWorkerHandler:
         self._appservice_handler = hs.get_application_service_handler()
         self._state_storage = hs.get_storage_controllers().state
         self._auth_handler = hs.get_auth_handler()
+        self._event_sources = hs.get_event_sources()
         self.server_name = hs.hostname
         self._msc3852_enabled = hs.config.experimental.msc3852_enabled
         self._query_appservices_for_keys = (
@@ -386,6 +391,7 @@ class DeviceHandler(DeviceWorkerHandler):
         self._account_data_handler = hs.get_account_data_handler()
         self._storage_controllers = hs.get_storage_controllers()
         self.db_pool = hs.get_datastores().main.db_pool
+        self._task_scheduler = hs.get_task_scheduler()
 
         self.device_list_updater = DeviceListUpdater(hs, self)
 
@@ -419,6 +425,10 @@ class DeviceHandler(DeviceWorkerHandler):
                 self._delete_stale_devices,
             )
 
+        self._task_scheduler.register_action(
+            self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
+        )
+
     def _check_device_name_length(self, name: Optional[str]) -> None:
         """
         Checks whether a device name is longer than the maximum allowed length.
@@ -530,6 +540,7 @@ class DeviceHandler(DeviceWorkerHandler):
             user_id: The user to delete devices from.
             device_ids: The list of device IDs to delete
         """
+        to_device_stream_id = self._event_sources.get_current_token().to_device_key
 
         try:
             await self.store.delete_devices(user_id, device_ids)
@@ -559,12 +570,49 @@ class DeviceHandler(DeviceWorkerHandler):
                     f"org.matrix.msc3890.local_notification_settings.{device_id}",
                 )
 
+            # Delete device messages asynchronously and in batches using the task scheduler
+            await self._task_scheduler.schedule_task(
+                DELETE_DEVICE_MSGS_TASK_NAME,
+                resource_id=device_id,
+                params={
+                    "user_id": user_id,
+                    "device_id": device_id,
+                    "up_to_stream_id": to_device_stream_id,
+                },
+            )
+
         # Pushers are deleted after `delete_access_tokens_for_user` is called so that
         # modules using `on_logged_out` hook can use them if needed.
         await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
 
         await self.notify_device_update(user_id, device_ids)
 
+    DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
+
+    async def _delete_device_messages(
+        self,
+        task: ScheduledTask,
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
+        assert task.params is not None
+        user_id = task.params["user_id"]
+        device_id = task.params["device_id"]
+        up_to_stream_id = task.params["up_to_stream_id"]
+
+        res = await self.store.delete_messages_for_device(
+            user_id=user_id,
+            device_id=device_id,
+            up_to_stream_id=up_to_stream_id,
+            limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
+        )
+
+        if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+            return TaskStatus.COMPLETE, None, None
+        else:
+            # There is probably still device messages to be deleted, let's keep the task active and it will be run
+            # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
+            return TaskStatus.ACTIVE, None, None
+
     async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
         """Update the given device