diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 9abdad262b..fb533188a2 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -33,7 +33,7 @@ from synapse.metrics.background_process_metrics import (
wrap_as_background_process,
)
from synapse.storage.databases.main.directory import RoomAliasMapping
-from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.types import DeviceLists, JsonDict, RoomAlias, RoomStreamToken, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure
@@ -55,6 +55,10 @@ class ApplicationServicesHandler:
self.clock = hs.get_clock()
self.notify_appservices = hs.config.appservice.notify_appservices
self.event_sources = hs.get_event_sources()
+ self._msc2409_to_device_messages_enabled = (
+ hs.config.experimental.msc2409_to_device_messages_enabled
+ )
+ self._msc3202_enabled = hs.config.experimental.msc3202_enabled
self.current_max = 0
self.is_processing = False
@@ -132,7 +136,9 @@ class ApplicationServicesHandler:
# Fork off pushes to these services
for service in services:
- self.scheduler.submit_event_for_as(service, event)
+ self.scheduler.enqueue_for_appservice(
+ service, events=[event]
+ )
now = self.clock.time_msec()
ts = await self.store.get_received_ts(event.event_id)
@@ -199,8 +205,9 @@ class ApplicationServicesHandler:
Args:
stream_key: The stream the event came from.
- `stream_key` can be "typing_key", "receipt_key" or "presence_key". Any other
- value for `stream_key` will cause this function to return early.
+ `stream_key` can be "typing_key", "receipt_key", "presence_key",
+ "to_device_key" or "device_list_key". Any other value fo
+ `stream_key` will cause this function to return early.
Ephemeral events will only be pushed to appservices that have opted into
receiving them by setting `push_ephemeral` to true in their registration
@@ -216,8 +223,16 @@ class ApplicationServicesHandler:
if not self.notify_appservices:
return
- # Ignore any unsupported streams
- if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+ # Notify appservices of updates in ephemeral event streams.
+ # Only the following streams are currently supported.
+ # FIXME: We should use constants for these values.
+ if stream_key not in (
+ "typing_key",
+ "receipt_key",
+ "presence_key",
+ "to_device_key",
+ "device_list_key",
+ ):
return
# Assert that new_token is an integer (and not a RoomStreamToken).
@@ -233,6 +248,17 @@ class ApplicationServicesHandler:
# Additional context: https://github.com/matrix-org/synapse/pull/11137
assert isinstance(new_token, int)
+ # Ignore to-device messages if the feature flag is not enabled
+ if (
+ stream_key == "to_device_key"
+ and not self._msc2409_to_device_messages_enabled
+ ):
+ return
+
+ # Ignore device lists if the feature flag is not enabled
+ if stream_key == "device_list_key" and not self._msc3202_enabled:
+ return
+
# Check whether there are any appservices which have registered to receive
# ephemeral events.
#
@@ -266,7 +292,7 @@ class ApplicationServicesHandler:
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
if stream_key == "typing_key":
- # Note that we don't persist the token (via set_type_stream_id_for_appservice)
+ # Note that we don't persist the token (via set_appservice_stream_type_pos)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
#
@@ -274,7 +300,7 @@ class ApplicationServicesHandler:
# and, if they apply to this application service, send it off.
events = await self._handle_typing(service, new_token)
if events:
- self.scheduler.submit_ephemeral_events_for_as(service, events)
+ self.scheduler.enqueue_for_appservice(service, ephemeral=events)
continue
# Since we read/update the stream position for this AS/stream
@@ -285,28 +311,51 @@ class ApplicationServicesHandler:
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(
- service, events
- )
+ self.scheduler.enqueue_for_appservice(service, ephemeral=events)
# Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
+ await self.store.set_appservice_stream_type_pos(
service, "read_receipt", new_token
)
elif stream_key == "presence_key":
events = await self._handle_presence(service, users, new_token)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(
- service, events
- )
+ self.scheduler.enqueue_for_appservice(service, ephemeral=events)
# Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
+ await self.store.set_appservice_stream_type_pos(
service, "presence", new_token
)
+ elif stream_key == "to_device_key":
+ # Retrieve a list of to-device message events, as well as the
+ # maximum stream token of the messages we were able to retrieve.
+ to_device_messages = await self._get_to_device_messages(
+ service, new_token, users
+ )
+ self.scheduler.enqueue_for_appservice(
+ service, to_device_messages=to_device_messages
+ )
+
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_appservice_stream_type_pos(
+ service, "to_device", new_token
+ )
+
+ elif stream_key == "device_list_key":
+ device_list_summary = await self._get_device_list_summary(
+ service, new_token
+ )
+ if device_list_summary:
+ self.scheduler.enqueue_for_appservice(
+ service, device_list_summary=device_list_summary
+ )
+
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_appservice_stream_type_pos(
+ service, "device_list", new_token
+ )
+
async def _handle_typing(
self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
@@ -440,6 +489,167 @@ class ApplicationServicesHandler:
return events
+ async def _get_to_device_messages(
+ self,
+ service: ApplicationService,
+ new_token: int,
+ users: Collection[Union[str, UserID]],
+ ) -> List[JsonDict]:
+ """
+ Given an application service, determine which events it should receive
+ from those between the last-recorded typing event stream token for this
+ appservice and the given stream token.
+
+ Args:
+ service: The application service to check for which events it should receive.
+ new_token: The latest to-device event stream token.
+ users: The users that should receive new to-device messages.
+
+ Returns:
+ A list of JSON dictionaries containing data derived from the typing events
+ that should be sent to the given application service.
+ """
+ # Get the stream token that this application service has processed up until
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ service, "to_device"
+ )
+
+ # Filter out users that this appservice is not interested in
+ users_appservice_is_interested_in: List[str] = []
+ for user in users:
+ # FIXME: We should do this farther up the call stack. We currently repeat
+ # this operation in _handle_presence.
+ if isinstance(user, UserID):
+ user = user.to_string()
+
+ if service.is_interested_in_user(user):
+ users_appservice_is_interested_in.append(user)
+
+ if not users_appservice_is_interested_in:
+ # Return early if the AS was not interested in any of these users
+ return []
+
+ # Retrieve the to-device messages for each user
+ recipient_user_id_device_id_to_messages = await self.store.get_new_messages(
+ users_appservice_is_interested_in,
+ from_key,
+ new_token,
+ )
+
+ # According to MSC2409, we'll need to add 'to_user_id' and 'to_device_id' fields
+ # to the event JSON so that the application service will know which user/device
+ # combination this messages was intended for.
+ #
+ # So we mangle this dict into a flat list of to-device messages with the relevant
+ # user ID and device ID embedded inside each message dict.
+ message_payload: List[JsonDict] = []
+ for (
+ user_id,
+ device_id,
+ ), messages in recipient_user_id_device_id_to_messages.items():
+ for message_json in messages:
+ # Remove 'message_id' from the to-device message, as it's an internal ID
+ message_json.pop("message_id", None)
+
+ message_payload.append(
+ {
+ "to_user_id": user_id,
+ "to_device_id": device_id,
+ **message_json,
+ }
+ )
+
+ return message_payload
+
+ async def _get_device_list_summary(
+ self,
+ appservice: ApplicationService,
+ new_key: int,
+ ) -> DeviceLists:
+ """
+ Retrieve a list of users who have changed their device lists.
+
+ Args:
+ appservice: The application service to retrieve device list changes for.
+ new_key: The stream key of the device list change that triggered this method call.
+
+ Returns:
+ A set of device list updates, comprised of users that the appservices needs to:
+ * resync the device list of, and
+ * stop tracking the device list of.
+ """
+ # Fetch the last successfully processed device list update stream ID
+ # for this appservice.
+ from_key = await self.store.get_type_stream_id_for_appservice(
+ appservice, "device_list"
+ )
+
+ # Fetch the users who have modified their device list since then.
+ users_with_changed_device_lists = (
+ await self.store.get_users_whose_devices_changed(
+ from_key, filter_user_ids=None, to_key=new_key
+ )
+ )
+
+ # Filter out any users the application service is not interested in
+ #
+ # For each user who changed their device list, we want to check whether this
+ # appservice would be interested in the change.
+ filtered_users_with_changed_device_lists = {
+ user_id
+ for user_id in users_with_changed_device_lists
+ if self._is_appservice_interested_in_device_lists_of_user(
+ appservice, user_id
+ )
+ }
+
+ # Create a summary of "changed" and "left" users.
+ # TODO: Calculate "left" users.
+ device_list_summary = DeviceLists(
+ changed=filtered_users_with_changed_device_lists
+ )
+
+ return device_list_summary
+
+ async def _is_appservice_interested_in_device_lists_of_user(
+ self,
+ appservice: ApplicationService,
+ user_id: str,
+ ) -> bool:
+ """
+ Returns whether a given application service is interested in the device lists of a
+ given user.
+
+ The application service is interested in the user's device lists if any of the
+ following are true:
+ * The user is in the appservice's user namespace.
+ * At least one member of one room that the user is a part of is in the
+ appservice's user namespace.
+ * The appservice is explicitly (via room ID or alias) interested in at
+ least one room that the user is in.
+
+ Args:
+ appservice: The application service to gauge interest of.
+ user_id: The ID of the user whose device list interest is in question.
+
+ Returns:
+ True if the application service is interested in the user's device lists, False
+ otherwise.
+ """
+ if appservice.is_interested_in_user(user_id):
+ return True
+
+ # FIXME: This is quite an expensive check. This method is called per device
+ # list change.
+ room_ids = await self.store.get_rooms_for_user(user_id)
+ for room_id in room_ids:
+ # This method covers checking room members for appservice interest as well as
+ # room ID and alias checks.
+ if await appservice.is_interested_in_room(room_id, self.store):
+ return True
+
+ return False
+
async def query_user_exists(self, user_id: str) -> bool:
"""Check if any application service knows this user_id exists.
@@ -469,7 +679,7 @@ class ApplicationServicesHandler:
room_alias_str = room_alias.to_string()
services = self.store.get_app_services()
alias_query_services = [
- s for s in services if (s.is_interested_in_alias(room_alias_str))
+ s for s in services if (s.is_room_alias_in_namespace(room_alias_str))
]
for alias_service in alias_query_services:
is_known_alias = await self.appservice_api.query_alias(
@@ -558,7 +768,7 @@ class ApplicationServicesHandler:
# inside of a list comprehension anymore.
interested_list = []
for s in services:
- if await s.is_interested(event, self.store):
+ if await s.is_interested_in_event(event, self.store):
interested_list.append(s)
return interested_list
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 82ee11e921..2c07d31dfd 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -495,13 +495,11 @@ class DeviceHandler(DeviceWorkerHandler):
"Notifying about update %r/%r, ID: %r", user_id, device_id, position
)
- room_ids = await self.store.get_rooms_for_user(user_id)
-
# specify the user ID too since the user should always get their own device list
# updates, even if they aren't in any rooms.
- self.notifier.on_new_event(
- "device_list_key", position, users=[user_id], rooms=room_ids
- )
+ users_to_notify = users_who_share_room.union(user_id)
+
+ self.notifier.on_new_event("device_list_key", position, users=users_to_notify)
if hosts:
logger.info(
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 7ee5c47fd9..f49bb806a8 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler:
service = requester.app_service
if service:
- if not service.is_interested_in_alias(room_alias_str):
+ if not service.is_room_alias_in_namespace(room_alias_str):
raise SynapseError(
400,
"This application service has not reserved this kind of alias.",
@@ -221,7 +221,7 @@ class DirectoryHandler:
async def delete_appservice_association(
self, service: ApplicationService, room_alias: RoomAlias
) -> None:
- if not service.is_interested_in_alias(room_alias.to_string()):
+ if not service.is_room_alias_in_namespace(room_alias.to_string()):
raise SynapseError(
400,
"This application service has not reserved this kind of alias",
@@ -374,7 +374,7 @@ class DirectoryHandler:
# non-exclusive locks on the alias (or there are no interested services)
services = self.store.get_app_services()
interested_services = [
- s for s in services if s.is_interested_in_alias(alias.to_string())
+ s for s in services if s.is_room_alias_in_namespace(alias.to_string())
]
for service in interested_services:
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index f3039c3c3f..d004c42885 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -13,17 +13,7 @@
# limitations under the License.
import itertools
import logging
-from typing import (
- TYPE_CHECKING,
- Any,
- Collection,
- Dict,
- FrozenSet,
- List,
- Optional,
- Set,
- Tuple,
-)
+from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
import attr
from prometheus_client import Counter
@@ -39,6 +29,7 @@ from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
from synapse.types import (
+ DeviceLists,
JsonDict,
MutableStateMap,
Requester,
@@ -183,21 +174,6 @@ class GroupsSyncResult:
return bool(self.join or self.invite or self.leave)
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class DeviceLists:
- """
- Attributes:
- changed: List of user_ids whose devices may have changed
- left: List of user_ids whose devices we no longer track
- """
-
- changed: Collection[str]
- left: Collection[str]
-
- def __bool__(self) -> bool:
- return bool(self.changed or self.left)
-
-
@attr.s(slots=True, auto_attribs=True)
class _RoomChanges:
"""The set of room entries to include in the sync, plus the set of joined
@@ -1329,7 +1305,7 @@ class SyncHandler:
return DeviceLists(changed=users_that_have_changed, left=newly_left_users)
else:
- return DeviceLists(changed=[], left=[])
+ return DeviceLists()
async def _generate_sync_entry_for_to_device(
self, sync_result_builder: "SyncResultBuilder"
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 1676ebd057..985b8ff3be 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -442,7 +442,7 @@ class TypingWriterHandler(FollowerTypingHandler):
class TypingNotificationEventSource(EventSource[int, JsonDict]):
def __init__(self, hs: "HomeServer"):
- self.hs = hs
+ self.store = hs.get_datastore()
self.clock = hs.get_clock()
# We can't call get_typing_handler here because there's a cycle:
#
@@ -482,9 +482,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
if handler._room_serials[room_id] <= from_key:
continue
- if not await service.matches_user_in_member_list(
- room_id, handler.store
- ):
+ if not await service.matches_user_in_member_list(room_id, self.store):
continue
events.append(self._make_event_for(room_id))
|