summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-12-01 16:46:58 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2021-12-01 17:13:47 +0000
commitdfa4c318031ac8ea472993e220b14dabb56830fa (patch)
tree74bb05d8e6ce389ae446a65580c2609de2ce939d
parentAdd comment on why we don't NOT NULL to_device_stream_id (diff)
downloadsynapse-dfa4c318031ac8ea472993e220b14dabb56830fa.tar.xz
generalise sending application service transactions and allow to-device
-rw-r--r--synapse/appservice/__init__.py3
-rw-r--r--synapse/appservice/api.py29
-rw-r--r--synapse/appservice/scheduler.py96
-rw-r--r--synapse/handlers/appservice.py14
-rw-r--r--synapse/storage/databases/main/appservice.py8
5 files changed, 108 insertions, 42 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 6504c6bd3f..0ca8b2ae40 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -329,11 +329,13 @@ class AppServiceTransaction:
         id: int,
         events: List[EventBase],
         ephemeral: List[JsonDict],
+        to_device_messages: List[JsonDict],
     ):
         self.service = service
         self.id = id
         self.events = events
         self.ephemeral = ephemeral
+        self.to_device_messages = to_device_messages
 
     async def send(self, as_api: "ApplicationServiceApi") -> bool:
         """Sends this transaction using the provided AS API interface.
@@ -347,6 +349,7 @@ class AppServiceTransaction:
             service=self.service,
             events=self.events,
             ephemeral=self.ephemeral,
+            to_device_messages=self.to_device_messages,
             txn_id=self.id,
         )
 
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index d08f6bbd7f..e0509efe2a 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -204,8 +204,23 @@ class ApplicationServiceApi(SimpleHttpClient):
         service: "ApplicationService",
         events: List[EventBase],
         ephemeral: List[JsonDict],
+        to_device_messages: List[JsonDict],
         txn_id: Optional[int] = None,
-    ):
+    ) -> bool:
+        """
+        Push data to an application service.
+
+        Args:
+            service: The application service to send to.
+            events: The persistent events to send.
+            ephemeral: The ephemeral events to send.
+            to_device_messages: The to-device messages to send.
+            txn_id: An unique ID to assign to this transaction. Application services should
+                deduplicate transactions received with identitical IDs.
+
+        Returns:
+            True if the task succeeded, False if it failed.
+        """
         if service.url is None:
             return True
 
@@ -220,10 +235,16 @@ class ApplicationServiceApi(SimpleHttpClient):
         uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
 
         # Never send ephemeral events to appservices that do not support it
+        body = {"events": events}
+
         if service.supports_ephemeral:
-            body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
-        else:
-            body = {"events": events}
+            body.update(
+                {
+                    # TODO: Update to stable prefixes once MSC2409 completes FCP merge.
+                    "de.sorunome.msc2409.ephemeral": ephemeral,
+                    "de.sorunome.msc2409.to_device": to_device_messages,
+                }
+            )
 
         try:
             await self.put_json(
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index a211257088..b6941f6da9 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,7 +48,7 @@ This is all tied together by the AppServiceScheduler which DIs the required
 components.
 """
 import logging
-from typing import Iterable, List, Optional
+from typing import Dict, Iterable, List, Optional
 
 from synapse.appservice import ApplicationService, ApplicationServiceState
 from synapse.events import EventBase
@@ -65,6 +65,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
 # Maximum number of ephemeral events to provide in an AS transaction.
 MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
 
+# Maximum number of to-device messages to provide in an AS transaction.
+MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100
+
 
 class ApplicationServiceScheduler:
     """Public facing API for this module. Does the required DI to tie the
@@ -91,25 +94,38 @@ class ApplicationServiceScheduler:
         for service in services:
             self.txn_ctrl.start_recoverer(service)
 
-    def submit_event_for_as(self, service: ApplicationService, event: EventBase):
-        self.queuer.enqueue_event(service, event)
-
-    def submit_ephemeral_events_for_as(
-        self, service: ApplicationService, events: Iterable[JsonDict]
+    def enqueue_for_appservice(
+        self,
+        appservice: ApplicationService,
+        events: Optional[Iterable[EventBase]] = None,
+        ephemeral: Optional[Iterable[JsonDict]] = None,
+        to_device_messages: Optional[Iterable[JsonDict]] = None,
     ) -> None:
         """
-        Send ephemeral events to application services, and schedule a new
-        outgoing AS transaction.
+        Enqueue some data to be sent off to an application service.
 
         Args:
-            service: The service to send ephemeral events to.
-            events: The ephemeral events to send.
+            appservice: The application service to create and send a transaction to.
+            events: The persistent room events to send.
+            ephemeral: The ephemeral events to send.
+            to_device_messages: The to-device messages to send. These differ from normal
+                to-device messages sent to clients, as they have 'to_device_id' and
+                'to_user_id' fields.
         """
-        # Ensure we have some events to send
-        if not events:
-            return
+        # Note that we purposefully allow this method to run with empty events/ephemeral
+        # iterables, and it helps the ergonomics of callers.
+
+        if events:
+            self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
+        if ephemeral:
+            self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
+        if to_device_messages:
+            self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
+                to_device_messages
+            )
 
-        self.queuer.enqueue_ephemeral(service, events)
+        # Kick off a new application service transaction
+        self.queuer.start_background_request(appservice)
 
 
 class _ServiceQueuer:
@@ -121,15 +137,19 @@ class _ServiceQueuer:
     """
 
     def __init__(self, txn_ctrl, clock):
