diff options
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r-- | synapse/appservice/scheduler.py | 50 |
1 files changed, 23 insertions, 27 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 430ffbcd1f..18a30bc376 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -57,6 +57,7 @@ from typing import ( Iterable, List, Optional, + Sequence, Set, Tuple, ) @@ -64,7 +65,7 @@ from typing import ( from synapse.appservice import ( ApplicationService, ApplicationServiceState, - TransactionOneTimeKeyCounts, + TransactionOneTimeKeysCount, TransactionUnusedFallbackKeys, ) from synapse.appservice.api import ApplicationServiceApi @@ -72,7 +73,7 @@ from synapse.events import EventBase from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.databases.main import DataStore -from synapse.types import DeviceListUpdates, JsonDict +from synapse.types import DeviceListUpdates, JsonMapping from synapse.util import Clock if TYPE_CHECKING: @@ -120,8 +121,8 @@ class ApplicationServiceScheduler: self, appservice: ApplicationService, events: Optional[Collection[EventBase]] = None, - ephemeral: Optional[Collection[JsonDict]] = None, - to_device_messages: Optional[Collection[JsonDict]] = None, + ephemeral: Optional[Collection[JsonMapping]] = None, + to_device_messages: Optional[Collection[JsonMapping]] = None, device_list_summary: Optional[DeviceListUpdates] = None, ) -> None: """ @@ -179,9 +180,9 @@ class _ServiceQueuer: # dict of {service_id: [events]} self.queued_events: Dict[str, List[EventBase]] = {} # dict of {service_id: [events]} - self.queued_ephemeral: Dict[str, List[JsonDict]] = {} + self.queued_ephemeral: Dict[str, List[JsonMapping]] = {} # dict of {service_id: [to_device_message_json]} - self.queued_to_device_messages: Dict[str, List[JsonDict]] = {} + self.queued_to_device_messages: Dict[str, List[JsonMapping]] = {} # dict of {service_id: [device_list_summary]} self.queued_device_list_summaries: Dict[str, List[DeviceListUpdates]] = {} @@ -199,9 +200,7 @@ class _ServiceQueuer: if service.id in self.requests_in_flight: return - run_as_background_process( - "as-sender-%s" % (service.id,), self._send_request, service - ) + run_as_background_process("as-sender", self._send_request, service) async def _send_request(self, service: ApplicationService) -> None: # sanity-check: we shouldn't get here if this service already has a sender @@ -258,7 +257,7 @@ class _ServiceQueuer: ): return - one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None + one_time_keys_count: Optional[TransactionOneTimeKeysCount] = None unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None if ( @@ -269,7 +268,7 @@ class _ServiceQueuer: # for the users which are mentioned in this transaction, # as well as the appservice's sender. ( - one_time_key_counts, + one_time_keys_count, unused_fallback_keys, ) = await self._compute_msc3202_otk_counts_and_fallback_keys( service, events, ephemeral, to_device_messages_to_send @@ -281,7 +280,7 @@ class _ServiceQueuer: events, ephemeral, to_device_messages_to_send, - one_time_key_counts, + one_time_keys_count, unused_fallback_keys, device_list_summary, ) @@ -294,9 +293,9 @@ class _ServiceQueuer: self, service: ApplicationService, events: Iterable[EventBase], - ephemerals: Iterable[JsonDict], - to_device_messages: Iterable[JsonDict], - ) -> Tuple[TransactionOneTimeKeyCounts, TransactionUnusedFallbackKeys]: + ephemerals: Iterable[JsonMapping], + to_device_messages: Iterable[JsonMapping], + ) -> Tuple[TransactionOneTimeKeysCount, TransactionUnusedFallbackKeys]: """ Given a list of the events, ephemeral messages and to-device messages, - first computes a list of application services users that may have @@ -364,10 +363,10 @@ class _TransactionController: async def send( self, service: ApplicationService, - events: List[EventBase], - ephemeral: Optional[List[JsonDict]] = None, - to_device_messages: Optional[List[JsonDict]] = None, - one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None, + events: Sequence[EventBase], + ephemeral: Optional[List[JsonMapping]] = None, + to_device_messages: Optional[List[JsonMapping]] = None, + one_time_keys_count: Optional[TransactionOneTimeKeysCount] = None, unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None, device_list_summary: Optional[DeviceListUpdates] = None, ) -> None: @@ -380,7 +379,7 @@ class _TransactionController: events: The persistent events to include in the transaction. ephemeral: The ephemeral events to include in the transaction. to_device_messages: The to-device messages to include in the transaction. - one_time_key_counts: Counts of remaining one-time keys for relevant + one_time_keys_count: Counts of remaining one-time keys for relevant appservice devices in the transaction. unused_fallback_keys: Lists of unused fallback keys for relevant appservice devices in the transaction. @@ -397,7 +396,7 @@ class _TransactionController: events=events, ephemeral=ephemeral or [], to_device_messages=to_device_messages or [], - one_time_key_counts=one_time_key_counts or {}, + one_time_keys_count=one_time_keys_count or {}, unused_fallback_keys=unused_fallback_keys or {}, device_list_summary=device_list_summary or DeviceListUpdates(), ) @@ -477,14 +476,11 @@ class _Recoverer: self.backoff_counter = 1 def recover(self) -> None: - def _retry() -> None: - run_as_background_process( - "as-recoverer-%s" % (self.service.id,), self.retry - ) - delay = 2**self.backoff_counter logger.info("Scheduling retries on %s in %fs", self.service.id, delay) - self.clock.call_later(delay, _retry) + self.clock.call_later( + delay, run_as_background_process, "as-recoverer", self.retry + ) def _backoff(self) -> None: # cap the backoff to be around 8.5min => (2^9) = 512 secs |