diff options
author | Andrew Morgan <andrewm@element.io> | 2022-03-04 14:54:19 +0000 |
---|---|---|
committer | Andrew Morgan <andrewm@element.io> | 2022-03-10 15:50:58 +0000 |
commit | 047db4da1ceb96a543239833de3a7c00ca24959b (patch) | |
tree | 9c2de7ec2303ee758677924b1c9e20b65973f39a | |
parent | Switch DeviceLists to containing Sets, which allows item deletes (diff) | |
download | synapse-047db4da1ceb96a543239833de3a7c00ca24959b.tar.xz |
Use get_users_whose_devices_changed to pull device list changes for given AS
When a new device list change occurs, we're now: 1. For each appservice, checking the last device list stream key that was processed up until. 2. Getting any users with changed device list between the last device list stream key and the stream key of the triggering update. 3. Filtering out those users based on those that are actually relevant to this application service. 4. Passing those changes to enqueue_for_appservice and saving the device list stream key that we've just processed up to for later reference.
-rw-r--r-- | synapse/handlers/appservice.py | 108 |
1 files changed, 107 insertions, 1 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 0aba6238a5..b189eb95d0 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -33,7 +33,7 @@ from synapse.metrics.background_process_metrics import ( wrap_as_background_process, ) from synapse.storage.databases.main.directory import RoomAliasMapping -from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID +from synapse.types import DeviceLists, JsonDict, RoomAlias, RoomStreamToken, UserID from synapse.util.async_helpers import Linearizer from synapse.util.metrics import Measure @@ -362,6 +362,20 @@ class ApplicationServicesHandler: service, "to_device", new_token ) + elif stream_key == "device_list_key": + device_list_summary = await self._get_device_list_summary( + service, new_token + ) + if device_list_summary: + self.scheduler.enqueue_for_appservice( + service, device_list_summary=device_list_summary + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_appservice_stream_type_pos( + service, "device_list", new_token + ) + async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: @@ -568,6 +582,98 @@ class ApplicationServicesHandler: return message_payload + async def _get_device_list_summary( + self, + appservice: ApplicationService, + new_key: int, + ) -> DeviceLists: + """ + Retrieve a list of users who have changed their device lists. + + Args: + appservice: The application service to retrieve device list changes for. + new_key: The stream key of the device list change that triggered this method call. + + Returns: + A set of device list updates, comprised of users that the appservices needs to: + * resync the device list of, and + * stop tracking the device list of. + """ + # Fetch the last successfully processed device list update stream ID + # for this appservice. + from_key = await self.store.get_type_stream_id_for_appservice( + appservice, "device_list" + ) + + # Fetch the users who have modified their device list since then. + users_with_changed_device_lists = ( + await self.store.get_users_whose_devices_changed( + from_key, user_ids=None, to_key=new_key + ) + ) + + # Filter out any users the application service is not interested in + # + # For each user who changed their device list, we want to check whether this + # appservice would be interested in the change. + filtered_users_with_changed_device_lists = { + user_id + for user_id in users_with_changed_device_lists + if await self._is_appservice_interested_in_device_lists_of_user( + appservice, user_id + ) + } + + # Create a summary of "changed" and "left" users. + # TODO: Calculate "left" users. + device_list_summary = DeviceLists( + changed=filtered_users_with_changed_device_lists + ) + + return device_list_summary + + async def _is_appservice_interested_in_device_lists_of_user( + self, + appservice: ApplicationService, + user_id: str, + ) -> bool: + """ + Returns whether a given application service is interested in the device list + updates of a given user. + + The application service is interested in the user's device list updates if any + of the following are true: + * The user is the appservice's sender localpart user. + * The user is in the appservice's user namespace. + * At least one member of one room that the user is a part of is in the + appservice's user namespace. + * The appservice is explicitly (via room ID or alias) interested in at + least one room that the user is in. + + Args: + appservice: The application service to gauge interest of. + user_id: The ID of the user whose device list interest is in question. + + Returns: + True if the application service is interested in the user's device lists, False + otherwise. + """ + # This method checks against both the sender localpart user as well as if the + # user is in the appservice's user namespace. + if appservice.is_interested_in_user(user_id): + return True + + # FIXME: This is quite an expensive check. This method is called per device + # list change. + room_ids = await self.store.get_rooms_for_user(user_id) + for room_id in room_ids: + # This method covers checking room members for appservice interest as well as + # room ID and alias checks. + if await appservice.is_interested_in_room(room_id, self.store): + return True + + return False + async def query_user_exists(self, user_id: str) -> bool: """Check if any application service knows this user_id exists. |