summary refs log tree commit diff
path: root/synapse/appservice/scheduler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r--synapse/appservice/scheduler.py50
1 files changed, 23 insertions, 27 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 430ffbcd1f..18a30bc376 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -57,6 +57,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Sequence,
     Set,
     Tuple,
 )
@@ -64,7 +65,7 @@ from typing import (
 from synapse.appservice import (
     ApplicationService,
     ApplicationServiceState,
-    TransactionOneTimeKeyCounts,
+    TransactionOneTimeKeysCount,
     TransactionUnusedFallbackKeys,
 )
 from synapse.appservice.api import ApplicationServiceApi
@@ -72,7 +73,7 @@ from synapse.events import EventBase
 from synapse.logging.context import run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.databases.main import DataStore
-from synapse.types import DeviceListUpdates, JsonDict
+from synapse.types import DeviceListUpdates, JsonMapping
 from synapse.util import Clock
 
 if TYPE_CHECKING:
@@ -120,8 +121,8 @@ class ApplicationServiceScheduler:
         self,
         appservice: ApplicationService,
         events: Optional[Collection[EventBase]] = None,
-        ephemeral: Optional[Collection[JsonDict]] = None,
-        to_device_messages: Optional[Collection[JsonDict]] = None,
+        ephemeral: Optional[Collection[JsonMapping]] = None,
+        to_device_messages: Optional[Collection[JsonMapping]] = None,
         device_list_summary: Optional[DeviceListUpdates] = None,
     ) -> None:
         """
@@ -179,9 +180,9 @@ class _ServiceQueuer:
         # dict of {service_id: [events]}
         self.queued_events: Dict[str, List[EventBase]] = {}
         # dict of {service_id: [events]}
-        self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
+        self.queued_ephemeral: Dict[str, List[JsonMapping]] = {}
         # dict of {service_id: [to_device_message_json]}
-        self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
+        self.queued_to_device_messages: Dict[str, List[JsonMapping]] = {}
         # dict of {service_id: [device_list_summary]}
         self.queued_device_list_summaries: Dict[str, List[DeviceListUpdates]] = {}
 
@@ -199,9 +200,7 @@ class _ServiceQueuer:
         if service.id in self.requests_in_flight:
             return
 
-        run_as_background_process(
-            "as-sender-%s" % (service.id,), self._send_request, service
-        )
+        run_as_background_process("as-sender", self._send_request, service)
 
     async def _send_request(self, service: ApplicationService) -> None:
         # sanity-check: we shouldn't get here if this service already has a sender
@@ -258,7 +257,7 @@ class _ServiceQueuer:
                 ):
                     return
 
-                one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None
+                one_time_keys_count: Optional[TransactionOneTimeKeysCount] = None
                 unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None
 
                 if (
@@ -269,7 +268,7 @@ class _ServiceQueuer:
                     # for the users which are mentioned in this transaction,
                     # as well as the appservice's sender.
                     (
-                        one_time_key_counts,
+                        one_time_keys_count,
                         unused_fallback_keys,
                     ) = await self._compute_msc3202_otk_counts_and_fallback_keys(
                         service, events, ephemeral, to_device_messages_to_send
@@ -281,7 +280,7 @@ class _ServiceQueuer:
                         events,
                         ephemeral,
                         to_device_messages_to_send,
-                        one_time_key_counts,
+                        one_time_keys_count,
                         unused_fallback_keys,
                         device_list_summary,
                     )
@@ -294,9 +293,9 @@ class _ServiceQueuer:
         self,
         service: ApplicationService,
         events: Iterable[EventBase],
-        ephemerals: Iterable[JsonDict],
-        to_device_messages: Iterable[JsonDict],
-    ) -> Tuple[TransactionOneTimeKeyCounts, TransactionUnusedFallbackKeys]:
+        ephemerals: Iterable[JsonMapping],
+        to_device_messages: Iterable[JsonMapping],
+    ) -> Tuple[TransactionOneTimeKeysCount, TransactionUnusedFallbackKeys]:
         """
         Given a list of the events, ephemeral messages and to-device messages,
         - first computes a list of application services users that may have
@@ -364,10 +363,10 @@ class _TransactionController:
     async def send(
         self,
         service: ApplicationService,
-        events: List[EventBase],
-        ephemeral: Optional[List[JsonDict]] = None,
-        to_device_messages: Optional[List[JsonDict]] = None,
-        one_time_key_counts: Optional[TransactionOneTimeKeyCounts] = None,
+        events: Sequence[EventBase],
+        ephemeral: Optional[List[JsonMapping]] = None,
+        to_device_messages: Optional[List[JsonMapping]] = None,
+        one_time_keys_count: Optional[TransactionOneTimeKeysCount] = None,
         unused_fallback_keys: Optional[TransactionUnusedFallbackKeys] = None,
         device_list_summary: Optional[DeviceListUpdates] = None,
     ) -> None:
@@ -380,7 +379,7 @@ class _TransactionController:
             events: The persistent events to include in the transaction.
             ephemeral: The ephemeral events to include in the transaction.
             to_device_messages: The to-device messages to include in the transaction.
-            one_time_key_counts: Counts of remaining one-time keys for relevant
+            one_time_keys_count: Counts of remaining one-time keys for relevant
                 appservice devices in the transaction.
             unused_fallback_keys: Lists of unused fallback keys for relevant
                 appservice devices in the transaction.
@@ -397,7 +396,7 @@ class _TransactionController:
                 events=events,
                 ephemeral=ephemeral or [],
                 to_device_messages=to_device_messages or [],
-                one_time_key_counts=one_time_key_counts or {},
+                one_time_keys_count=one_time_keys_count or {},
                 unused_fallback_keys=unused_fallback_keys or {},
                 device_list_summary=device_list_summary or DeviceListUpdates(),
             )
@@ -477,14 +476,11 @@ class _Recoverer:
         self.backoff_counter = 1
 
     def recover(self) -> None:
-        def _retry() -> None:
-            run_as_background_process(
-                "as-recoverer-%s" % (self.service.id,), self.retry
-            )
-
         delay = 2**self.backoff_counter
         logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
-        self.clock.call_later(delay, _retry)
+        self.clock.call_later(
+            delay, run_as_background_process, "as-recoverer", self.retry
+        )
 
     def _backoff(self) -> None:
         # cap the backoff to be around 8.5min => (2^9) = 512 secs