diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index bec83419a2..cba08dde85 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -2,7 +2,7 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023, 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -54,6 +54,7 @@ UP & quit +---------- YES SUCCESS
This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
+
import logging
from typing import (
TYPE_CHECKING,
@@ -69,6 +70,8 @@ from typing import (
Tuple,
)
+from twisted.internet.interfaces import IDelayedCall
+
from synapse.appservice import (
ApplicationService,
ApplicationServiceState,
@@ -449,6 +452,20 @@ class _TransactionController:
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
+ def force_retry(self, service: ApplicationService) -> None:
+ """Forces a Recoverer to attempt delivery of transations immediately.
+
+ Args:
+ service:
+ """
+ recoverer = self.recoverers.get(service.id)
+ if not recoverer:
+ # No need to force a retry on a happy AS.
+ logger.info(f"{service.id} is not in recovery, not forcing retry")
+ return
+
+ recoverer.force_retry()
+
async def _is_service_up(self, service: ApplicationService) -> bool:
state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None
@@ -481,11 +498,12 @@ class _Recoverer:
self.service = service
self.callback = callback
self.backoff_counter = 1
+ self.scheduled_recovery: Optional[IDelayedCall] = None
def recover(self) -> None:
delay = 2**self.backoff_counter
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
- self.clock.call_later(
+ self.scheduled_recovery = self.clock.call_later(
delay, run_as_background_process, "as-recoverer", self.retry
)
@@ -495,6 +513,21 @@ class _Recoverer:
self.backoff_counter += 1
self.recover()
+ def force_retry(self) -> None:
+ """Cancels the existing timer and forces an immediate retry in the background.
+
+ Args:
+ service:
+ """
+ # Prevent the existing backoff from occuring
+ if self.scheduled_recovery:
+ self.clock.cancel_call_later(self.scheduled_recovery)
+ # Run a retry, which will resechedule a recovery if it fails.
+ run_as_background_process(
+ "retry",
+ self.retry,
+ )
+
async def retry(self) -> None:
logger.info("Starting retries on %s", self.service.id)
try:
|