From 275e1e0b3af2ddf6a8abd33834edb49613fd8ace Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 3 Dec 2021 19:17:32 +0000 Subject: Add to-device messages as their own special section in AS txns --- synapse/appservice/__init__.py | 3 +++ synapse/appservice/api.py | 30 ++++++++++++++++++++++------ synapse/storage/databases/main/appservice.py | 14 +++++++++++-- 3 files changed, 39 insertions(+), 8 deletions(-) diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 6504c6bd3f..0ca8b2ae40 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -329,11 +329,13 @@ class AppServiceTransaction: id: int, events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], ): self.service = service self.id = id self.events = events self.ephemeral = ephemeral + self.to_device_messages = to_device_messages async def send(self, as_api: "ApplicationServiceApi") -> bool: """Sends this transaction using the provided AS API interface. @@ -347,6 +349,7 @@ class AppServiceTransaction: service=self.service, events=self.events, ephemeral=self.ephemeral, + to_device_messages=self.to_device_messages, txn_id=self.id, ) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index d08f6bbd7f..a54b4e867d 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -13,7 +13,7 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple from prometheus_client import Counter @@ -204,12 +204,25 @@ class ApplicationServiceApi(SimpleHttpClient): service: "ApplicationService", events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], txn_id: Optional[int] = None, - ): + ) -> bool: + """ + Push data to an application service. + Args: + service: The application service to send to. + events: The persistent events to send. + ephemeral: The ephemeral events to send. + to_device_messages: The to-device messages to send. + txn_id: An unique ID to assign to this transaction. Application services should + deduplicate transactions received with identitical IDs. + Returns: + True if the task succeeded, False if it failed. + """ if service.url is None: return True - events = self._serialize(service, events) + serialized_events = self._serialize(service, events) if txn_id is None: logger.warning( @@ -220,10 +233,15 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id))) # Never send ephemeral events to appservices that do not support it + body: Dict[str, List[JsonDict]] = {"events": serialized_events} if service.supports_ephemeral: - body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral} - else: - body = {"events": events} + body.update( + { + # TODO: Update to stable prefixes once MSC2409 completes FCP merge. + "de.sorunome.msc2409.ephemeral": ephemeral, + "de.sorunome.msc2409.to_device": to_device_messages, + } + ) try: await self.put_json( diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 1986e36d52..b88e6e1a75 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -194,6 +194,7 @@ class ApplicationServiceTransactionWorkerStore( service: ApplicationService, events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], ) -> AppServiceTransaction: """Atomically creates a new transaction for this application service with the given list of events. Ephemeral events are NOT persisted to the @@ -203,6 +204,7 @@ class ApplicationServiceTransactionWorkerStore( service: The service who the transaction is for. events: A list of persistent events to put in the transaction. ephemeral: A list of ephemeral events to put in the transaction. + to_device_messages: A list of to-device messages to put in the transaction. Returns: A new transaction. @@ -233,7 +235,11 @@ class ApplicationServiceTransactionWorkerStore( (service.id, new_txn_id, event_ids), ) return AppServiceTransaction( - service=service, id=new_txn_id, events=events, ephemeral=ephemeral + service=service, + id=new_txn_id, + events=events, + ephemeral=ephemeral, + to_device_messages=to_device_messages, ) return await self.db_pool.runInteraction( @@ -326,7 +332,11 @@ class ApplicationServiceTransactionWorkerStore( events = await self.get_events_as_list(event_ids) return AppServiceTransaction( - service=service, id=entry["txn_id"], events=events, ephemeral=[] + service=service, + id=entry["txn_id"], + events=events, + ephemeral=[], + to_device_messages=[], ) def _get_last_txn(self, txn, service_id: Optional[str]) -> int: -- cgit 1.4.1