diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 763f56dfc1..9e52af5f13 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -43,9 +43,12 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.types import (
JsonDict,
+ JsonMapping,
+ ScheduledTask,
StrCollection,
StreamKeyType,
StreamToken,
+ TaskStatus,
UserID,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
@@ -62,6 +65,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages"
MAX_DEVICE_DISPLAY_NAME_LEN = 100
DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000
@@ -78,6 +82,7 @@ class DeviceWorkerHandler:
self._appservice_handler = hs.get_application_service_handler()
self._state_storage = hs.get_storage_controllers().state
self._auth_handler = hs.get_auth_handler()
+ self._event_sources = hs.get_event_sources()
self.server_name = hs.hostname
self._msc3852_enabled = hs.config.experimental.msc3852_enabled
self._query_appservices_for_keys = (
@@ -386,6 +391,7 @@ class DeviceHandler(DeviceWorkerHandler):
self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers()
self.db_pool = hs.get_datastores().main.db_pool
+ self._task_scheduler = hs.get_task_scheduler()
self.device_list_updater = DeviceListUpdater(hs, self)
@@ -419,6 +425,10 @@ class DeviceHandler(DeviceWorkerHandler):
self._delete_stale_devices,
)
+ self._task_scheduler.register_action(
+ self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
+ )
+
def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
@@ -530,6 +540,7 @@ class DeviceHandler(DeviceWorkerHandler):
user_id: The user to delete devices from.
device_ids: The list of device IDs to delete
"""
+ to_device_stream_id = self._event_sources.get_current_token().to_device_key
try:
await self.store.delete_devices(user_id, device_ids)
@@ -559,12 +570,49 @@ class DeviceHandler(DeviceWorkerHandler):
f"org.matrix.msc3890.local_notification_settings.{device_id}",
)
+ # Delete device messages asynchronously and in batches using the task scheduler
+ await self._task_scheduler.schedule_task(
+ DELETE_DEVICE_MSGS_TASK_NAME,
+ resource_id=device_id,
+ params={
+ "user_id": user_id,
+ "device_id": device_id,
+ "up_to_stream_id": to_device_stream_id,
+ },
+ )
+
# Pushers are deleted after `delete_access_tokens_for_user` is called so that
# modules using `on_logged_out` hook can use them if needed.
await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids)
await self.notify_device_update(user_id, device_ids)
+ DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
+
+ async def _delete_device_messages(
+ self,
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
+ assert task.params is not None
+ user_id = task.params["user_id"]
+ device_id = task.params["device_id"]
+ up_to_stream_id = task.params["up_to_stream_id"]
+
+ res = await self.store.delete_messages_for_device(
+ user_id=user_id,
+ device_id=device_id,
+ up_to_stream_id=up_to_stream_id,
+ limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
+ )
+
+ if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+ return TaskStatus.COMPLETE, None, None
+ else:
+ # There is probably still device messages to be deleted, let's keep the task active and it will be run
+ # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
+ return TaskStatus.ACTIVE, None, None
+
async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
"""Update the given device
|