From 64ec45fc1b0856dc7daacca7d3ab75d50bd89f84 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Tue, 1 Feb 2022 14:13:38 +0000 Subject: Send to-device messages to application services (#11215) Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- synapse/handlers/appservice.py | 136 +++++++++++++++++++++++++++++++++++------ 1 file changed, 119 insertions(+), 17 deletions(-) (limited to 'synapse/handlers/appservice.py') diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 7833e77e2b..0fb919acf6 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -55,6 +55,9 @@ class ApplicationServicesHandler: self.clock = hs.get_clock() self.notify_appservices = hs.config.appservice.notify_appservices self.event_sources = hs.get_event_sources() + self._msc2409_to_device_messages_enabled = ( + hs.config.experimental.msc2409_to_device_messages_enabled + ) self.current_max = 0 self.is_processing = False @@ -132,7 +135,9 @@ class ApplicationServicesHandler: # Fork off pushes to these services for service in services: - self.scheduler.submit_event_for_as(service, event) + self.scheduler.enqueue_for_appservice( + service, events=[event] + ) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -199,8 +204,9 @@ class ApplicationServicesHandler: Args: stream_key: The stream the event came from. - `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other - value for `stream_key` will cause this function to return early. + `stream_key` can be "typing_key", "receipt_key", "presence_key" or + "to_device_key". Any other value for `stream_key` will cause this function + to return early. Ephemeral events will only be pushed to appservices that have opted into receiving them by setting `push_ephemeral` to true in their registration @@ -216,8 +222,15 @@ class ApplicationServicesHandler: if not self.notify_appservices: return - # Ignore any unsupported streams - if stream_key not in ("typing_key", "receipt_key", "presence_key"): + # Notify appservices of updates in ephemeral event streams. + # Only the following streams are currently supported. + # FIXME: We should use constants for these values. + if stream_key not in ( + "typing_key", + "receipt_key", + "presence_key", + "to_device_key", + ): return # Assert that new_token is an integer (and not a RoomStreamToken). @@ -233,6 +246,13 @@ class ApplicationServicesHandler: # Additional context: https://github.com/matrix-org/synapse/pull/11137 assert isinstance(new_token, int) + # Ignore to-device messages if the feature flag is not enabled + if ( + stream_key == "to_device_key" + and not self._msc2409_to_device_messages_enabled + ): + return + # Check whether there are any appservices which have registered to receive # ephemeral events. # @@ -266,7 +286,7 @@ class ApplicationServicesHandler: with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: if stream_key == "typing_key": - # Note that we don't persist the token (via set_type_stream_id_for_appservice) + # Note that we don't persist the token (via set_appservice_stream_type_pos) # for typing_key due to performance reasons and due to their highly # ephemeral nature. # @@ -274,7 +294,7 @@ class ApplicationServicesHandler: # and, if they apply to this application service, send it off. events = await self._handle_typing(service, new_token) if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) continue # Since we read/update the stream position for this AS/stream @@ -285,28 +305,37 @@ class ApplicationServicesHandler: ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "read_receipt", new_token ) elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "presence", new_token ) + elif stream_key == "to_device_key": + # Retrieve a list of to-device message events, as well as the + # maximum stream token of the messages we were able to retrieve. + to_device_messages = await self._get_to_device_messages( + service, new_token, users + ) + self.scheduler.enqueue_for_appservice( + service, to_device_messages=to_device_messages + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_appservice_stream_type_pos( + service, "to_device", new_token + ) + async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: @@ -440,6 +469,79 @@ class ApplicationServicesHandler: return events + async def _get_to_device_messages( + self, + service: ApplicationService, + new_token: int, + users: Collection[Union[str, UserID]], + ) -> List[JsonDict]: + """ + Given an application service, determine which events it should receive + from those between the last-recorded to-device message stream token for this + appservice and the given stream token. + + Args: + service: The application service to check for which events it should receive. + new_token: The latest to-device event stream token. + users: The users to be notified for the new to-device messages + (ie, the recipients of the messages). + + Returns: + A list of JSON dictionaries containing data derived from the to-device events + that should be sent to the given application service. + """ + # Get the stream token that this application service has processed up until + from_key = await self.store.get_type_stream_id_for_appservice( + service, "to_device" + ) + + # Filter out users that this appservice is not interested in + users_appservice_is_interested_in: List[str] = [] + for user in users: + # FIXME: We should do this farther up the call stack. We currently repeat + # this operation in _handle_presence. + if isinstance(user, UserID): + user = user.to_string() + + if service.is_interested_in_user(user): + users_appservice_is_interested_in.append(user) + + if not users_appservice_is_interested_in: + # Return early if the AS was not interested in any of these users + return [] + + # Retrieve the to-device messages for each user + recipient_device_to_messages = await self.store.get_messages_for_user_devices( + users_appservice_is_interested_in, + from_key, + new_token, + ) + + # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields + # to the event JSON so that the application service will know which user/device + # combination this messages was intended for. + # + # So we mangle this dict into a flat list of to-device messages with the relevant + # user ID and device ID embedded inside each message dict. + message_payload: List[JsonDict] = [] + for ( + user_id, + device_id, + ), messages in recipient_device_to_messages.items(): + for message_json in messages: + # Remove 'message_id' from the to-device message, as it's an internal ID + message_json.pop("message_id", None) + + message_payload.append( + { + "to_user_id": user_id, + "to_device_id": device_id, + **message_json, + } + ) + + return message_payload + async def query_user_exists(self, user_id: str) -> bool: """Check if any application service knows this user_id exists. -- cgit 1.5.1 From cf06783d54b2b9090aef595a9094ddd857c1155b Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 7 Feb 2022 18:26:42 +0000 Subject: Remove optional state of `ApplicationService.is_interested`'s `store` parameter (#11911) --- changelog.d/11911.misc | 1 + synapse/appservice/__init__.py | 23 +++++----------------- synapse/handlers/appservice.py | 2 +- tests/appservice/test_appservice.py | 38 +++++++++++++++++++++++++++++++------ 4 files changed, 39 insertions(+), 25 deletions(-) create mode 100644 changelog.d/11911.misc (limited to 'synapse/handlers/appservice.py') diff --git a/changelog.d/11911.misc b/changelog.d/11911.misc new file mode 100644 index 0000000000..805588c2e9 --- /dev/null +++ b/changelog.d/11911.misc @@ -0,0 +1 @@ +Various refactors to the application service notifier code. \ No newline at end of file diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 7dbebd97b5..a340a8c9c7 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -165,23 +165,16 @@ class ApplicationService: return namespace.exclusive return False - async def _matches_user( - self, event: Optional[EventBase], store: Optional["DataStore"] = None - ) -> bool: - if not event: - return False - + async def _matches_user(self, event: EventBase, store: "DataStore") -> bool: if self.is_interested_in_user(event.sender): return True + # also check m.room.member state key if event.type == EventTypes.Member and self.is_interested_in_user( event.state_key ): return True - if not store: - return False - does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match @@ -216,21 +209,15 @@ class ApplicationService: return self.is_interested_in_room(event.room_id) return False - async def _matches_aliases( - self, event: EventBase, store: Optional["DataStore"] = None - ) -> bool: - if not store or not event: - return False - + async def _matches_aliases(self, event: EventBase, store: "DataStore") -> bool: alias_list = await store.get_aliases_for_room(event.room_id) for alias in alias_list: if self.is_interested_in_alias(alias): return True + return False - async def is_interested( - self, event: EventBase, store: Optional["DataStore"] = None - ) -> bool: + async def is_interested(self, event: EventBase, store: "DataStore") -> bool: """Check if this service is interested in this event. Args: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 0fb919acf6..a42c3558e4 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -649,7 +649,7 @@ class ApplicationServicesHandler: """Retrieve a list of application services interested in this event. Args: - event: The event to check. Can be None if alias_list is not. + event: The event to check. Returns: A list of services interested in this event based on the service regex. """ diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py index 07d8105f41..9bd6275e92 100644 --- a/tests/appservice/test_appservice.py +++ b/tests/appservice/test_appservice.py @@ -40,13 +40,19 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.store = Mock() + self.store.get_aliases_for_room = simple_async_mock([]) + self.store.get_users_in_room = simple_async_mock([]) @defer.inlineCallbacks def test_regex_user_id_prefix_match(self): self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) self.event.sender = "@irc_foobar:matrix.org" self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -54,7 +60,11 @@ class ApplicationServiceTestCase(unittest.TestCase): self.service.namespaces[ApplicationService.NS_USERS].append(_regex("@irc_.*")) self.event.sender = "@someone_else:matrix.org" self.assertFalse( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -64,7 +74,11 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.type = "m.room.member" self.event.state_key = "@irc_foobar:matrix.org" self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -74,7 +88,11 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.event.room_id = "!some_prefixs0m3th1nGsome_suffix:matrix.org" self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -84,7 +102,11 @@ class ApplicationServiceTestCase(unittest.TestCase): ) self.event.room_id = "!XqBunHwQIXUiqCaoxq:matrix.org" self.assertFalse( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks @@ -183,7 +205,11 @@ class ApplicationServiceTestCase(unittest.TestCase): self.event.content = {"membership": "invite"} self.event.state_key = self.service.sender self.assertTrue( - (yield defer.ensureDeferred(self.service.is_interested(self.event))) + ( + yield defer.ensureDeferred( + self.service.is_interested(self.event, self.store) + ) + ) ) @defer.inlineCallbacks -- cgit 1.5.1