diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 9abdad262b..fb533188a2 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -33,7 +33,7 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process,
)
from synapse.storage.databases.main.directory import RoomAliasMapping
-from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.types import DeviceLists, JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure
@@ -55,6 +55,10 @@ class ApplicationServicesHandler:
self.clock = hs.get_clock()
self.notify_appservices = hs.config.appservice.notify_appservices
self.event_sources = hs.get_event_sources()
+ self._msc2409_to_device_messages_enabled = (
+ hs.config.experimental.msc2409_to_device_messages_enabled
+ )
+ self._msc3202_enabled = hs.config.experimental.msc3202_enabled
self.current_max = 0
self.is_processing = False
@@ -132,7 +136,9 @@ class ApplicationServicesHandler:
# Fork off pushes to these services
for service in services:
- self.scheduler.submit_event_for_as(service, event)
+ self.scheduler.enqueue_for_appservice(
+ service, events=[event]
+ )
now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
@@ -199,8 +205,9 @@ class ApplicationServicesHandler:
Args:
stream_key: The stream the event came from.
- `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
- value for `stream_key` will cause this function to return early.
+ `stream_key` can be "typing_key", "receipt_key", "presence_key",
+ "to_device_key" or "device_list_key". Any other value fo
+ `stream_key` will cause this function to return early.
Ephemeral events will only be pushed to appservices that have opted into
receiving them by setting `push_ephemeral` to true in their registration
@@ -216,8 +223,16 @@ class ApplicationServicesHandler:
if not self.notify_appservices:
return
- # Ignore any unsupported streams
- if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+ # Notify appservices of updates in ephemeral event streams.
+ # Only the following streams are currently supported.
+ # FIXME: We should use constants for these values.
+ if stream_key not in (
+ "typing_key",
+ "receipt_key",
+ "presence_key",
+ "to_device_key",
+ "device_list_key",
+ ):
return
# Assert that new_token is an integer (and not a RoomStreamToken).
@@ -233,6 +248,17 @@ class ApplicationServicesHandler:
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)
+ # Ignore to-device messages if the feature flag is not enabled
+ if (
+ stream_key == "to_device_key"
+ and not self._msc2409_to_device_messages_enabled
+ ):
+ return
+
+ # Ignore device lists if the feature flag is not enabled
+ if stream_key == "device_list_key" and not self._msc3202_enabled:
+ return
+
# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
@@ -266,7 +292,7 @@ class ApplicationServicesHandler:
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
if stream_key == "typing_key":
- # Note that we don't persist the token (via set_type_stream_id_for_appservice)
+ # Note that we don't persist the token (via set_appservice_stream_type_pos)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
#
@@ -274,7 +300,7 @@ class ApplicationServicesHandler:
# and, if they apply to this application service, send it off.
events = await self._handle_typing(service, new_token)
if events:
- self.scheduler.submit_ephemeral_events_for_as(service, events)
+ self.scheduler.enqueue_for_appservice(service, ephemeral=events)
continue
# Since we read/update the stream position for this AS/stream
@@ -285,28 +311,51 @@ class ApplicationServicesHandler:
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(
- service, events
- )
+ self.scheduler.enqueue_for_appservice(service, ephemeral=events)
# Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
+ await self.store.set_appservice_stream_type_pos(
service, "read_receipt", new_token
)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users, new_token)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(
- service, events
- )
+ self.scheduler.enqueue_for_appservice(service, ephemeral=events)
# Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
+ await self.store.set_appservice_stream_type_pos(
service, "presence", new_token
)
+ elif stream_key == "to_device_key":
+ # Retrieve a list of to-device message events, as well as the
+ # maximum stream token of the messages we were able to retrieve.
+ to_device_messages = await self._get_to_device_messages(
+ service, new_token, users
+ )
+ self.scheduler.enqueue_for_appservice(
+ service, to_device_messages=to_device_messages
+ )
+
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_appservice_stream_type_pos(
+ service, "to_device", new_token
+ )
+
+ elif stream_key == "device_list_key":
+ device_list_summary = await self._get_device_list_summary(
+ service, new_token
+ )
+ if device_list_summary:
+ self.scheduler.enqueue_for_appservice(
+ service, device_list_summary=device_list_summary
+ )
+
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_appservice_stream_type_pos(
+ service, "device_list", new_token
+ )
+
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
@@ -440,6 +489,167 @@ class ApplicationServicesHandler:
return events
+ async def _get_to_device_messages(
+ self,
+ service: ApplicationService,
+ new_token: int,
+ users: Collection[Union[str, UserID]],
+ ) -> List[JsonDict]:
+ """
+ Given an application service, determine which events it should receive
+ from those between the last-recorded typing event stream token for this
+ appservice and the given stream token.
+
+ Args:
+ service: The application service to check for which events it should receive.
+ new_token: The latest to-device event stream token.
+ users: The users that should receive new to-device messages.
+
+ Returns:
+ A list of JSON dictionaries containing data derived from the typing events
+ that should be sent to the given application service.
+ """
+ # Get the stream token that this application service has processed up until
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "to_device"
+ )
+
+ # Filter out users that this appservice is not interested in
+ users_appservice_is_interested_in: List[str] = []
+ for user in users:
+ # FIXME: We should do this farther up the call stack. We currently repeat
+ # this operation in _handle_presence.
+ if isinstance(user, UserID):
+ user = user.to_string()
+
+ if service.is_interested_in_user(user):
+ users_appservice_is_interested_in.append(user)
+
+ if not users_appservice_is_interested_in:
+ # Return early if the AS was not interested in any of these users
+ return []
+
+ # Retrieve the to-device messages for each user
+ recipient_user_id_device_id_to_messages = await self.store.get_new_messages(
+ users_appservice_is_interested_in,
+ from_key,
+ new_token,
+ )
+
+ # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
+ # to the event JSON so that the application service will know which user/device
+ # combination this messages was intended for.
+ #
+ # So we mangle this dict into a flat list of to-device messages with the relevant
+ # user ID and device ID embedded inside each message dict.
+ message_payload: List[JsonDict] = []
+ for (
+ user_id,
+ device_id,
+ ), messages in recipient_user_id_device_id_to_messages.items():
+ for message_json in messages:
+ # Remove 'message_id' from the to-device message, as it's an internal ID
+ message_json.pop("message_id", None)
+
+ message_payload.append(
+ {
+ "to_user_id": user_id,
+ "to_device_id": device_id,
+ **message_json,
+ }
+ )
+
+ return message_payload
+
+ async def _get_device_list_summary(
+ self,
+ appservice: ApplicationService,
+ new_key: int,
+ ) -> DeviceLists:
+ """
+ Retrieve a list of users who have changed their device lists.
+
+ Args:
+ appservice: The application service to retrieve device list changes for.
+ new_key: The stream key of the device list change that triggered this method call.
+
+ Returns:
+ A set of device list updates, comprised of users that the appservices needs to:
+ * resync the device list of, and
+ * stop tracking the device list of.
+ """
+ # Fetch the last successfully processed device list update stream ID
+ # for this appservice.
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ appservice, "device_list"
+ )
+
+ # Fetch the users who have modified their device list since then.
+ users_with_changed_device_lists = (
+ await self.store.get_users_whose_devices_changed(
+ from_key, filter_user_ids=None, to_key=new_key
+ )
+ )
+
+ # Filter out any users the application service is not interested in
+ #
+ # For each user who changed their device list, we want to check whether this
+ # appservice would be interested in the change.
+ filtered_users_with_changed_device_lists = {
+ user_id
+ for user_id in users_with_changed_device_lists
+ if self._is_appservice_interested_in_device_lists_of_user(
+ appservice, user_id
+ )
+ }
+
+ # Create a summary of "changed" and "left" users.
+ # TODO: Calculate "left" users.
+ device_list_summary = DeviceLists(
+ changed=filtered_users_with_changed_device_lists
+ )
+
+ return device_list_summary
+
+ async def _is_appservice_interested_in_device_lists_of_user(
+ self,
+ appservice: ApplicationService,
+ user_id: str,
+ ) -> bool:
+ """
+ Returns whether a given application service is interested in the device lists of a
+ given user.
+
+ The application service is interested in the user's device lists if any of the
+ following are true:
+ * The user is in the appservice's user namespace.
+ * At least one member of one room that the user is a part of is in the
+ appservice's user namespace.
+ * The appservice is explicitly (via room ID or alias) interested in at
+ least one room that the user is in.
+
+ Args:
+ appservice: The application service to gauge interest of.
+ user_id: The ID of the user whose device list interest is in question.
+
+ Returns:
+ True if the application service is interested in the user's device lists, False
+ otherwise.
+ """
+ if appservice.is_interested_in_user(user_id):
+ return True
+
+ # FIXME: This is quite an expensive check. This method is called per device
+ # list change.
+ room_ids = await self.store.get_rooms_for_user(user_id)
+ for room_id in room_ids:
+ # This method covers checking room members for appservice interest as well as
+ # room ID and alias checks.
+ if await appservice.is_interested_in_room(room_id, self.store):
+ return True
+
+ return False
+
async def query_user_exists(self, user_id: str) -> bool:
"""Check if any application service knows this user_id exists.
@@ -469,7 +679,7 @@ class ApplicationServicesHandler:
room_alias_str = room_alias.to_string()
services = self.store.get_app_services()
alias_query_services = [
- s for s in services if (s.is_interested_in_alias(room_alias_str))
+ s for s in services if (s.is_room_alias_in_namespace(room_alias_str))
]
for alias_service in alias_query_services:
is_known_alias = await self.appservice_api.query_alias(
@@ -558,7 +768,7 @@ class ApplicationServicesHandler:
# inside of a list comprehension anymore.
interested_list = []
for s in services:
- if await s.is_interested(event, self.store):
+ if await s.is_interested_in_event(event, self.store):
interested_list.append(s)
return interested_list
|