diff options
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r-- | synapse/appservice/scheduler.py | 97 |
1 files changed, 75 insertions, 22 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 185e3a5278..c42fa32fff 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,7 +48,16 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ import logging -from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set +from typing import ( + TYPE_CHECKING, + Awaitable, + Callable, + Collection, + Dict, + List, + Optional, + Set, +) from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.appservice.api import ApplicationServiceApi @@ -71,6 +80,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100 # Maximum number of ephemeral events to provide in an AS transaction. MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100 +# Maximum number of to-device messages to provide in an AS transaction. +MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100 + class ApplicationServiceScheduler: """Public facing API for this module. Does the required DI to tie the @@ -97,15 +109,40 @@ class ApplicationServiceScheduler: for service in services: self.txn_ctrl.start_recoverer(service) - def submit_event_for_as( - self, service: ApplicationService, event: EventBase + def enqueue_for_appservice( + self, + appservice: ApplicationService, + events: Optional[Collection[EventBase]] = None, + ephemeral: Optional[Collection[JsonDict]] = None, + to_device_messages: Optional[Collection[JsonDict]] = None, ) -> None: - self.queuer.enqueue_event(service, event) + """ + Enqueue some data to be sent off to an application service. - def submit_ephemeral_events_for_as( - self, service: ApplicationService, events: List[JsonDict] - ) -> None: - self.queuer.enqueue_ephemeral(service, events) + Args: + appservice: The application service to create and send a transaction to. + events: The persistent room events to send. + ephemeral: The ephemeral events to send. + to_device_messages: The to-device messages to send. These differ from normal + to-device messages sent to clients, as they have 'to_device_id' and + 'to_user_id' fields. + """ + # We purposefully allow this method to run with empty events/ephemeral + # collections, so that callers do not need to check iterable size themselves. + if not events and not ephemeral and not to_device_messages: + return + + if events: + self.queuer.queued_events.setdefault(appservice.id, []).extend(events) + if ephemeral: + self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral) + if to_device_messages: + self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend( + to_device_messages + ) + + # Kick off a new application service transaction + self.queuer.start_background_request(appservice) class _ServiceQueuer: @@ -121,13 +158,15 @@ class _ServiceQueuer: self.queued_events: Dict[str, List[EventBase]] = {} # dict of {service_id: [events]} self.queued_ephemeral: Dict[str, List[JsonDict]] = {} + # dict of {service_id: [to_device_message_json]} + self.queued_to_device_messages: Dict[str, List[JsonDict]] = {} # the appservices which currently have a transaction in flight self.requests_in_flight: Set[str] = set() self.txn_ctrl = txn_ctrl self.clock = clock - def _start_background_request(self, service: ApplicationService) -> None: + def start_background_request(self, service: ApplicationService) -> None: # start a sender for this appservice if we don't already have one if service.id in self.requests_in_flight: return @@ -136,16 +175,6 @@ class _ServiceQueuer: "as-sender-%s" % (service.id,), self._send_request, service ) - def enqueue_event(self, service: ApplicationService, event: EventBase) -> None: - self.queued_events.setdefault(service.id, []).append(event) - self._start_background_request(service) - - def enqueue_ephemeral( - self, service: ApplicationService, events: List[JsonDict] - ) -> None: - self.queued_ephemeral.setdefault(service.id, []).extend(events) - self._start_background_request(service) - async def _send_request(self, service: ApplicationService) -> None: # sanity-check: we shouldn't get here if this service already has a sender # running. @@ -162,11 +191,21 @@ class _ServiceQueuer: ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] - if not events and not ephemeral: + all_to_device_messages = self.queued_to_device_messages.get( + service.id, [] + ) + to_device_messages_to_send = all_to_device_messages[ + :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION + ] + del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION] + + if not events and not ephemeral and not to_device_messages_to_send: return try: - await self.txn_ctrl.send(service, events, ephemeral) + await self.txn_ctrl.send( + service, events, ephemeral, to_device_messages_to_send + ) except Exception: logger.exception("AS request failed") finally: @@ -198,10 +237,24 @@ class _TransactionController: service: ApplicationService, events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, + to_device_messages: Optional[List[JsonDict]] = None, ) -> None: + """ + Create a transaction with the given data and send to the provided + application service. + + Args: + service: The application service to send the transaction to. + events: The persistent events to include in the transaction. + ephemeral: The ephemeral events to include in the transaction. + to_device_messages: The to-device messages to include in the transaction. + """ try: txn = await self.store.create_appservice_txn( - service=service, events=events, ephemeral=ephemeral or [] + service=service, + events=events, + ephemeral=ephemeral or [], + to_device_messages=to_device_messages or [], ) service_is_up = await self._is_service_up(service) if service_is_up: |