summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-09-13 11:54:16 +0100
committerGitHub <noreply@github.com>2023-09-13 11:54:16 +0100
commitbe3c7b08a3e6888e60497a80ebd143bd4df9a719 (patch)
tree2bf146bd2d8ac97e3ed0c087af9478956095606b /synapse
parentImprove logging of replication (#16309) (diff)
downloadsynapse-be3c7b08a3e6888e60497a80ebd143bd4df9a719.tar.xz
Fix deleting device inbox when using background worker (#16311)
Introduced in #16240

The action for the task was only defined on the "master" handler, rather than the base worker one.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/device.py62
1 files changed, 31 insertions, 31 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9d240ad4ee..e2ae3da67e 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -91,9 +91,14 @@ class DeviceWorkerHandler:
         self._query_appservices_for_keys = (
             hs.config.experimental.msc3984_appservice_key_query
         )
+        self._task_scheduler = hs.get_task_scheduler()
 
         self.device_list_updater = DeviceListWorkerUpdater(hs)
 
+        self._task_scheduler.register_action(
+            self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
+        )
+
     @trace
     async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
         """
@@ -383,6 +388,32 @@ class DeviceWorkerHandler:
             "Trying handling device list state for partial join: not supported on workers."
         )
 
+    DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
+
+    async def _delete_device_messages(
+        self,
+        task: ScheduledTask,
+    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
+        """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
+        assert task.params is not None
+        user_id = task.params["user_id"]
+        device_id = task.params["device_id"]
+        up_to_stream_id = task.params["up_to_stream_id"]
+
+        res = await self.store.delete_messages_for_device(
+            user_id=user_id,
+            device_id=device_id,
+            up_to_stream_id=up_to_stream_id,
+            limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
+        )
+
+        if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
+            return TaskStatus.COMPLETE, None, None
+        else:
+            # There is probably still device messages to be deleted, let's keep the task active and it will be run
+            # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
+            return TaskStatus.ACTIVE, None, None
+
 
 class DeviceHandler(DeviceWorkerHandler):
     device_list_updater: "DeviceListUpdater"
@@ -394,7 +425,6 @@ class DeviceHandler(DeviceWorkerHandler):
         self._account_data_handler = hs.get_account_data_handler()
         self._storage_controllers = hs.get_storage_controllers()
         self.db_pool = hs.get_datastores().main.db_pool
-        self._task_scheduler = hs.get_task_scheduler()
 
         self.device_list_updater = DeviceListUpdater(hs, self)
 
@@ -428,10 +458,6 @@ class DeviceHandler(DeviceWorkerHandler):
                 self._delete_stale_devices,
             )
 
-        self._task_scheduler.register_action(
-            self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME
-        )
-
     def _check_device_name_length(self, name: Optional[str]) -> None:
         """
         Checks whether a device name is longer than the maximum allowed length.
@@ -590,32 +616,6 @@ class DeviceHandler(DeviceWorkerHandler):
 
         await self.notify_device_update(user_id, device_ids)
 
-    DEVICE_MSGS_DELETE_BATCH_LIMIT = 100
-
-    async def _delete_device_messages(
-        self,
-        task: ScheduledTask,
-    ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]:
-        """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`."""
-        assert task.params is not None
-        user_id = task.params["user_id"]
-        device_id = task.params["device_id"]
-        up_to_stream_id = task.params["up_to_stream_id"]
-
-        res = await self.store.delete_messages_for_device(
-            user_id=user_id,
-            device_id=device_id,
-            up_to_stream_id=up_to_stream_id,
-            limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT,
-        )
-
-        if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT:
-            return TaskStatus.COMPLETE, None, None
-        else:
-            # There is probably still device messages to be deleted, let's keep the task active and it will be run
-            # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running).
-            return TaskStatus.ACTIVE, None, None
-
     async def update_device(self, user_id: str, device_id: str, content: dict) -> None:
         """Update the given device