summary refs log tree commit diff
path: root/synapse/appservice
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/appservice/__init__.py2
-rw-r--r--synapse/appservice/scheduler.py37
2 files changed, 37 insertions, 2 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py

index a96cdbf1e7..6ee5240c4e 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py
@@ -87,6 +87,7 @@ class ApplicationService: ip_range_whitelist: Optional[IPSet] = None, supports_ephemeral: bool = False, msc3202_transaction_extensions: bool = False, + msc4190_device_management: bool = False, ): self.token = token self.url = ( @@ -100,6 +101,7 @@ class ApplicationService: self.ip_range_whitelist = ip_range_whitelist self.supports_ephemeral = supports_ephemeral self.msc3202_transaction_extensions = msc3202_transaction_extensions + self.msc4190_device_management = msc4190_device_management if "|" in self.id: raise Exception("application service ID cannot contain '|' character") diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index bec83419a2..cba08dde85 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py
@@ -2,7 +2,7 @@ # This file is licensed under the Affero General Public License (AGPL) version 3. # # Copyright 2015, 2016 OpenMarket Ltd -# Copyright (C) 2023 New Vector, Ltd +# Copyright (C) 2023, 2025 New Vector, Ltd # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as @@ -54,6 +54,7 @@ UP & quit +---------- YES SUCCESS This is all tied together by the AppServiceScheduler which DIs the required components. """ + import logging from typing import ( TYPE_CHECKING, @@ -69,6 +70,8 @@ from typing import ( Tuple, ) +from twisted.internet.interfaces import IDelayedCall + from synapse.appservice import ( ApplicationService, ApplicationServiceState, @@ -449,6 +452,20 @@ class _TransactionController: recoverer.recover() logger.info("Now %i active recoverers", len(self.recoverers)) + def force_retry(self, service: ApplicationService) -> None: + """Forces a Recoverer to attempt delivery of transations immediately. + + Args: + service: + """ + recoverer = self.recoverers.get(service.id) + if not recoverer: + # No need to force a retry on a happy AS. + logger.info(f"{service.id} is not in recovery, not forcing retry") + return + + recoverer.force_retry() + async def _is_service_up(self, service: ApplicationService) -> bool: state = await self.store.get_appservice_state(service) return state == ApplicationServiceState.UP or state is None @@ -481,11 +498,12 @@ class _Recoverer: self.service = service self.callback = callback self.backoff_counter = 1 + self.scheduled_recovery: Optional[IDelayedCall] = None def recover(self) -> None: delay = 2**self.backoff_counter logger.info("Scheduling retries on %s in %fs", self.service.id, delay) - self.clock.call_later( + self.scheduled_recovery = self.clock.call_later( delay, run_as_background_process, "as-recoverer", self.retry ) @@ -495,6 +513,21 @@ class _Recoverer: self.backoff_counter += 1 self.recover() + def force_retry(self) -> None: + """Cancels the existing timer and forces an immediate retry in the background. + + Args: + service: + """ + # Prevent the existing backoff from occuring + if self.scheduled_recovery: + self.clock.cancel_call_later(self.scheduled_recovery) + # Run a retry, which will resechedule a recovery if it fails. + run_as_background_process( + "retry", + self.retry, + ) + async def retry(self) -> None: logger.info("Starting retries on %s", self.service.id) try: