summary refs log tree commit diff
path: root/synapse/appservice/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/appservice/scheduler.py37
1 files changed, 35 insertions, 2 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py

index bec83419a2..cba08dde85 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py
@@ -2,7 +2,7 @@ # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2015, 2016 OpenMarket Ltd -# Copyright (C) 2023 New Vector, Ltd +# Copyright (C) 2023, 2025 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -54,6 +54,7 @@ UP & quit +---------- YES SUCCESS This is all tied together by the AppServiceScheduler which DIs the required components. """ + import logging from typing import ( TYPE_CHECKING, @@ -69,6 +70,8 @@ from typing import ( Tuple, ) +from twisted.internet.interfaces import IDelayedCall + from synapse.appservice import ( ApplicationService, ApplicationServiceState, @@ -449,6 +452,20 @@ class _TransactionController: recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) + def force_retry(self, service: ApplicationService) -> None: + """Forces a Recoverer to attempt delivery of transations immediately. + + Args: + service: + """ + recoverer = self.recoverers.get(service.id) + if not recoverer: + # No need to force a retry on a happy AS. + logger.info(f"{service.id} is not in recovery, not forcing retry") + return + + recoverer.force_retry() + async def _is_service_up(self, service: ApplicationService) -> bool: state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None @@ -481,11 +498,12 @@ class _Recoverer: self.service = service self.callback = callback self.backoff_counter = 1 + self.scheduled_recovery: Optional[IDelayedCall] = None def recover(self) -> None: delay = 2**self.backoff_counter logger.info("Scheduling retries on %s in %fs", self.service.id, delay) - self.clock.call_later( + self.scheduled_recovery = self.clock.call_later( delay, run_as_background_process, "as-recoverer", self.retry ) @@ -495,6 +513,21 @@ class _Recoverer: self.backoff_counter += 1 self.recover() + def force_retry(self) -> None: + """Cancels the existing timer and forces an immediate retry in the background. + + Args: + service: + """ + # Prevent the existing backoff from occuring + if self.scheduled_recovery: + self.clock.cancel_call_later(self.scheduled_recovery) + # Run a retry, which will resechedule a recovery if it fails. + run_as_background_process( + "retry", + self.retry, + ) + async def retry(self) -> None: logger.info("Starting retries on %s", self.service.id) try: