summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py250
-rw-r--r--synapse/handlers/device.py8
-rw-r--r--synapse/handlers/directory.py6
-rw-r--r--synapse/handlers/sync.py30
-rw-r--r--synapse/handlers/typing.py6
5 files changed, 241 insertions, 59 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py

index 9abdad262b..fb533188a2 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py
@@ -33,7 +33,7 @@ from synapse.metrics.background_process_metrics import ( wrap_as_background_process, ) from synapse.storage.databases.main.directory import RoomAliasMapping -from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID +from synapse.types import DeviceLists, JsonDict, RoomAlias, RoomStreamToken, UserID from synapse.util.async_helpers import Linearizer from synapse.util.metrics import Measure @@ -55,6 +55,10 @@ class ApplicationServicesHandler: self.clock = hs.get_clock() self.notify_appservices = hs.config.appservice.notify_appservices self.event_sources = hs.get_event_sources() + self._msc2409_to_device_messages_enabled = ( + hs.config.experimental.msc2409_to_device_messages_enabled + ) + self._msc3202_enabled = hs.config.experimental.msc3202_enabled self.current_max = 0 self.is_processing = False @@ -132,7 +136,9 @@ class ApplicationServicesHandler: # Fork off pushes to these services for service in services: - self.scheduler.submit_event_for_as(service, event) + self.scheduler.enqueue_for_appservice( + service, events=[event] + ) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -199,8 +205,9 @@ class ApplicationServicesHandler: Args: stream_key: The stream the event came from. - `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other - value for `stream_key` will cause this function to return early. + `stream_key` can be "typing_key", "receipt_key", "presence_key", + "to_device_key" or "device_list_key". Any other value fo + `stream_key` will cause this function to return early. Ephemeral events will only be pushed to appservices that have opted into receiving them by setting `push_ephemeral` to true in their registration @@ -216,8 +223,16 @@ class ApplicationServicesHandler: if not self.notify_appservices: return - # Ignore any unsupported streams - if stream_key not in ("typing_key", "receipt_key", "presence_key"): + # Notify appservices of updates in ephemeral event streams. + # Only the following streams are currently supported. + # FIXME: We should use constants for these values. + if stream_key not in ( + "typing_key", + "receipt_key", + "presence_key", + "to_device_key", + "device_list_key", + ): return # Assert that new_token is an integer (and not a RoomStreamToken). @@ -233,6 +248,17 @@ class ApplicationServicesHandler: # Additional context: https://github.com/matrix-org/synapse/pull/11137 assert isinstance(new_token, int) + # Ignore to-device messages if the feature flag is not enabled + if ( + stream_key == "to_device_key" + and not self._msc2409_to_device_messages_enabled + ): + return + + # Ignore device lists if the feature flag is not enabled + if stream_key == "device_list_key" and not self._msc3202_enabled: + return + # Check whether there are any appservices which have registered to receive # ephemeral events. # @@ -266,7 +292,7 @@ class ApplicationServicesHandler: with Measure(self.clock, "notify_interested_services_ephemeral"): for service in services: if stream_key == "typing_key": - # Note that we don't persist the token (via set_type_stream_id_for_appservice) + # Note that we don't persist the token (via set_appservice_stream_type_pos) # for typing_key due to performance reasons and due to their highly # ephemeral nature. # @@ -274,7 +300,7 @@ class ApplicationServicesHandler: # and, if they apply to this application service, send it off. events = await self._handle_typing(service, new_token) if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) continue # Since we read/update the stream position for this AS/stream @@ -285,28 +311,51 @@ class ApplicationServicesHandler: ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "read_receipt", new_token ) elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice - await self.store.set_type_stream_id_for_appservice( + await self.store.set_appservice_stream_type_pos( service, "presence", new_token ) + elif stream_key == "to_device_key": + # Retrieve a list of to-device message events, as well as the + # maximum stream token of the messages we were able to retrieve. + to_device_messages = await self._get_to_device_messages( + service, new_token, users + ) + self.scheduler.enqueue_for_appservice( + service, to_device_messages=to_device_messages + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_appservice_stream_type_pos( + service, "to_device", new_token + ) + + elif stream_key == "device_list_key": + device_list_summary = await self._get_device_list_summary( + service, new_token + ) + if device_list_summary: + self.scheduler.enqueue_for_appservice( + service, device_list_summary=device_list_summary + ) + + # Persist the latest handled stream token for this appservice + await self.store.set_appservice_stream_type_pos( + service, "device_list", new_token + ) + async def _handle_typing( self, service: ApplicationService, new_token: int ) -> List[JsonDict]: @@ -440,6 +489,167 @@ class ApplicationServicesHandler: return events + async def _get_to_device_messages( + self, + service: ApplicationService, + new_token: int, + users: Collection[Union[str, UserID]], + ) -> List[JsonDict]: + """ + Given an application service, determine which events it should receive + from those between the last-recorded typing event stream token for this + appservice and the given stream token. + + Args: + service: The application service to check for which events it should receive. + new_token: The latest to-device event stream token. + users: The users that should receive new to-device messages. + + Returns: + A list of JSON dictionaries containing data derived from the typing events + that should be sent to the given application service. + """ + # Get the stream token that this application service has processed up until + from_key = await self.store.get_type_stream_id_for_appservice( + service, "to_device" + ) + + # Filter out users that this appservice is not interested in + users_appservice_is_interested_in: List[str] = [] + for user in users: + # FIXME: We should do this farther up the call stack. We currently repeat + # this operation in _handle_presence. + if isinstance(user, UserID): + user = user.to_string() + + if service.is_interested_in_user(user): + users_appservice_is_interested_in.append(user) + + if not users_appservice_is_interested_in: + # Return early if the AS was not interested in any of these users + return [] + + # Retrieve the to-device messages for each user + recipient_user_id_device_id_to_messages = await self.store.get_new_messages( + users_appservice_is_interested_in, + from_key, + new_token, + ) + + # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields + # to the event JSON so that the application service will know which user/device + # combination this messages was intended for. + # + # So we mangle this dict into a flat list of to-device messages with the relevant + # user ID and device ID embedded inside each message dict. + message_payload: List[JsonDict] = [] + for ( + user_id, + device_id, + ), messages in recipient_user_id_device_id_to_messages.items(): + for message_json in messages: + # Remove 'message_id' from the to-device message, as it's an internal ID + message_json.pop("message_id", None) + + message_payload.append( + { + "to_user_id": user_id, + "to_device_id": device_id, + **message_json, + } + ) + + return message_payload + + async def _get_device_list_summary( + self, + appservice: ApplicationService, + new_key: int, + ) -> DeviceLists: + """ + Retrieve a list of users who have changed their device lists. + + Args: + appservice: The application service to retrieve device list changes for. + new_key: The stream key of the device list change that triggered this method call. + + Returns: + A set of device list updates, comprised of users that the appservices needs to: + * resync the device list of, and + * stop tracking the device list of. + """ + # Fetch the last successfully processed device list update stream ID + # for this appservice. + from_key = await self.store.get_type_stream_id_for_appservice( + appservice, "device_list" + ) + + # Fetch the users who have modified their device list since then. + users_with_changed_device_lists = ( + await self.store.get_users_whose_devices_changed( + from_key, filter_user_ids=None, to_key=new_key + ) + ) + + # Filter out any users the application service is not interested in + # + # For each user who changed their device list, we want to check whether this + # appservice would be interested in the change. + filtered_users_with_changed_device_lists = { + user_id + for user_id in users_with_changed_device_lists + if self._is_appservice_interested_in_device_lists_of_user( + appservice, user_id + ) + } + + # Create a summary of "changed" and "left" users. + # TODO: Calculate "left" users. + device_list_summary = DeviceLists( + changed=filtered_users_with_changed_device_lists + ) + + return device_list_summary + + async def _is_appservice_interested_in_device_lists_of_user( + self, + appservice: ApplicationService, + user_id: str, + ) -> bool: + """ + Returns whether a given application service is interested in the device lists of a + given user. + + The application service is interested in the user's device lists if any of the + following are true: + * The user is in the appservice's user namespace. + * At least one member of one room that the user is a part of is in the + appservice's user namespace. + * The appservice is explicitly (via room ID or alias) interested in at + least one room that the user is in. + + Args: + appservice: The application service to gauge interest of. + user_id: The ID of the user whose device list interest is in question. + + Returns: + True if the application service is interested in the user's device lists, False + otherwise. + """ + if appservice.is_interested_in_user(user_id): + return True + + # FIXME: This is quite an expensive check. This method is called per device + # list change. + room_ids = await self.store.get_rooms_for_user(user_id) + for room_id in room_ids: + # This method covers checking room members for appservice interest as well as + # room ID and alias checks. + if await appservice.is_interested_in_room(room_id, self.store): + return True + + return False + async def query_user_exists(self, user_id: str) -> bool: """Check if any application service knows this user_id exists. @@ -469,7 +679,7 @@ class ApplicationServicesHandler: room_alias_str = room_alias.to_string() services = self.store.get_app_services() alias_query_services = [ - s for s in services if (s.is_interested_in_alias(room_alias_str)) + s for s in services if (s.is_room_alias_in_namespace(room_alias_str)) ] for alias_service in alias_query_services: is_known_alias = await self.appservice_api.query_alias( @@ -558,7 +768,7 @@ class ApplicationServicesHandler: # inside of a list comprehension anymore. interested_list = [] for s in services: - if await s.is_interested(event, self.store): + if await s.is_interested_in_event(event, self.store): interested_list.append(s) return interested_list diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 82ee11e921..2c07d31dfd 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py
@@ -495,13 +495,11 @@ class DeviceHandler(DeviceWorkerHandler): "Notifying about update %r/%r, ID: %r", user_id, device_id, position ) - room_ids = await self.store.get_rooms_for_user(user_id) - # specify the user ID too since the user should always get their own device list # updates, even if they aren't in any rooms. - self.notifier.on_new_event( - "device_list_key", position, users=[user_id], rooms=room_ids - ) + users_to_notify = users_who_share_room.union(user_id) + + self.notifier.on_new_event("device_list_key", position, users=users_to_notify) if hosts: logger.info( diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 7ee5c47fd9..f49bb806a8 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler: service = requester.app_service if service: - if not service.is_interested_in_alias(room_alias_str): + if not service.is_room_alias_in_namespace(room_alias_str): raise SynapseError( 400, "This application service has not reserved this kind of alias.", @@ -221,7 +221,7 @@ class DirectoryHandler: async def delete_appservice_association( self, service: ApplicationService, room_alias: RoomAlias ) -> None: - if not service.is_interested_in_alias(room_alias.to_string()): + if not service.is_room_alias_in_namespace(room_alias.to_string()): raise SynapseError( 400, "This application service has not reserved this kind of alias", @@ -374,7 +374,7 @@ class DirectoryHandler: # non-exclusive locks on the alias (or there are no interested services) services = self.store.get_app_services() interested_services = [ - s for s in services if s.is_interested_in_alias(alias.to_string()) + s for s in services if s.is_room_alias_in_namespace(alias.to_string()) ] for service in interested_services: diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f3039c3c3f..d004c42885 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -13,17 +13,7 @@ # limitations under the License. import itertools import logging -from typing import ( - TYPE_CHECKING, - Any, - Collection, - Dict, - FrozenSet, - List, - Optional, - Set, - Tuple, -) +from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple import attr from prometheus_client import Counter @@ -39,6 +29,7 @@ from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter from synapse.types import ( + DeviceLists, JsonDict, MutableStateMap, Requester, @@ -183,21 +174,6 @@ class GroupsSyncResult: return bool(self.join or self.invite or self.leave) -@attr.s(slots=True, frozen=True, auto_attribs=True) -class DeviceLists: - """ - Attributes: - changed: List of user_ids whose devices may have changed - left: List of user_ids whose devices we no longer track - """ - - changed: Collection[str] - left: Collection[str] - - def __bool__(self) -> bool: - return bool(self.changed or self.left) - - @attr.s(slots=True, auto_attribs=True) class _RoomChanges: """The set of room entries to include in the sync, plus the set of joined @@ -1329,7 +1305,7 @@ class SyncHandler: return DeviceLists(changed=users_that_have_changed, left=newly_left_users) else: - return DeviceLists(changed=[], left=[]) + return DeviceLists() async def _generate_sync_entry_for_to_device( self, sync_result_builder: "SyncResultBuilder" diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 1676ebd057..985b8ff3be 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py
@@ -442,7 +442,7 @@ class TypingWriterHandler(FollowerTypingHandler): class TypingNotificationEventSource(EventSource[int, JsonDict]): def __init__(self, hs: "HomeServer"): - self.hs = hs + self.store = hs.get_datastore() self.clock = hs.get_clock() # We can't call get_typing_handler here because there's a cycle: # @@ -482,9 +482,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]): if handler._room_serials[room_id] <= from_key: continue - if not await service.matches_user_in_member_list( - room_id, handler.store - ): + if not await service.matches_user_in_member_list(room_id, self.store): continue events.append(self._make_event_for(room_id))