diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2021-11-05 15:47:33 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2021-11-16 12:59:14 +0000 |
commit | b7a44d44024b88dcfa1ebe98b5a8a54fe5074c10 (patch) | |
tree | 7a8961081a9a5d34b56a69a33715860601390e44 /synapse/handlers | |
parent | Add experimental config option to send to-device messages to AS's (diff) | |
download | synapse-b7a44d44024b88dcfa1ebe98b5a8a54fe5074c10.tar.xz |
Add a new ephemeral AS handler for to_device message edus
Here we add the ability for the application service ephemeral message processor to handle new events on the "to_device" stream. We keep track of a stream id (token) per application service, and every time a new to-device message comes in, for each appservice we pull the messages between the last-recorded and current stream id and check whether any of the messages are for a user in that appservice's user namespace. get_new_messages is implemented in the next commit. since we rebased off latest develop.
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 103 |
1 files changed, 97 insertions, 6 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index cd042e35b7..8994313a3e 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -202,8 +202,9 @@ class ApplicationServicesHandler: Args: stream_key: The stream the event came from. - `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other - value for `stream_key` will cause this function to return early. + `stream_key` can be "typing_key", "receipt_key", "presence_key" or + "to_device_key". Any other value for `stream_key` will cause this function + to return early. Ephemeral events will only be pushed to appservices that have opted into receiving them by setting `push_ephemeral` to true in their registration @@ -219,10 +220,6 @@ class ApplicationServicesHandler: if not self.notify_appservices: return - # Ignore any unsupported streams - if stream_key not in ("typing_key", "receipt_key", "presence_key"): - return - # Assert that new_token is an integer (and not a RoomStreamToken). # All of the supported streams that this function handles use an # integer to track progress (rather than a RoomStreamToken - a @@ -236,6 +233,13 @@ class ApplicationServicesHandler: # Additional context: https://github.com/matrix-org/synapse/pull/11137 assert isinstance(new_token, int) + # Ignore to-device messages if the feature flag is not enabled + if ( + stream_key == "to_device_key" + and not self.msc2409_to_device_messages_enabled + ): + return + # Check whether there are any appservices which have registered to receive # ephemeral events. # @@ -310,6 +314,23 @@ class ApplicationServicesHandler: service, "presence", new_token ) + elif ( + stream_key == "to_device_key" + and self.msc2409_to_device_messages_enabled + ): + # Retrieve an iterable of to-device message events, as well as the + # maximum stream token of the messages we were able to retrieve. + events = await self._handle_to_device(service, new_token, users) + if events: + self.scheduler.submit_ephemeral_events_for_as( + service, events + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_type_stream_id_for_appservice( + service, "to_device", new_token + ) + async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: @@ -443,6 +464,76 @@ class ApplicationServicesHandler: return events + async def _handle_to_device( + self, + service: ApplicationService, + new_token: int, + users: Collection[Union[str, UserID]], + ) -> List[JsonDict]: + """ + Given an application service, determine which events it should receive + from those between the last-recorded typing event stream token for this + appservice and the given stream token. + + Args: + service: The application service to check for which events it should receive. + new_token: The latest to-device event stream token. + users: The users that should receive new to-device messages. + + Returns: + A list of JSON dictionaries containing data derived from the typing events + that should be sent to the given application service. + """ + # Get the stream token that this application service has processed up until + from_key = await self.store.get_type_stream_id_for_appservice( + service, "to_device" + ) + + # Filter out users that this appservice is not interested in + users_appservice_is_interested_in: List[str] = [] + for user in users: + if isinstance(user, UserID): + user = user.to_string() + + if service.is_interested_in_user(user): + users_appservice_is_interested_in.append(user) + + if not users_appservice_is_interested_in: + # Return early if the AS was not interested in any of these users + return [] + + # Retrieve the to-device messages for each user + recipient_user_id_device_id_to_messages = await self.store.get_new_messages( + users_appservice_is_interested_in, + from_key, + new_token, + ) + + # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields + # to the event JSON so that the application service will know which user/device + # combination this messages was intended for. + # + # So we mangle this dict into a flat list of to-device messages with the relevant + # user ID and device ID embedded inside each message dict. + message_payload: List[JsonDict] = [] + for ( + user_id, + device_id, + ), messages in recipient_user_id_device_id_to_messages.items(): + for message_json in messages: + # Remove 'message_id' from the to-device message, as it's an internal ID + message_json.pop("message_id", None) + + message_payload.append( + { + "to_user_id": user_id, + "to_device_id": device_id, + **message_json, + } + ) + + return message_payload + async def query_user_exists(self, user_id: str) -> bool: """Check if any application service knows this user_id exists. |