summary refs log tree commit diff
path: root/synapse/util/task_scheduler.py
blob: caf13b3474bed12de19fbd8a78f1b93770405ab1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
# Copyright 2023 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple

from twisted.python.failure import Failure

from synapse.logging.context import nested_logging_context
from synapse.metrics import LaterGauge
from synapse.metrics.background_process_metrics import (
    run_as_background_process,
    wrap_as_background_process,
)
from synapse.types import JsonMapping, ScheduledTask, TaskStatus
from synapse.util.stringutils import random_string

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)


class TaskScheduler:
    """
    This is a simple task scheduler aimed at resumable tasks: usually we use `run_in_background`
    to launch a background task, or Twisted `deferLater` if we want to do so later on.

    The problem with that is that the tasks will just stop and never be resumed if synapse
    is stopped for whatever reason.

    How this works:
    - A function mapped to a named action should first be registered with `register_action`.
    This function will be called when trying to resume tasks after a synapse shutdown,
    so this registration should happen when synapse is initialised, NOT right before scheduling
    a task.
    - A task can then be launched using this named action with `schedule_task`. A `params` dict
    can be passed, and it will be available to the registered function when launched. This task
    can be launched either now-ish, or later on by giving a `timestamp` parameter.

    The function may call `update_task` at any time to update the `result` of the task,
    and this can be used to resume the task at a specific point and/or to convey a result to
    the code launching the task.
    You can also specify the `result` (and/or an `error`) when returning from the function.

    The reconciliation loop runs every minute, so this is not a precise scheduler.
    There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already
    full. In this regard, please take great care that scheduled tasks can actually finish.
    For now there is no mechanism to stop a running task if it is stuck.

    Tasks will be run on the worker specified with `run_background_tasks_on` config,
    or the main one by default.
    """

    # Precision of the scheduler, evaluation of tasks to run will only happen
    # every `SCHEDULE_INTERVAL_MS` ms
    SCHEDULE_INTERVAL_MS = 1 * 60 * 1000  # 1mn
    # How often to clean up old tasks.
    CLEANUP_INTERVAL_MS = 30 * 60 * 1000  # 30mn
    # Time before a complete or failed task is deleted from the DB
    KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000  # 1 week
    # Maximum number of tasks that can run at the same time
    MAX_CONCURRENT_RUNNING_TASKS = 10
    # Time from the last task update after which we will log a warning
    LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000  # 24hrs

    def __init__(self, hs: "HomeServer"):
        """Set up the scheduler state and, when enabled, its periodic loops.

        The periodic loops (launching due tasks and cleaning up old ones) are
        only registered when `hs.config.worker.run_background_tasks` is set.

        Args:
            hs: the homeserver, used to get the main datastore, the clock and
                the `run_background_tasks` worker config flag.
        """
        self._hs = hs
        self._store = hs.get_datastores().main
        self._clock = hs.get_clock()
        # Ids of the tasks currently being run by this scheduler instance
        self._running_tasks: Set[str] = set()
        # A map between action names and their registered function
        self._actions: Dict[
            str,
            Callable[
                [ScheduledTask],
                Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
            ],
        ] = {}
        self._run_background_tasks = hs.config.worker.run_background_tasks

        # Flag to make sure we only try and launch new tasks once at a time.
        self._launching_new_tasks = False

        if self._run_background_tasks:
            self._clock.looping_call(
                self._launch_scheduled_tasks,
                TaskScheduler.SCHEDULE_INTERVAL_MS,
            )
            # Cleaning up old tasks has its own, slower, cadence: finished
            # tasks are kept for `KEEP_TASKS_FOR_MS` anyway, so there is no
            # point in querying the DB for them on every scheduling tick.
            self._clock.looping_call(
                self._clean_scheduled_tasks,
                TaskScheduler.CLEANUP_INTERVAL_MS,
            )

        LaterGauge(
            "synapse_scheduler_running_tasks",
            "The number of concurrent running tasks handled by the TaskScheduler",
            labels=None,
            caller=lambda: len(self._running_tasks),
        )

    def register_action(
        self,
        function: Callable[
            [ScheduledTask],
            Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]],
        ],
        action_name: str,
    ) -> None:
        """Associate `function` with `action_name` in the scheduler's action map.

        Registration should be done early (typically from an `__init__` method)
        rather than right before calling `schedule_task`, so that a task resumed
        later on can still find the function matching its action.

        Registering the same `action_name` again replaces the previous function.

        Args:
            function: The coroutine function to run for this action. It is called
                with the `ScheduledTask` being run and should return a tuple of
                new `status`, `result` and `error` as specified in `ScheduledTask`.
            action_name: The name under which the function is registered
        """
        self._actions.update({action_name: function})

    async def schedule_task(
        self,
        action: str,
        *,
        resource_id: Optional[str] = None,
        timestamp: Optional[int] = None,
        params: Optional[JsonMapping] = None,
    ) -> str:
        """Persist a new, potentially resumable, task and launch it if it is due.

        A function matching the specified `action` should have been registered
        with `register_action` before the task is run.

        Args:
            action: the name of a previously registered action
            resource_id: a task can be associated with a resource id to facilitate
                getting all tasks associated with a specific resource
            timestamp: if `None` or already in the past, the task is marked `ACTIVE`
                and launched as soon as possible; otherwise it is stored as
                `SCHEDULED` and will be launched as soon as possible after the
                `timestamp` value.
                Note that this scheduler is not meant to be precise, and the scheduling
                could be delayed if too many tasks are already running
            params: a set of parameters that can be easily accessed from inside the
                executed function

        Returns:
            The id of the scheduled task
        """
        if timestamp is not None and timestamp >= self._clock.time_msec():
            # Due in the future: the periodic loop will pick it up.
            status = TaskStatus.SCHEDULED
        else:
            # No timestamp, or one in the past: the task is due right now.
            timestamp = self._clock.time_msec()
            status = TaskStatus.ACTIVE

        task_id = random_string(16)
        task = ScheduledTask(
            task_id,
            action,
            status,
            timestamp,
            resource_id,
            params,
            result=None,
            error=None,
        )
        await self._store.insert_scheduled_task(task)

        if status != TaskStatus.ACTIVE:
            return task.id

        if self._run_background_tasks:
            # We are the process in charge of running tasks: launch it directly.
            await self._launch_task(task)
        else:
            # Otherwise notify over replication that a new active task exists.
            self._hs.get_replication_command_handler().send_new_active_task(task.id)

        return task.id

    async def update_task(
        self,
        id: str,
        *,
        timestamp: Optional[int] = None,
        status: Optional[TaskStatus] = None,
        result: Optional[JsonMapping] = None,
        error: Optional[str] = None,
    ) -> bool:
        """Update some of the values associated with a task.

        This is exposed publicly so it can be used inside task functions, mainly
        to update the result and be able to resume a task at a specific step
        after a restart of synapse.

        It can also be used to stage a task, by setting the `status` to
        `SCHEDULED` with a new timestamp.

        The `status` can only be set to `ACTIVE` or `SCHEDULED`: `COMPLETE` and
        `FAILED` are terminal statuses and can only be set by returning them
        from the task function.

        Args:
            id: the id of the task to update
            timestamp: useful to schedule a new stage of the task at a later date;
                defaults to the current time when not provided
            status: the new `TaskStatus` of the task
            result: the new result of the task
            error: the new error of the task

        Returns:
            The value returned by the store's `update_scheduled_task`
            (presumably whether a task was actually updated - TODO confirm).

        Raises:
            Exception: if `status` is `COMPLETE` or `FAILED`.
        """
        if status in (TaskStatus.COMPLETE, TaskStatus.FAILED):
            raise Exception(
                "update_task can't be called with a FAILED or COMPLETE status"
            )

        new_timestamp = self._clock.time_msec() if timestamp is None else timestamp
        updated = await self._store.update_scheduled_task(
            id,
            new_timestamp,
            status=status,
            result=result,
            error=error,
        )
        return updated

    async def get_task(self, id: str) -> Optional[ScheduledTask]:
        """Fetch the description of a single task from the store.

        Args:
            id: the id of the task to retrieve

        Returns:
            The task information, or `None` if it doesn't exist or has
            already been removed because it's too old.
        """
        task = await self._store.get_scheduled_task(id)
        return task

    async def get_tasks(
        self,
        *,
        actions: Optional[List[str]] = None,
        resource_id: Optional[str] = None,
        statuses: Optional[List[TaskStatus]] = None,
        max_timestamp: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> List[ScheduledTask]:
        """Fetch tasks from the store, optionally filtered.

        All the tasks are returned when no argument is provided. Every filter
        is forwarded unchanged to the store's `get_scheduled_tasks`.

        If an arg is `None` all tasks matching the other args will be selected.
        If an arg is an empty list, the corresponding value of the task needs
        to be `None` to be selected.

        Args:
            actions: Limit the returned tasks to those specific action names
            resource_id: Limit the returned tasks to the specific resource id, if specified
            statuses: Limit the returned tasks to the specific statuses
            max_timestamp: Limit the returned tasks to the ones that have
                a timestamp lower than the specified one
            limit: Only return `limit` number of rows if set.

        Returns:
            A list of `ScheduledTask`, ordered by increasing timestamps
        """
        matching_tasks = await self._store.get_scheduled_tasks(
            actions=actions,
            resource_id=resource_id,
            statuses=statuses,
            max_timestamp=max_timestamp,
            limit=limit,
        )
        return matching_tasks

    async def delete_task(self, id: str) -> None:
        """Remove a task from the store. Running tasks can't be deleted.

        Can only be called from the worker handling the task scheduling.

        Args:
            id: id of the task to delete

        Raises:
            Exception: if no task has this id, or if the task's status is
                `ACTIVE`.
        """
        existing = await self.get_task(id)

        if existing is None:
            raise Exception(f"Task {id} does not exist")

        is_active = existing.status == TaskStatus.ACTIVE
        if is_active:
            raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")

        await self._store.delete_scheduled_task(id)

    def launch_task_by_id(self, id: str) -> None:
        """Try launching the task with the given ID, as a background process.

        Nothing is started when the pool of running tasks is already full.

        Args:
            id: the id of the task to launch
        """
        has_free_slot = (
            len(self._running_tasks) < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS
        )
        if has_free_slot:
            run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)

    async def _launch_task_by_id(self, id: str) -> None:
        """Look the task up by id and launch it; helper for `launch_task_by_id`.

        Does nothing when the lookup does not return a task.

        Args:
            id: the id of the task to launch
        """
        found = await self.get_task(id)
        if not found:
            return
        await self._launch_task(found)

    @wrap_as_background_process("launch_scheduled_tasks")
    async def _launch_scheduled_tasks(self) -> None:
        """Retrieve and launch scheduled tasks that should be running at that time.

        `ACTIVE` tasks are (re)launched first, then `SCHEDULED` tasks whose
        timestamp has been reached. Only one pass runs at a time, and nothing
        is done when the pool of running tasks is already full.

        A task that fails to launch (e.g. because no function is registered for
        its action) is logged and skipped, so that it cannot prevent the tasks
        coming after it from being launched.
        """
        # Don't bother trying to launch new tasks if we're already at capacity.
        if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
            return

        if self._launching_new_tasks:
            return

        self._launching_new_tasks = True

        async def launch_all(tasks: List[ScheduledTask]) -> None:
            for task in tasks:
                try:
                    await self._launch_task(task)
                except Exception:
                    # Tasks are returned ordered by timestamp, so letting this
                    # propagate would make the same broken task abort every
                    # pass before any of the following ones get a chance.
                    logger.exception(
                        "Failed to launch scheduled task %s (action %s)",
                        task.id,
                        task.action,
                    )

        try:
            await launch_all(
                await self.get_tasks(
                    statuses=[TaskStatus.ACTIVE],
                    limit=self.MAX_CONCURRENT_RUNNING_TASKS,
                )
            )
            await launch_all(
                await self.get_tasks(
                    statuses=[TaskStatus.SCHEDULED],
                    max_timestamp=self._clock.time_msec(),
                    limit=self.MAX_CONCURRENT_RUNNING_TASKS,
                )
            )
        finally:
            # Always release the flag, otherwise no further pass could ever run.
            self._launching_new_tasks = False

    @wrap_as_background_process("clean_scheduled_tasks")
    async def _clean_scheduled_tasks(self) -> None:
        """Delete old `FAILED` and `COMPLETE` tasks so they don't clutter the DB.

        Tasks are selected using a max timestamp of now minus `KEEP_TASKS_FOR_MS`.
        """
        cutoff = self._clock.time_msec() - TaskScheduler.KEEP_TASKS_FOR_MS
        terminal_statuses = [TaskStatus.FAILED, TaskStatus.COMPLETE]

        expired_tasks = await self._store.get_scheduled_tasks(
            statuses=terminal_statuses,
            max_timestamp=cutoff,
        )
        for expired in expired_tasks:
            # FAILED and COMPLETE tasks should never be running
            assert expired.id not in self._running_tasks
            await self._store.delete_scheduled_task(expired.id)

    async def _launch_task(self, task: ScheduledTask) -> None:
        """Launch a scheduled task now.

        The task is marked `ACTIVE` in the store and its registered function is
        run as a background process. Nothing happens if the pool of running
        tasks is full or if the task is already running in this scheduler.

        Args:
            task: the task to launch

        Raises:
            Exception: if no function has been registered for the task's action.
        """
        assert self._run_background_tasks

        if task.action not in self._actions:
            raise Exception(
                f"No function associated with action {task.action} of the scheduled task {task.id}"
            )
        function = self._actions[task.action]

        async def wrapper() -> None:
            try:
                with nested_logging_context(task.id):
                    try:
                        (status, result, error) = await function(task)
                    except Exception:
                        f = Failure()
                        logger.error(
                            f"scheduled task {task.id} failed",
                            exc_info=(f.type, f.value, f.getTracebackObject()),
                        )
                        status = TaskStatus.FAILED
                        result = None
                        error = f.getErrorMessage()

                    await self._store.update_scheduled_task(
                        task.id,
                        self._clock.time_msec(),
                        status=status,
                        result=result,
                        error=error,
                    )
            finally:
                # Free the slot even if persisting the outcome failed: the
                # function is no longer running, and a leaked id would
                # permanently eat into `MAX_CONCURRENT_RUNNING_TASKS`.
                self._running_tasks.discard(task.id)

            # Try launch a new task since we've finished with this one.
            self._clock.call_later(1, self._launch_scheduled_tasks)

        if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
            return

        if (
            self._clock.time_msec()
            > task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS
        ):
            logger.warning(
                f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck"
            )

        if task.id in self._running_tasks:
            return

        # Reserve the slot before awaiting so that a concurrent call for the
        # same task sees it as already running.
        self._running_tasks.add(task.id)
        try:
            await self.update_task(task.id, status=TaskStatus.ACTIVE)
        except Exception:
            # Nothing has been started yet: give the slot back before
            # propagating, otherwise it would be held forever.
            self._running_tasks.discard(task.id)
            raise
        run_as_background_process(f"task-{task.action}", wrapper)