summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-12-03 19:22:22 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2021-12-03 20:03:09 +0000
commit403490de8b633327e300f249bfd798097bc12c8a (patch)
tree5d0df601c6ca2e3f36a44b1dbee6664dde27a683
parentAdd to-device messages as their own special section in AS txns (diff)
downloadsynapse-403490de8b633327e300f249bfd798097bc12c8a.tar.xz
Insert to-device messages into the new to-device part of AS txns
-rw-r--r--synapse/appservice/scheduler.py34
-rw-r--r--synapse/handlers/appservice.py4
2 files changed, 32 insertions, 6 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 31b297c7a1..dae952dc13 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -65,6 +65,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
 # Maximum number of ephemeral events to provide in an AS transaction.
 MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
 
+# Maximum number of to-device messages to provide in an AS transaction.
+MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100
+
 
 class ApplicationServiceScheduler:
     """Public facing API for this module. Does the required DI to tie the
@@ -96,6 +99,7 @@ class ApplicationServiceScheduler:
         appservice: ApplicationService,
         events: Optional[Iterable[EventBase]] = None,
         ephemeral: Optional[Iterable[JsonDict]] = None,
+        to_device_messages: Optional[Iterable[JsonDict]] = None,
     ) -> None:
         """
         Enqueue some data to be sent off to an application service.
@@ -103,6 +107,9 @@ class ApplicationServiceScheduler:
             appservice: The application service to create and send a transaction to.
             events: The persistent room events to send.
             ephemeral: The ephemeral events to send.
+            to_device_messages: The to-device messages to send. These differ from normal
+                to-device messages sent to clients, as they have 'to_device_id' and
+                'to_user_id' fields.
         """
         # We purposefully allow this method to run with empty events/ephemeral
         # iterables, so that callers do not need to check iterable size themselves.
@@ -113,6 +120,10 @@ class ApplicationServiceScheduler:
             self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
         if ephemeral:
             self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
+        if to_device_messages:
+            self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
+                to_device_messages
+            )
 
         # Kick off a new application service transaction
         self.queuer.start_background_request(appservice)
@@ -131,6 +142,8 @@ class _ServiceQueuer:
         self.queued_events: Dict[str, List[EventBase]] = {}
         # dict of {service_id: [event_json]}
         self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
+        # dict of {service_id: [to_device_message_json]}
+        self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
 
         # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
@@ -162,11 +175,21 @@ class _ServiceQueuer:
                 ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
                 del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
 
-                if not events and not ephemeral:
+                all_to_device_messages = self.queued_to_device_messages.get(
+                    service.id, []
+                )
+                to_device_messages_to_send = all_to_device_messages[
+                    :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
+                ]
+                del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
+
+                if not events and not ephemeral and not to_device_messages_to_send:
                     return
 
                 try:
-                    await self.txn_ctrl.send(service, events, ephemeral)
+                    await self.txn_ctrl.send(
+                        service, events, ephemeral, to_device_messages_to_send
+                    )
                 except Exception:
                     logger.exception("AS request failed")
         finally:
@@ -203,6 +226,7 @@ class _TransactionController:
         service: ApplicationService,
         events: List[EventBase],
         ephemeral: Optional[List[JsonDict]] = None,
+        to_device_messages: Optional[List[JsonDict]] = None,
     ) -> None:
         """
         Create a transaction with the given data and send to the provided
@@ -212,10 +236,14 @@ class _TransactionController:
             service: The application service to send the transaction to.
             events: The persistent events to include in the transaction.
             ephemeral: The ephemeral events to include in the transaction.
+            to_device_messages: The to-device messages to include in the transaction.
         """
         try:
             txn = await self.store.create_appservice_txn(
-                service=service, events=events, ephemeral=ephemeral or []
+                service=service,
+                events=events,
+                ephemeral=ephemeral or [],
+                to_device_messages=to_device_messages or [],
             )
             service_is_up = await self._is_service_up(service)
             if service_is_up:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 5a28ceac43..c92668642a 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -327,10 +327,8 @@ class ApplicationServicesHandler:
                         to_device_messages = await self._get_to_device_messages(
                             service, new_token, users
                         )
-                        # REVIEW: In a subsequent commit, we'll move this to a to-device-specific
-                        #  key in the AS transaction.
                         self.scheduler.enqueue_for_appservice(
-                            service, ephemeral=to_device_messages
+                            service, to_device_messages=to_device_messages
                         )
 
                         # Persist the latest handled stream token for this appservice