diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2021-12-03 19:22:22 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2021-12-03 20:03:09 +0000 |
commit | 403490de8b633327e300f249bfd798097bc12c8a (patch) | |
tree | 5d0df601c6ca2e3f36a44b1dbee6664dde27a683 | |
parent | Add to-device messages as their own special section in AS txns (diff) | |
download | synapse-403490de8b633327e300f249bfd798097bc12c8a.tar.xz |
Insert to-device messages into the new to-device part of AS txns
-rw-r--r-- | synapse/appservice/scheduler.py | 34 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 4 |
2 files changed, 32 insertions, 6 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 31b297c7a1..dae952dc13 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -65,6 +65,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100 # Maximum number of ephemeral events to provide in an AS transaction. MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100 +# Maximum number of to-device messages to provide in an AS transaction. +MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100 + class ApplicationServiceScheduler: """Public facing API for this module. Does the required DI to tie the @@ -96,6 +99,7 @@ class ApplicationServiceScheduler: appservice: ApplicationService, events: Optional[Iterable[EventBase]] = None, ephemeral: Optional[Iterable[JsonDict]] = None, + to_device_messages: Optional[Iterable[JsonDict]] = None, ) -> None: """ Enqueue some data to be sent off to an application service. @@ -103,6 +107,9 @@ class ApplicationServiceScheduler: appservice: The application service to create and send a transaction to. events: The persistent room events to send. ephemeral: The ephemeral events to send. + to_device_messages: The to-device messages to send. These differ from normal + to-device messages sent to clients, as they have 'to_device_id' and + 'to_user_id' fields. """ # We purposefully allow this method to run with empty events/ephemeral # iterables, so that callers do not need to check iterable size themselves. @@ -113,6 +120,10 @@ class ApplicationServiceScheduler: self.queuer.queued_events.setdefault(appservice.id, []).extend(events) if ephemeral: self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral) + if to_device_messages: + self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend( + to_device_messages + ) # Kick off a new application service transaction self.queuer.start_background_request(appservice) @@ -131,6 +142,8 @@ class _ServiceQueuer: self.queued_events: Dict[str, List[EventBase]] = {} # dict of {service_id: [event_json]} self.queued_ephemeral: Dict[str, List[JsonDict]] = {} + # dict of {service_id: [to_device_message_json]} + self.queued_to_device_messages: Dict[str, List[JsonDict]] = {} # the appservices which currently have a transaction in flight self.requests_in_flight = set() @@ -162,11 +175,21 @@ class _ServiceQueuer: ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] - if not events and not ephemeral: + all_to_device_messages = self.queued_to_device_messages.get( + service.id, [] + ) + to_device_messages_to_send = all_to_device_messages[ + :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION + ] + del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION] + + if not events and not ephemeral and not to_device_messages_to_send: return try: - await self.txn_ctrl.send(service, events, ephemeral) + await self.txn_ctrl.send( + service, events, ephemeral, to_device_messages_to_send + ) except Exception: logger.exception("AS request failed") finally: @@ -203,6 +226,7 @@ class _TransactionController: service: ApplicationService, events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, + to_device_messages: Optional[List[JsonDict]] = None, ) -> None: """ Create a transaction with the given data and send to the provided @@ -212,10 +236,14 @@ class _TransactionController: service: The application service to send the transaction to. events: The persistent events to include in the transaction. ephemeral: The ephemeral events to include in the transaction. + to_device_messages: The to-device messages to include in the transaction. """ try: txn = await self.store.create_appservice_txn( - service=service, events=events, ephemeral=ephemeral or [] + service=service, + events=events, + ephemeral=ephemeral or [], + to_device_messages=to_device_messages or [], ) service_is_up = await self._is_service_up(service) if service_is_up: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 5a28ceac43..c92668642a 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -327,10 +327,8 @@ class ApplicationServicesHandler: to_device_messages = await self._get_to_device_messages( service, new_token, users ) - # REVIEW: In a subsequent commit, we'll move this to a to-device-specific - # key in the AS transaction. self.scheduler.enqueue_for_appservice( - service, ephemeral=to_device_messages + service, to_device_messages=to_device_messages ) # Persist the latest handled stream token for this appservice |