diff options
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/caches/deferred_cache.py | 2 | ||||
-rw-r--r-- | synapse/util/check_dependencies.py | 6 | ||||
-rw-r--r-- | synapse/util/ratelimitutils.py | 3 | ||||
-rw-r--r-- | synapse/util/retryutils.py | 43 | ||||
-rw-r--r-- | synapse/util/task_scheduler.py | 92 |
5 files changed, 90 insertions, 56 deletions
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index bf7bd351e0..029eedcc6f 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -470,7 +470,7 @@ class CacheMultipleEntries(CacheEntry[KT, VT]): def deferred(self, key: KT) -> "defer.Deferred[VT]": if not self._deferred: self._deferred = ObservableDeferred(defer.Deferred(), consumeErrors=True) - return self._deferred.observe().addCallback(lambda res: res.get(key)) + return self._deferred.observe().addCallback(lambda res: res[key]) def add_invalidation_callback( self, key: KT, callback: Optional[Callable[[], None]] diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py index 114130a08f..f7cead9e12 100644 --- a/synapse/util/check_dependencies.py +++ b/synapse/util/check_dependencies.py @@ -51,9 +51,9 @@ class DependencyException(Exception): DEV_EXTRAS = {"lint", "mypy", "test", "dev"} -RUNTIME_EXTRAS = ( - set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra")) - DEV_EXTRAS -) +ALL_EXTRAS = metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra") +assert ALL_EXTRAS is not None +RUNTIME_EXTRAS = set(ALL_EXTRAS) - DEV_EXTRAS VERSION = metadata.version(DISTRIBUTION_NAME) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index cde4a0780f..f693ba2a8c 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -291,7 +291,8 @@ class _PerHostRatelimiter: if self.metrics_name: rate_limit_reject_counter.labels(self.metrics_name).inc() raise LimitExceededError( - retry_after_ms=int(self.window_size / self.sleep_limit) + limiter_name="rc_federation", + retry_after_ms=int(self.window_size / self.sleep_limit), ) self.request_times.append(time_now) diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 27e9fc976c..0e1f907667 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Optional, Type from synapse.api.errors import CodeMessageException from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import DataStore +from synapse.types import StrCollection from synapse.util import Clock if TYPE_CHECKING: @@ -116,6 +117,30 @@ async def get_retry_limiter( ) +async def filter_destinations_by_retry_limiter( + destinations: StrCollection, + clock: Clock, + store: DataStore, + retry_due_within_ms: int = 0, +) -> StrCollection: + """Filter down the list of destinations to only those that will are either + alive or due for a retry (within `retry_due_within_ms`) + """ + if not destinations: + return destinations + + retry_timings = await store.get_destination_retry_timings_batch(destinations) + + now = int(clock.time_msec()) + + return [ + destination + for destination, timings in retry_timings.items() + if timings is None + or timings.retry_last_ts + timings.retry_interval <= now + retry_due_within_ms + ] + + class RetryDestinationLimiter: def __init__( self, @@ -128,6 +153,7 @@ class RetryDestinationLimiter: backoff_on_failure: bool = True, notifier: Optional["Notifier"] = None, replication_client: Optional["ReplicationCommandHandler"] = None, + backoff_on_all_error_codes: bool = False, ): """Marks the destination as "down" if an exception is thrown in the context, except for CodeMessageException with code < 500. @@ -147,6 +173,9 @@ class RetryDestinationLimiter: backoff_on_failure: set to False if we should not increase the retry interval on a failure. + + backoff_on_all_error_codes: Whether we should back off on any + error code. """ self.clock = clock self.store = store @@ -156,6 +185,7 @@ class RetryDestinationLimiter: self.retry_interval = retry_interval self.backoff_on_404 = backoff_on_404 self.backoff_on_failure = backoff_on_failure + self.backoff_on_all_error_codes = backoff_on_all_error_codes self.notifier = notifier self.replication_client = replication_client @@ -179,6 +209,7 @@ class RetryDestinationLimiter: exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: + success = exc_type is None valid_err_code = False if exc_type is None: valid_err_code = True @@ -195,7 +226,9 @@ class RetryDestinationLimiter: # won't accept our requests for at least a while. # 429 is us being aggressively rate limited, so lets rate limit # ourselves. - if exc_val.code == 404 and self.backoff_on_404: + if self.backoff_on_all_error_codes: + valid_err_code = False + elif exc_val.code == 404 and self.backoff_on_404: valid_err_code = False elif exc_val.code in (401, 429): valid_err_code = False @@ -204,7 +237,7 @@ class RetryDestinationLimiter: else: valid_err_code = False - if valid_err_code: + if success: # We connected successfully. if not self.retry_interval: return @@ -215,6 +248,12 @@ class RetryDestinationLimiter: self.failure_ts = None retry_last_ts = 0 self.retry_interval = 0 + elif valid_err_code: + # We got a potentially valid error code back. We don't reset the + # timers though, as the other side might actually be down anyway + # (e.g. some deprovisioned servers will always return a 404 or 403, + # and we don't want to keep resetting the retry timers for them). + return elif not self.backoff_on_failure: return else: diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 4aea64b338..9e89aeb748 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -57,14 +57,13 @@ class TaskScheduler: the code launching the task. You can also specify the `result` (and/or an `error`) when returning from the function. - The reconciliation loop runs every 5 mns, so this is not a precise scheduler. When wanting - to launch now, the launch will still not happen before the next loop run. - - Tasks will be run on the worker specified with `run_background_tasks_on` config, - or the main one by default. + The reconciliation loop runs every minute, so this is not a precise scheduler. There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already full. In this regard, please take great care that scheduled tasks can actually finished. For now there is no mechanism to stop a running task if it is stuck. + + Tasks will be run on the worker specified with `run_background_tasks_on` config, + or the main one by default. """ # Precision of the scheduler, evaluation of tasks to run will only happen @@ -85,7 +84,7 @@ class TaskScheduler: self._actions: Dict[ str, Callable[ - [ScheduledTask, bool], + [ScheduledTask], Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]], ], ] = {} @@ -98,11 +97,13 @@ class TaskScheduler: "handle_scheduled_tasks", self._handle_scheduled_tasks, ) + else: + self.replication_client = hs.get_replication_command_handler() def register_action( self, function: Callable[ - [ScheduledTask, bool], + [ScheduledTask], Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]], ], action_name: str, @@ -115,10 +116,9 @@ class TaskScheduler: calling `schedule_task` but rather in an `__init__` method. Args: - function: The function to be executed for this action. The parameters - passed to the function when launched are the `ScheduledTask` being run, - and a `first_launch` boolean to signal if it's a resumed task or the first - launch of it. The function should return a tuple of new `status`, `result` + function: The function to be executed for this action. The parameter + passed to the function when launched is the `ScheduledTask` being run. + The function should return a tuple of new `status`, `result` and `error` as specified in `ScheduledTask`. action_name: The name of the action to be associated with the function """ @@ -171,6 +171,12 @@ class TaskScheduler: ) await self._store.insert_scheduled_task(task) + if status == TaskStatus.ACTIVE: + if self._run_background_tasks: + await self._launch_task(task) + else: + self.replication_client.send_new_active_task(task.id) + return task.id async def update_task( @@ -265,21 +271,13 @@ class TaskScheduler: Args: id: id of the task to delete """ - if self.task_is_running(id): - raise Exception(f"Task {id} is currently running and can't be deleted") + task = await self.get_task(id) + if task is None: + raise Exception(f"Task {id} does not exist") + if task.status == TaskStatus.ACTIVE: + raise Exception(f"Task {id} is currently ACTIVE and can't be deleted") await self._store.delete_scheduled_task(id) - def task_is_running(self, id: str) -> bool: - """Check if a task is currently running. - - Can only be called from the worker handling the task scheduling. - - Args: - id: id of the task to check - """ - assert self._run_background_tasks - return id in self._running_tasks - async def _handle_scheduled_tasks(self) -> None: """Main loop taking care of launching tasks and cleaning up old ones.""" await self._launch_scheduled_tasks() @@ -288,29 +286,11 @@ class TaskScheduler: async def _launch_scheduled_tasks(self) -> None: """Retrieve and launch scheduled tasks that should be running at that time.""" for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): - if not self.task_is_running(task.id): - if ( - len(self._running_tasks) - < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS - ): - await self._launch_task(task, first_launch=False) - else: - if ( - self._clock.time_msec() - > task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS - ): - logger.warn( - f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck" - ) + await self._launch_task(task) for task in await self.get_tasks( statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec() ): - if ( - not self.task_is_running(task.id) - and len(self._running_tasks) - < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS - ): - await self._launch_task(task, first_launch=True) + await self._launch_task(task) running_tasks_gauge.set(len(self._running_tasks)) @@ -320,27 +300,27 @@ class TaskScheduler: statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] ): # FAILED and COMPLETE tasks should never be running - assert not self.task_is_running(task.id) + assert task.id not in self._running_tasks if ( self._clock.time_msec() > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS ): await self._store.delete_scheduled_task(task.id) - async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: + async def _launch_task(self, task: ScheduledTask) -> None: """Launch a scheduled task now. Args: task: the task to launch - first_launch: `True` if it's the first time is launched, `False` otherwise """ - assert task.action in self._actions + assert self._run_background_tasks + assert task.action in self._actions function = self._actions[task.action] async def wrapper() -> None: try: - (status, result, error) = await function(task, first_launch) + (status, result, error) = await function(task) except Exception: f = Failure() logger.error( @@ -360,6 +340,20 @@ class TaskScheduler: ) self._running_tasks.remove(task.id) + if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: + return + + if ( + self._clock.time_msec() + > task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS + ): + logger.warn( + f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck" + ) + + if task.id in self._running_tasks: + return + self._running_tasks.add(task.id) await self.update_task(task.id, status=TaskStatus.ACTIVE) description = f"{task.id}-{task.action}" |