summary refs log tree commit diff
path: root/synapse/handlers/appservice.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/appservice.py')
-rw-r--r--synapse/handlers/appservice.py103
1 files changed, 97 insertions, 6 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index cd042e35b7..8994313a3e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -202,8 +202,9 @@ class ApplicationServicesHandler:
         Args:
             stream_key: The stream the event came from.
 
-                `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
-                value for `stream_key` will cause this function to return early.
+                `stream_key` can be "typing_key", "receipt_key", "presence_key" or
+                "to_device_key". Any other value for `stream_key` will cause this function
+                to return early.
 
                 Ephemeral events will only be pushed to appservices that have opted into
                 receiving them by setting `push_ephemeral` to true in their registration
@@ -219,10 +220,6 @@ class ApplicationServicesHandler:
         if not self.notify_appservices:
             return
 
-        # Ignore any unsupported streams
-        if stream_key not in ("typing_key", "receipt_key", "presence_key"):
-            return
-
         # Assert that new_token is an integer (and not a RoomStreamToken).
         # All of the supported streams that this function handles use an
         # integer to track progress (rather than a RoomStreamToken - a
@@ -236,6 +233,13 @@ class ApplicationServicesHandler:
         # Additional context: https://github.com/matrix-org/synapse/pull/11137
         assert isinstance(new_token, int)
 
+        # Ignore to-device messages if the feature flag is not enabled
+        if (
+            stream_key == "to_device_key"
+            and not self.msc2409_to_device_messages_enabled
+        ):
+            return
+
         # Check whether there are any appservices which have registered to receive
         # ephemeral events.
         #
@@ -310,6 +314,23 @@ class ApplicationServicesHandler:
                             service, "presence", new_token
                         )
 
+                    elif (
+                        stream_key == "to_device_key"
+                        and self.msc2409_to_device_messages_enabled
+                    ):
+                        # Retrieve an iterable of to-device message events, as well as the
+                        # maximum stream token of the messages we were able to retrieve.
+                        events = await self._handle_to_device(service, new_token, users)
+                        if events:
+                            self.scheduler.submit_ephemeral_events_for_as(
+                                service, events
+                            )
+
+                        # Persist the latest handled stream token for this appservice
+                        await self.store.set_type_stream_id_for_appservice(
+                            service, "to_device", new_token
+                        )
+
     async def _handle_typing(
         self, service: ApplicationService, new_token: int
     ) -> List[JsonDict]:
@@ -443,6 +464,76 @@ class ApplicationServicesHandler:
 
         return events
 
+    async def _handle_to_device(
+        self,
+        service: ApplicationService,
+        new_token: int,
+        users: Collection[Union[str, UserID]],
+    ) -> List[JsonDict]:
+        """
+        Given an application service, determine which events it should receive
+        from those between the last-recorded typing event stream token for this
+        appservice and the given stream token.
+
+        Args:
+            service: The application service to check for which events it should receive.
+            new_token: The latest to-device event stream token.
+            users: The users that should receive new to-device messages.
+
+        Returns:
+            A list of JSON dictionaries containing data derived from the typing events
+                that should be sent to the given application service.
+        """
+        # Get the stream token that this application service has processed up until
+        from_key = await self.store.get_type_stream_id_for_appservice(
+            service, "to_device"
+        )
+
+        # Filter out users that this appservice is not interested in
+        users_appservice_is_interested_in: List[str] = []
+        for user in users:
+            if isinstance(user, UserID):
+                user = user.to_string()
+
+            if service.is_interested_in_user(user):
+                users_appservice_is_interested_in.append(user)
+
+        if not users_appservice_is_interested_in:
+            # Return early if the AS was not interested in any of these users
+            return []
+
+        # Retrieve the to-device messages for each user
+        recipient_user_id_device_id_to_messages = await self.store.get_new_messages(
+            users_appservice_is_interested_in,
+            from_key,
+            new_token,
+        )
+
+        # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
+        # to the event JSON so that the application service will know which user/device
+        # combination this messages was intended for.
+        #
+        # So we mangle this dict into a flat list of to-device messages with the relevant
+        # user ID and device ID embedded inside each message dict.
+        message_payload: List[JsonDict] = []
+        for (
+            user_id,
+            device_id,
+        ), messages in recipient_user_id_device_id_to_messages.items():
+            for message_json in messages:
+                # Remove 'message_id' from the to-device message, as it's an internal ID
+                message_json.pop("message_id", None)
+
+                message_payload.append(
+                    {
+                        "to_user_id": user_id,
+                        "to_device_id": device_id,
+                        **message_json,
+                    }
+                )
+
+        return message_payload
+
     async def query_user_exists(self, user_id: str) -> bool:
         """Check if any application service knows this user_id exists.