diff --git a/changelog.d/18521.feature b/changelog.d/18521.feature
new file mode 100644
index 0000000000..29da172e91
--- /dev/null
+++ b/changelog.d/18521.feature
@@ -0,0 +1 @@
+Successful requests to `/_matrix/app/v1/ping` will now force Synapse to reattempt delivering transactions to appservices.
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 7994da0868..cba08dde85 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -2,7 +2,7 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023, 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -70,6 +70,8 @@ from typing import (
Tuple,
)
+from twisted.internet.interfaces import IDelayedCall
+
from synapse.appservice import (
ApplicationService,
ApplicationServiceState,
@@ -450,6 +452,20 @@ class _TransactionController:
recoverer.recover()
logger.info("Now %i active recoverers", len(self.recoverers))
+ def force_retry(self, service: ApplicationService) -> None:
+ """Forces a Recoverer to attempt delivery of transations immediately.
+
+ Args:
+ service:
+ """
+ recoverer = self.recoverers.get(service.id)
+ if not recoverer:
+ # No need to force a retry on a happy AS.
+ logger.info(f"{service.id} is not in recovery, not forcing retry")
+ return
+
+ recoverer.force_retry()
+
async def _is_service_up(self, service: ApplicationService) -> bool:
state = await self.store.get_appservice_state(service)
return state == ApplicationServiceState.UP or state is None
@@ -482,11 +498,12 @@ class _Recoverer:
self.service = service
self.callback = callback
self.backoff_counter = 1
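+ # The DelayedCall for the next scheduled recovery attempt, kept so that
+ # force_retry() can cancel the backoff and retry immediately.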
+ self.scheduled_recovery: Optional[IDelayedCall] = None
def recover(self) -> None:
delay = 2**self.backoff_counter
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
- self.clock.call_later(
+ self.scheduled_recovery = self.clock.call_later(
delay, run_as_background_process, "as-recoverer", self.retry
)
@@ -496,6 +513,21 @@ class _Recoverer:
self.backoff_counter += 1
self.recover()
+ def force_retry(self) -> None:
+ """Cancels the existing timer and forces an immediate retry in the background.
+
+ Args:
+ service:
+ """
+ # Prevent the existing backoff from occurring
+ if self.scheduled_recovery:
+ self.clock.cancel_call_later(self.scheduled_recovery)
+ # Run a retry, which will reschedule a recovery if it fails.
+ run_as_background_process(
+ "retry",
+ self.retry,
+ )
+
async def retry(self) -> None:
logger.info("Starting retries on %s", self.service.id)
try:
diff --git a/synapse/rest/client/appservice_ping.py b/synapse/rest/client/appservice_ping.py
index d6b4e32453..1f9662a95a 100644
--- a/synapse/rest/client/appservice_ping.py
+++ b/synapse/rest/client/appservice_ping.py
@@ -2,7 +2,7 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2023 Tulir Asokan
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023, 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -53,6 +53,7 @@ class AppservicePingRestServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.as_api = hs.get_application_service_api()
+ self.scheduler = hs.get_application_service_scheduler()
self.auth = hs.get_auth()
async def on_POST(
@@ -85,6 +86,10 @@ class AppservicePingRestServlet(RestServlet):
start = time.monotonic()
try:
await self.as_api.ping(requester.app_service, txn_id)
+
+ # We got an OK response, so if the AS needs to be recovered then let's recover it now.
+ # This sets off a task in the background and so is safe to execute and forget.
+ self.scheduler.txn_ctrl.force_retry(requester.app_service)
except RequestTimedOutError as e:
raise SynapseError(
HTTPStatus.GATEWAY_TIMEOUT,
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 730b00a9fb..a5bf7e0635 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -2,7 +2,7 @@
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright (C) 2023 New Vector, Ltd
+# Copyright (C) 2023, 2025 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
@@ -234,6 +234,41 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
self.assertEqual(1, txn.complete.call_count)
self.callback.assert_called_once_with(self.recoverer)
+ def test_recover_force_retry(self) -> None:
+ txn = Mock()
+ txns = [txn, None]
+ pop_txn = False
+
+ def take_txn(
+ *args: object, **kwargs: object
+ ) -> "defer.Deferred[Optional[Mock]]":
+ if pop_txn:
+ return defer.succeed(txns.pop(0))
+ else:
+ return defer.succeed(txn)
+
+ self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn)
+
+ # Start the recovery, and then fail the first attempt.
+ self.recoverer.recover()
+ self.assertEqual(0, self.store.get_oldest_unsent_txn.call_count)
+ txn.send = AsyncMock(return_value=False)
+ txn.complete = AsyncMock(return_value=None)
+ self.clock.advance_time(2)
+ self.assertEqual(1, txn.send.call_count)
+ self.assertEqual(0, txn.complete.call_count)
+ self.assertEqual(0, self.callback.call_count)
+
+ # Now allow the send to succeed, and force a retry.
+ pop_txn = True # returns the txn the first time, then no more.
+ txn.send = AsyncMock(return_value=True) # successfully send the txn
+ self.recoverer.force_retry()
+ self.assertEqual(1, txn.send.call_count) # the new AsyncMock starts with a fresh call count
+ self.assertEqual(1, txn.complete.call_count)
+
+ # Ensure we call the callback to say we're done!
+ self.callback.assert_called_once_with(self.recoverer)
+
# Corresponds to synapse.appservice.scheduler._TransactionController.send
TxnCtrlArgs: TypeAlias = """
|