-        self.queued_events = {}  # dict of {service_id: [events]}
-        self.queued_ephemeral = {}  # dict of {service_id: [events]}
+        # dict of {service_id: [events]}
+        self.queued_events: Dict[str, List[EventBase]] = {}
+        # dict of {service_id: [event_json]}
+        self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
+        # dict of {service_id: [to_device_message_json]}
+        self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
 
         # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
         self.clock = clock
 
-    def _start_background_request(self, service):
+    def start_background_request(self, service):
         # start a sender for this appservice if we don't already have one
         if service.id in self.requests_in_flight:
             return
@@ -138,16 +158,6 @@ class _ServiceQueuer:
             "as-sender-%s" % (service.id,), self._send_request, service
         )
 
-    def enqueue_event(self, service: ApplicationService, event: EventBase):
-        self.queued_events.setdefault(service.id, []).append(event)
-        self._start_background_request(service)
-
-    def enqueue_ephemeral(
-        self, service: ApplicationService, events: Iterable[JsonDict]
-    ) -> None:
-        self.queued_ephemeral.setdefault(service.id, []).extend(events)
-        self._start_background_request(service)
-
     async def _send_request(self, service: ApplicationService):
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
@@ -164,11 +174,21 @@ class _ServiceQueuer:
                 ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
                 del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
 
-                if not events and not ephemeral:
+                all_to_device_messages = self.queued_to_device_messages.get(
+                    service.id, []
+                )
+                to_device_messages_to_send = all_to_device_messages[
+                    :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
+                ]
+                del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
+
+                if not events and not ephemeral and not to_device_messages_to_send:
                     return
 
                 try:
-                    await self.txn_ctrl.send(service, events, ephemeral)
+                    await self.txn_ctrl.send(
+                        service, events, ephemeral, to_device_messages_to_send
+                    )
                 except Exception:
                     logger.exception("AS request failed")
         finally:
@@ -205,10 +225,24 @@ class _TransactionController:
         service: ApplicationService,
         events: List[EventBase],
         ephemeral: Optional[List[JsonDict]] = None,
-    ):
+        to_device_messages: Optional[List[JsonDict]] = None,
+    ) -> None:
+        """
+        Create a transaction with the given data and send to the provided
+        application service.
+
+        Args:
+            service: The application service to send the transaction to.
+            events: The persistent events to include in the transaction.
+            ephemeral: The ephemeral events to include in the transaction.
+            to_device_messages: The to-device messages to include in the transaction.
+        """
         try:
             txn = await self.store.create_appservice_txn(
-                service=service, events=events, ephemeral=ephemeral or []
+                service=service,
+                events=events,
+                ephemeral=ephemeral or [],
+                to_device_messages=to_device_messages or [],
             )
             service_is_up = await self._is_service_up(service)
             if service_is_up:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 4a7613b262..c92668642a 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -135,7 +135,9 @@ class ApplicationServicesHandler:
 
                         # Fork off pushes to these services
                         for service in services:
-                            self.scheduler.submit_event_for_as(service, event)
+                            self.scheduler.enqueue_for_appservice(
+                                service, events=[event]
+                            )
 
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
@@ -292,7 +294,7 @@ class ApplicationServicesHandler:
                     # and, if they apply to this application service, send it off.
                     events = await self._handle_typing(service, new_token)
                     if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
                     continue
 
                 # Since we read/update the stream position for this AS/stream
@@ -303,7 +305,7 @@ class ApplicationServicesHandler:
                 ):
                     if stream_key == "receipt_key":
                         events = await self._handle_receipts(service, new_token)
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
                         # Persist the latest handled stream token for this appservice
                         await self.store.set_appservice_stream_type_pos(
@@ -312,7 +314,7 @@ class ApplicationServicesHandler:
 
                     elif stream_key == "presence_key":
                         events = await self._handle_presence(service, users, new_token)
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
                         # Persist the latest handled stream token for this appservice
                         await self.store.set_appservice_stream_type_pos(
@@ -325,8 +327,8 @@ class ApplicationServicesHandler:
                         to_device_messages = await self._get_to_device_messages(
                             service, new_token, users
                         )
-                        self.scheduler.submit_ephemeral_events_for_as(
-                            service, to_device_messages
+                        self.scheduler.enqueue_for_appservice(
+                            service, to_device_messages=to_device_messages
                         )
 
                         # Persist the latest handled stream token for this appservice
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 1986e36d52..20d91f3a7e 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -194,6 +194,7 @@ class ApplicationServiceTransactionWorkerStore(
         service: ApplicationService,
         events: List[EventBase],
         ephemeral: List[JsonDict],
+        to_device_messages: List[JsonDict],
     ) -> AppServiceTransaction:
         """Atomically creates a new transaction for this application service
         with the given list of events. Ephemeral events are NOT persisted to the
@@ -203,6 +204,7 @@ class ApplicationServiceTransactionWorkerStore(
             service: The service who the transaction is for.
             events: A list of persistent events to put in the transaction.
             ephemeral: A list of ephemeral events to put in the transaction.
+            to_device_messages: A list of to-device messages to put in the transaction.
 
         Returns:
             A new transaction.
@@ -233,7 +235,11 @@ class ApplicationServiceTransactionWorkerStore(
                 (service.id, new_txn_id, event_ids),
             )
             return AppServiceTransaction(
-                service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+                service=service,
+                id=new_txn_id,
+                events=events,
+                ephemeral=ephemeral,
+                to_device_messages=to_device_messages,
             )
 
         return await self.db_pool.runInteraction(