summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-12-03 19:17:32 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2021-12-03 20:03:09 +0000
commit275e1e0b3af2ddf6a8abd33834edb49613fd8ace (patch)
treebecb32c660b34e59c6e9ba727a15d821ce5566a7
parentFix tests to mock _TransactionController.send of ApplicationServiceScheduler.... (diff)
downloadsynapse-275e1e0b3af2ddf6a8abd33834edb49613fd8ace.tar.xz
Add to-device messages as their own special section in AS txns
-rw-r--r--synapse/appservice/__init__.py3
-rw-r--r--synapse/appservice/api.py30
-rw-r--r--synapse/storage/databases/main/appservice.py14
3 files changed, 39 insertions, 8 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 6504c6bd3f..0ca8b2ae40 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -329,11 +329,13 @@ class AppServiceTransaction:
         id: int,
         events: List[EventBase],
         ephemeral: List[JsonDict],
+        to_device_messages: List[JsonDict],
     ):
         self.service = service
         self.id = id
         self.events = events
         self.ephemeral = ephemeral
+        self.to_device_messages = to_device_messages
 
     async def send(self, as_api: "ApplicationServiceApi") -> bool:
         """Sends this transaction using the provided AS API interface.
@@ -347,6 +349,7 @@ class AppServiceTransaction:
             service=self.service,
             events=self.events,
             ephemeral=self.ephemeral,
+            to_device_messages=self.to_device_messages,
             txn_id=self.id,
         )
 
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index d08f6bbd7f..a54b4e867d 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 import logging
 import urllib
-from typing import TYPE_CHECKING, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
 from prometheus_client import Counter
 
@@ -204,12 +204,25 @@ class ApplicationServiceApi(SimpleHttpClient):
         service: "ApplicationService",
         events: List[EventBase],
         ephemeral: List[JsonDict],
+        to_device_messages: List[JsonDict],
         txn_id: Optional[int] = None,
-    ):
+    ) -> bool:
+        """
+        Push data to an application service.
+        Args:
+            service: The application service to send to.
+            events: The persistent events to send.
+            ephemeral: The ephemeral events to send.
+            to_device_messages: The to-device messages to send.
+            txn_id: An unique ID to assign to this transaction. Application services should
+                deduplicate transactions received with identitical IDs.
+        Returns:
+            True if the task succeeded, False if it failed.
+        """
         if service.url is None:
             return True
 
-        events = self._serialize(service, events)
+        serialized_events = self._serialize(service, events)
 
         if txn_id is None:
             logger.warning(
@@ -220,10 +233,15 @@ class ApplicationServiceApi(SimpleHttpClient):
         uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
 
         # Never send ephemeral events to appservices that do not support it
+        body: Dict[str, List[JsonDict]] = {"events": serialized_events}
         if service.supports_ephemeral:
-            body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
-        else:
-            body = {"events": events}
+            body.update(
+                {
+                    # TODO: Update to stable prefixes once MSC2409 completes FCP merge.
+                    "de.sorunome.msc2409.ephemeral": ephemeral,
+                    "de.sorunome.msc2409.to_device": to_device_messages,
+                }
+            )
 
         try:
             await self.put_json(
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 1986e36d52..b88e6e1a75 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -194,6 +194,7 @@ class ApplicationServiceTransactionWorkerStore(
         service: ApplicationService,
         events: List[EventBase],
         ephemeral: List[JsonDict],
+        to_device_messages: List[JsonDict],
     ) -> AppServiceTransaction:
         """Atomically creates a new transaction for this application service
         with the given list of events. Ephemeral events are NOT persisted to the
@@ -203,6 +204,7 @@ class ApplicationServiceTransactionWorkerStore(
             service: The service who the transaction is for.
             events: A list of persistent events to put in the transaction.
             ephemeral: A list of ephemeral events to put in the transaction.
+            to_device_messages: A list of to-device messages to put in the transaction.
 
         Returns:
             A new transaction.
@@ -233,7 +235,11 @@ class ApplicationServiceTransactionWorkerStore(
                 (service.id, new_txn_id, event_ids),
             )
             return AppServiceTransaction(
-                service=service, id=new_txn_id, events=events, ephemeral=ephemeral
+                service=service,
+                id=new_txn_id,
+                events=events,
+                ephemeral=ephemeral,
+                to_device_messages=to_device_messages,
             )
 
         return await self.db_pool.runInteraction(
@@ -326,7 +332,11 @@ class ApplicationServiceTransactionWorkerStore(
         events = await self.get_events_as_list(event_ids)
 
         return AppServiceTransaction(
-            service=service, id=entry["txn_id"], events=events, ephemeral=[]
+            service=service,
+            id=entry["txn_id"],
+            events=events,
+            ephemeral=[],
+            to_device_messages=[],
         )
 
     def _get_last_txn(self, txn, service_id: Optional[str]) -> int: