diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9d240ad4ee..e2ae3da67e 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -91,9 +91,14 @@ class DeviceWorkerHandler:
self._query_appservices_for_keys = (
hs.config.experimental.msc3984_appservice_key_query
)
+ self._task_scheduler = hs.get_task_scheduler()
self.device_list_updater = DeviceListWorkerUpdater(hs)
+ self._task_scheduler.register_action(
+ self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
+ )
+
@trace
async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
"""
@@ -383,6 +388,32 @@ class DeviceWorkerHandler:
"Trying handling device list state for partial join: not supported on workers."
)
+ DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
+
+ async def _delete_device_messages(
+ self,
+ task: ScheduledTask,
+ ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+ """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
+ assert task.params is not None
+ user_id = task.params["user_id"]
+ device_id = task.params["device_id"]
+ up_to_stream_id = task.params["up_to_stream_id"]
+
+ res = await self.store.delete_messages_for_device(
+ user_id=user_id,
+ device_id=device_id,
+ up_to_stream_id=up_to_stream_id,
+ limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
+ )
+
+ if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+ return TaskStatus.COMPLETE, None, None
+ else:
+ # There is probably still device messages to be deleted, let's keep the task active and it will be run
+ # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
+ return TaskStatus.ACTIVE, None, None
+
class DeviceHandler(DeviceWorkerHandler):
device_list_updater: "DeviceListUpdater"
@@ -394,7 +425,6 @@ class DeviceHandler(DeviceWorkerHandler):
self._account_data_handler = hs.get_account_data_handler()
self._storage_controllers = hs.get_storage_controllers()
self.db_pool = hs.get_datastores().main.db_pool
- self._task_scheduler = hs.get_task_scheduler()
self.device_list_updater = DeviceListUpdater(hs, self)
@@ -428,10 +458,6 @@ class DeviceHandler(DeviceWorkerHandler):
self._delete_stale_devices,
)
- self._task_scheduler.register_action(
- self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
- )
-
def _check_device_name_length(self, name: Optional[str]) -> None:
"""
Checks whether a device name is longer than the maximum allowed length.
@@ -590,32 +616,6 @@ class DeviceHandler(DeviceWorkerHandler):
await self.notify_device_update(user_id, device_ids)
- DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
-
- async def _delete_device_messages(
- self,
- task: ScheduledTask,
- ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
- """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
- assert task.params is not None
- user_id = task.params["user_id"]
- device_id = task.params["device_id"]
- up_to_stream_id = task.params["up_to_stream_id"]
-
- res = await self.store.delete_messages_for_device(
- user_id=user_id,
- device_id=device_id,
- up_to_stream_id=up_to_stream_id,
- limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
- )
-
- if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
- return TaskStatus.COMPLETE, None, None
- else:
- # There is probably still device messages to be deleted, let's keep the task active and it will be run
- # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
- return TaskStatus.ACTIVE, None, None
-
async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
"""Update the given device
|