diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 7833e77e2b..a42c3558e4 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -55,6 +55,9 @@ class ApplicationServicesHandler:
self.clock = hs.get_clock()
self.notify_appservices = hs.config.appservice.notify_appservices
self.event_sources = hs.get_event_sources()
+ self._msc2409_to_device_messages_enabled = (
+ hs.config.experimental.msc2409_to_device_messages_enabled
+ )
self.current_max = 0
self.is_processing = False
@@ -132,7 +135,9 @@ class ApplicationServicesHandler:
# Fork off pushes to these services
for service in services:
- self.scheduler.submit_event_for_as(service, event)
+ self.scheduler.enqueue_for_appservice(
+ service, events=[event]
+ )
now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
@@ -199,8 +204,9 @@ class ApplicationServicesHandler:
Args:
stream_key: The stream the event came from.
- `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
- value for `stream_key` will cause this function to return early.
+ `stream_key` can be "typing_key", "receipt_key", "presence_key" or
+ "to_device_key". Any other value for `stream_key` will cause this function
+ to return early.
Ephemeral events will only be pushed to appservices that have opted into
receiving them by setting `push_ephemeral` to true in their registration
@@ -216,8 +222,15 @@ class ApplicationServicesHandler:
if not self.notify_appservices:
return
- # Ignore any unsupported streams
- if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+ # Notify appservices of updates in ephemeral event streams.
+ # Only the following streams are currently supported.
+ # FIXME: We should use constants for these values.
+ if stream_key not in (
+ "typing_key",
+ "receipt_key",
+ "presence_key",
+ "to_device_key",
+ ):
return
# Assert that new_token is an integer (and not a RoomStreamToken).
@@ -233,6 +246,13 @@ class ApplicationServicesHandler:
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)
+ # Ignore to-device messages if the feature flag is not enabled
+ if (
+ stream_key == "to_device_key"
+ and not self._msc2409_to_device_messages_enabled
+ ):
+ return
+
# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
@@ -266,7 +286,7 @@ class ApplicationServicesHandler:
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
if stream_key == "typing_key":
- # Note that we don't persist the token (via set_type_stream_id_for_appservice)
+ # Note that we don't persist the token (via set_appservice_stream_type_pos)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
#
@@ -274,7 +294,7 @@ class ApplicationServicesHandler:
# and, if they apply to this application service, send it off.
events = await self._handle_typing(service, new_token)
if events:
- self.scheduler.submit_ephemeral_events_for_as(service, events)
+ self.scheduler.enqueue_for_appservice(service, ephemeral=events)
continue
# Since we read/update the stream position for this AS/stream
@@ -285,28 +305,37 @@ class ApplicationServicesHandler:
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(
- service, events
- )
+ self.scheduler.enqueue_for_appservice(service, ephemeral=events)
# Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
+ await self.store.set_appservice_stream_type_pos(
service, "read_receipt", new_token
)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users, new_token)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(
- service, events
- )
+ self.scheduler.enqueue_for_appservice(service, ephemeral=events)
# Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
+ await self.store.set_appservice_stream_type_pos(
service, "presence", new_token
)
+ elif stream_key == "to_device_key":
+ # Retrieve a list of to-device message events, as well as the
+ # maximum stream token of the messages we were able to retrieve.
+ to_device_messages = await self._get_to_device_messages(
+ service, new_token, users
+ )
+ self.scheduler.enqueue_for_appservice(
+ service, to_device_messages=to_device_messages
+ )
+
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_appservice_stream_type_pos(
+ service, "to_device", new_token
+ )
+
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
@@ -440,6 +469,79 @@ class ApplicationServicesHandler:
return events
+ async def _get_to_device_messages(
+ self,
+ service: ApplicationService,
+ new_token: int,
+ users: Collection[Union[str, UserID]],
+ ) -> List[JsonDict]:
+ """
+ Given an application service, determine which events it should receive
+ from those between the last-recorded to-device message stream token for this
+ appservice and the given stream token.
+
+ Args:
+ service: The application service to check for which events it should receive.
+ new_token: The latest to-device event stream token.
+ users: The users to be notified for the new to-device messages
+ (ie, the recipients of the messages).
+
+ Returns:
+ A list of JSON dictionaries containing data derived from the to-device events
+ that should be sent to the given application service.
+ """
+ # Get the stream token that this application service has processed up until
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "to_device"
+ )
+
+ # Filter out users that this appservice is not interested in
+ users_appservice_is_interested_in: List[str] = []
+ for user in users:
+ # FIXME: We should do this farther up the call stack. We currently repeat
+ # this operation in _handle_presence.
+ if isinstance(user, UserID):
+ user = user.to_string()
+
+ if service.is_interested_in_user(user):
+ users_appservice_is_interested_in.append(user)
+
+ if not users_appservice_is_interested_in:
+ # Return early if the AS was not interested in any of these users
+ return []
+
+ # Retrieve the to-device messages for each user
+ recipient_device_to_messages = await self.store.get_messages_for_user_devices(
+ users_appservice_is_interested_in,
+ from_key,
+ new_token,
+ )
+
+ # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
+ # to the event JSON so that the application service will know which user/device
+ # combination this messages was intended for.
+ #
+ # So we mangle this dict into a flat list of to-device messages with the relevant
+ # user ID and device ID embedded inside each message dict.
+ message_payload: List[JsonDict] = []
+ for (
+ user_id,
+ device_id,
+ ), messages in recipient_device_to_messages.items():
+ for message_json in messages:
+ # Remove 'message_id' from the to-device message, as it's an internal ID
+ message_json.pop("message_id", None)
+
+ message_payload.append(
+ {
+ "to_user_id": user_id,
+ "to_device_id": device_id,
+ **message_json,
+ }
+ )
+
+ return message_payload
+
async def query_user_exists(self, user_id: str) -> bool:
"""Check if any application service knows this user_id exists.
@@ -547,7 +649,7 @@ class ApplicationServicesHandler:
"""Retrieve a list of application services interested in this event.
Args:
- event: The event to check. Can be None if alias_list is not.
+ event: The event to check.
Returns:
A list of services interested in this event based on the service regex.
"""
|