diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index bd913e524e..316c4b677c 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -33,7 +33,13 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process,
)
from synapse.storage.databases.main.directory import RoomAliasMapping
-from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.types import (
+ DeviceListUpdates,
+ JsonDict,
+ RoomAlias,
+ RoomStreamToken,
+ UserID,
+)
from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure
@@ -58,6 +64,9 @@ class ApplicationServicesHandler:
self._msc2409_to_device_messages_enabled = (
hs.config.experimental.msc2409_to_device_messages_enabled
)
+ self._msc3202_transaction_extensions_enabled = (
+ hs.config.experimental.msc3202_transaction_extensions
+ )
self.current_max = 0
self.is_processing = False
@@ -204,9 +213,9 @@ class ApplicationServicesHandler:
Args:
stream_key: The stream the event came from.
- `stream_key` can be "typing_key", "receipt_key", "presence_key" or
- "to_device_key". Any other value for `stream_key` will cause this function
- to return early.
+ `stream_key` can be "typing_key", "receipt_key", "presence_key",
+ "to_device_key" or "device_list_key". Any other value for `stream_key`
+ will cause this function to return early.
Ephemeral events will only be pushed to appservices that have opted into
receiving them by setting `push_ephemeral` to true in their registration
@@ -230,6 +239,7 @@ class ApplicationServicesHandler:
"receipt_key",
"presence_key",
"to_device_key",
+ "device_list_key",
):
return
@@ -253,15 +263,37 @@ class ApplicationServicesHandler:
):
return
+ # Ignore device lists if the feature flag is not enabled
+ if (
+ stream_key == "device_list_key"
+ and not self._msc3202_transaction_extensions_enabled
+ ):
+ return
+
# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
# Note that whether these events are actually relevant to these appservices
# is decided later on.
+ services = self.store.get_app_services()
services = [
service
- for service in self.store.get_app_services()
- if service.supports_ephemeral
+ for service in services
+ # Different stream keys require different support booleans
+ if (
+ stream_key
+ in (
+ "typing_key",
+ "receipt_key",
+ "presence_key",
+ "to_device_key",
+ )
+ and service.supports_ephemeral
+ )
+ or (
+ stream_key == "device_list_key"
+ and service.msc3202_transaction_extensions
+ )
]
if not services:
# Bail out early if none of the target appservices have explicitly registered
@@ -336,6 +368,20 @@ class ApplicationServicesHandler:
service, "to_device", new_token
)
+ elif stream_key == "device_list_key":
+ device_list_summary = await self._get_device_list_summary(
+ service, new_token
+ )
+ if device_list_summary:
+ self.scheduler.enqueue_for_appservice(
+ service, device_list_summary=device_list_summary
+ )
+
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_appservice_stream_type_pos(
+ service, "device_list", new_token
+ )
+
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
@@ -542,6 +588,96 @@ class ApplicationServicesHandler:
return message_payload
+ async def _get_device_list_summary(
+ self,
+ appservice: ApplicationService,
+ new_key: int,
+ ) -> DeviceListUpdates:
+ """
+ Retrieve a list of users who have changed their device lists.
+
+ Args:
+ appservice: The application service to retrieve device list changes for.
+ new_key: The stream key of the device list change that triggered this method call.
+
+ Returns:
+ A set of device list updates, comprised of users that the appservices needs to:
+ * resync the device list of, and
+ * stop tracking the device list of.
+ """
+ # Fetch the last successfully processed device list update stream ID
+ # for this appservice.
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ appservice, "device_list"
+ )
+
+ # Fetch the users who have modified their device list since then.
+ users_with_changed_device_lists = (
+ await self.store.get_users_whose_devices_changed(from_key, to_key=new_key)
+ )
+
+ # Filter out any users the application service is not interested in
+ #
+ # For each user who changed their device list, we want to check whether this
+ # appservice would be interested in the change.
+ filtered_users_with_changed_device_lists = {
+ user_id
+ for user_id in users_with_changed_device_lists
+ if await self._is_appservice_interested_in_device_lists_of_user(
+ appservice, user_id
+ )
+ }
+
+ # Create a summary of "changed" and "left" users.
+ # TODO: Calculate "left" users.
+ device_list_summary = DeviceListUpdates(
+ changed=filtered_users_with_changed_device_lists
+ )
+
+ return device_list_summary
+
+ async def _is_appservice_interested_in_device_lists_of_user(
+ self,
+ appservice: ApplicationService,
+ user_id: str,
+ ) -> bool:
+ """
+ Returns whether a given application service is interested in the device list
+ updates of a given user.
+
+ The application service is interested in the user's device list updates if any
+ of the following are true:
+ * The user is the appservice's sender localpart user.
+ * The user is in the appservice's user namespace.
+ * At least one member of one room that the user is a part of is in the
+ appservice's user namespace.
+ * The appservice is explicitly (via room ID or alias) interested in at
+ least one room that the user is in.
+
+ Args:
+ appservice: The application service to gauge interest of.
+ user_id: The ID of the user whose device list interest is in question.
+
+ Returns:
+ True if the application service is interested in the user's device lists, False
+ otherwise.
+ """
+ # This method checks against both the sender localpart user as well as if the
+ # user is in the appservice's user namespace.
+ if appservice.is_interested_in_user(user_id):
+ return True
+
+ # Determine whether any of the rooms the user is in justifies sending this
+ # device list update to the application service.
+ room_ids = await self.store.get_rooms_for_user(user_id)
+ for room_id in room_ids:
+ # This method covers checking room members for appservice interest as well as
+ # room ID and alias checks.
+ if await appservice.is_interested_in_room(room_id, self.store):
+ return True
+
+ return False
+
async def query_user_exists(self, user_id: str) -> bool:
"""Check if any application service knows this user_id exists.
|