summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/18521.feature1
-rw-r--r--synapse/appservice/scheduler.py36
-rw-r--r--synapse/rest/client/appservice_ping.py7
-rw-r--r--tests/appservice/test_scheduler.py37
4 files changed, 77 insertions, 4 deletions
diff --git a/changelog.d/18521.feature b/changelog.d/18521.feature
new file mode 100644

index 0000000000..29da172e91 --- /dev/null +++ b/changelog.d/18521.feature
@@ -0,0 +1 @@ +Successful requests to `/_matrix/app/v1/ping` will now force Synapse to reattempt delivering transactions to appservices. diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 7994da0868..cba08dde85 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py
@@ -2,7 +2,7 @@ # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2015, 2016 OpenMarket Ltd -# Copyright (C) 2023 New Vector, Ltd +# Copyright (C) 2023, 2025 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -70,6 +70,8 @@ from typing import ( Tuple, ) +from twisted.internet.interfaces import IDelayedCall + from synapse.appservice import ( ApplicationService, ApplicationServiceState, @@ -450,6 +452,20 @@ class _TransactionController: recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) + def force_retry(self, service: ApplicationService) -> None: + """Forces a Recoverer to attempt delivery of transactions immediately. + + Args: + service: + """ + recoverer = self.recoverers.get(service.id) + if not recoverer: + # No need to force a retry on a happy AS. + logger.info(f"{service.id} is not in recovery, not forcing retry") + return + + recoverer.force_retry() + async def _is_service_up(self, service: ApplicationService) -> bool: state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None @@ -482,11 +498,12 @@ class _Recoverer: self.service = service self.callback = callback self.backoff_counter = 1 + self.scheduled_recovery: Optional[IDelayedCall] = None def recover(self) -> None: delay = 2**self.backoff_counter logger.info("Scheduling retries on %s in %fs", self.service.id, delay) - self.clock.call_later( + self.scheduled_recovery = self.clock.call_later( delay, run_as_background_process, "as-recoverer", self.retry ) @@ -496,6 +513,21 @@ class _Recoverer: self.backoff_counter += 1 self.recover() + def force_retry(self) -> None: + """Cancels the existing timer and forces an immediate retry in the background. 
+ + Args: + service: + """ + # Prevent the existing backoff from occurring + if self.scheduled_recovery: + self.clock.cancel_call_later(self.scheduled_recovery) + # Run a retry, which will reschedule a recovery if it fails. + run_as_background_process( + "retry", + self.retry, + ) + async def retry(self) -> None: logger.info("Starting retries on %s", self.service.id) try: diff --git a/synapse/rest/client/appservice_ping.py b/synapse/rest/client/appservice_ping.py
index d6b4e32453..1f9662a95a 100644 --- a/synapse/rest/client/appservice_ping.py +++ b/synapse/rest/client/appservice_ping.py
@@ -2,7 +2,7 @@ # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2023 Tulir Asokan -# Copyright (C) 2023 New Vector, Ltd +# Copyright (C) 2023, 2025 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -53,6 +53,7 @@ class AppservicePingRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.as_api = hs.get_application_service_api() + self.scheduler = hs.get_application_service_scheduler() self.auth = hs.get_auth() async def on_POST( @@ -85,6 +86,10 @@ class AppservicePingRestServlet(RestServlet): start = time.monotonic() try: await self.as_api.ping(requester.app_service, txn_id) + + # We got an OK response, so if the AS needs to be recovered then let's recover it now. + # This sets off a task in the background and so is safe to execute and forget. + self.scheduler.txn_ctrl.force_retry(requester.app_service) except RequestTimedOutError as e: raise SynapseError( HTTPStatus.GATEWAY_TIMEOUT, diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 730b00a9fb..a5bf7e0635 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py
@@ -2,7 +2,7 @@ # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2015, 2016 OpenMarket Ltd -# Copyright (C) 2023 New Vector, Ltd +# Copyright (C) 2023, 2025 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -234,6 +234,41 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.assertEqual(1, txn.complete.call_count) self.callback.assert_called_once_with(self.recoverer) + def test_recover_force_retry(self) -> None: + txn = Mock() + txns = [txn, None] + pop_txn = False + + def take_txn( + *args: object, **kwargs: object + ) -> "defer.Deferred[Optional[Mock]]": + if pop_txn: + return defer.succeed(txns.pop(0)) + else: + return defer.succeed(txn) + + self.store.get_oldest_unsent_txn = Mock(side_effect=take_txn) + + # Start the recovery, and then fail the first attempt. + self.recoverer.recover() + self.assertEqual(0, self.store.get_oldest_unsent_txn.call_count) + txn.send = AsyncMock(return_value=False) + txn.complete = AsyncMock(return_value=None) + self.clock.advance_time(2) + self.assertEqual(1, txn.send.call_count) + self.assertEqual(0, txn.complete.call_count) + self.assertEqual(0, self.callback.call_count) + + # Now allow the send to succeed, and force a retry. + pop_txn = True # returns the txn the first time, then no more. + txn.send = AsyncMock(return_value=True) # successfully send the txn + self.recoverer.force_retry() + self.assertEqual(1, txn.send.call_count) # new mock reset call count + self.assertEqual(1, txn.complete.call_count) + + # Ensure we call the callback to say we're done! + self.callback.assert_called_once_with(self.recoverer) + # Corresponds to synapse.appservice.scheduler._TransactionController.send TxnCtrlArgs: TypeAlias = """