diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 8c9ff93b2c..7dbebd97b5 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -351,11 +351,13 @@ class AppServiceTransaction:
id: int,
events: List[EventBase],
ephemeral: List[JsonDict],
+ to_device_messages: List[JsonDict],
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
+ self.to_device_messages = to_device_messages
async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
@@ -369,6 +371,7 @@ class AppServiceTransaction:
service=self.service,
events=self.events,
ephemeral=self.ephemeral,
+ to_device_messages=self.to_device_messages,
txn_id=self.id,
)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index def4424af0..73be7ff3d4 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -218,8 +218,23 @@ class ApplicationServiceApi(SimpleHttpClient):
service: "ApplicationService",
events: List[EventBase],
ephemeral: List[JsonDict],
+ to_device_messages: List[JsonDict],
txn_id: Optional[int] = None,
) -> bool:
+ """
+ Push data to an application service.
+
+ Args:
+ service: The application service to send to.
+ events: The persistent events to send.
+ ephemeral: The ephemeral events to send.
+ to_device_messages: The to-device messages to send.
+ txn_id: An unique ID to assign to this transaction. Application services should
+ deduplicate transactions received with identitical IDs.
+
+ Returns:
+ True if the task succeeded, False if it failed.
+ """
if service.url is None:
return True
@@ -237,13 +252,15 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
# Never send ephemeral events to appservices that do not support it
+ body: Dict[str, List[JsonDict]] = {"events": serialized_events}
if service.supports_ephemeral:
- body = {
- "events": serialized_events,
- "de.sorunome.msc2409.ephemeral": ephemeral,
- }
- else:
- body = {"events": serialized_events}
+ body.update(
+ {
+ # TODO: Update to stable prefixes once MSC2409 completes FCP merge.
+ "de.sorunome.msc2409.ephemeral": ephemeral,
+ "de.sorunome.msc2409.to_device": to_device_messages,
+ }
+ )
try:
await self.put_json(
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 185e3a5278..c42fa32fff 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,7 +48,16 @@ This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
import logging
-from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set
+from typing import (
+ TYPE_CHECKING,
+ Awaitable,
+ Callable,
+ Collection,
+ Dict,
+ List,
+ Optional,
+ Set,
+)
from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.appservice.api import ApplicationServiceApi
@@ -71,6 +80,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
+# Maximum number of to-device messages to provide in an AS transaction.
+MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100
+
class ApplicationServiceScheduler:
"""Public facing API for this module. Does the required DI to tie the
@@ -97,15 +109,40 @@ class ApplicationServiceScheduler:
for service in services:
self.txn_ctrl.start_recoverer(service)
- def submit_event_for_as(
- self, service: ApplicationService, event: EventBase
+ def enqueue_for_appservice(
+ self,
+ appservice: ApplicationService,
+ events: Optional[Collection[EventBase]] = None,
+ ephemeral: Optional[Collection[JsonDict]] = None,
+ to_device_messages: Optional[Collection[JsonDict]] = None,
) -> None:
- self.queuer.enqueue_event(service, event)
+ """
+ Enqueue some data to be sent off to an application service.
- def submit_ephemeral_events_for_as(
- self, service: ApplicationService, events: List[JsonDict]
- ) -> None:
- self.queuer.enqueue_ephemeral(service, events)
+ Args:
+ appservice: The application service to create and send a transaction to.
+ events: The persistent room events to send.
+ ephemeral: The ephemeral events to send.
+ to_device_messages: The to-device messages to send. These differ from normal
+ to-device messages sent to clients, as they have 'to_device_id' and
+ 'to_user_id' fields.
+ """
+ # We purposefully allow this method to run with empty events/ephemeral
+ # collections, so that callers do not need to check iterable size themselves.
+ if not events and not ephemeral and not to_device_messages:
+ return
+
+ if events:
+ self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
+ if ephemeral:
+ self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
+ if to_device_messages:
+ self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
+ to_device_messages
+ )
+
+ # Kick off a new application service transaction
+ self.queuer.start_background_request(appservice)
class _ServiceQueuer:
@@ -121,13 +158,15 @@ class _ServiceQueuer:
self.queued_events: Dict[str, List[EventBase]] = {}
# dict of {service_id: [events]}
self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
+ # dict of {service_id: [to_device_message_json]}
+ self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
# the appservices which currently have a transaction in flight
self.requests_in_flight: Set[str] = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
- def _start_background_request(self, service: ApplicationService) -> None:
+ def start_background_request(self, service: ApplicationService) -> None:
# start a sender for this appservice if we don't already have one
if service.id in self.requests_in_flight:
return
@@ -136,16 +175,6 @@ class _ServiceQueuer:
"as-sender-%s" % (service.id,), self._send_request, service
)
- def enqueue_event(self, service: ApplicationService, event: EventBase) -> None:
- self.queued_events.setdefault(service.id, []).append(event)
- self._start_background_request(service)
-
- def enqueue_ephemeral(
- self, service: ApplicationService, events: List[JsonDict]
- ) -> None:
- self.queued_ephemeral.setdefault(service.id, []).extend(events)
- self._start_background_request(service)
-
async def _send_request(self, service: ApplicationService) -> None:
# sanity-check: we shouldn't get here if this service already has a sender
# running.
@@ -162,11 +191,21 @@ class _ServiceQueuer:
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
- if not events and not ephemeral:
+ all_to_device_messages = self.queued_to_device_messages.get(
+ service.id, []
+ )
+ to_device_messages_to_send = all_to_device_messages[
+ :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
+ ]
+ del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
+
+ if not events and not ephemeral and not to_device_messages_to_send:
return
try:
- await self.txn_ctrl.send(service, events, ephemeral)
+ await self.txn_ctrl.send(
+ service, events, ephemeral, to_device_messages_to_send
+ )
except Exception:
logger.exception("AS request failed")
finally:
@@ -198,10 +237,24 @@ class _TransactionController:
service: ApplicationService,
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
+ to_device_messages: Optional[List[JsonDict]] = None,
) -> None:
+ """
+ Create a transaction with the given data and send to the provided
+ application service.
+
+ Args:
+ service: The application service to send the transaction to.
+ events: The persistent events to include in the transaction.
+ ephemeral: The ephemeral events to include in the transaction.
+ to_device_messages: The to-device messages to include in the transaction.
+ """
try:
txn = await self.store.create_appservice_txn(
- service=service, events=events, ephemeral=ephemeral or []
+ service=service,
+ events=events,
+ ephemeral=ephemeral or [],
+ to_device_messages=to_device_messages or [],
)
service_is_up = await self._is_service_up(service)
if service_is_up:
|