diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index f9d3bd337d..01db2b2ae3 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Iterable, List, Match, Optional
from synapse.api.constants import EventTypes
from synapse.events import EventBase
-from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id
+from synapse.types import DeviceLists, GroupID, JsonDict, UserID, get_domain_from_id
from synapse.util.caches.descriptors import _CacheContext, cached
if TYPE_CHECKING:
@@ -144,26 +144,6 @@ class ApplicationService:
return regex_obj["exclusive"]
return False
- async def _matches_user(
- self, event: Optional[EventBase], store: Optional["DataStore"] = None
- ) -> bool:
- if not event:
- return False
-
- if self.is_interested_in_user(event.sender):
- return True
- # also check m.room.member state key
- if event.type == EventTypes.Member and self.is_interested_in_user(
- event.state_key
- ):
- return True
-
- if not store:
- return False
-
- does_match = await self.matches_user_in_member_list(event.room_id, store)
- return does_match
-
@cached(num_args=1, cache_context=True)
async def matches_user_in_member_list(
self,
@@ -171,14 +151,15 @@ class ApplicationService:
store: "DataStore",
cache_context: _CacheContext,
) -> bool:
- """Check if this service is interested a room based upon it's membership
+ """Check if this appservice is interested a room based upon whether any members
+ fall into the appservice's user namespace.
Args:
room_id: The room to check.
store: The datastore to query.
Returns:
- True if this service would like to know about this room.
+ True if this appservice would like to know about this room.
"""
member_list = await store.get_users_in_room(
room_id, on_invalidate=cache_context.invalidate
@@ -190,28 +171,82 @@ class ApplicationService:
return True
return False
- def _matches_room_id(self, event: EventBase) -> bool:
- if hasattr(event, "room_id"):
- return self.is_interested_in_room(event.room_id)
- return False
+ def is_interested_in_user(
+ self,
+ user_id: str,
+ ) -> bool:
+ """
+ Returns whether the application is interested in a given user ID.
+
+ The appservice is considered to be interested in a user if either: the
+ user ID is in the appservice's user namespace, or if the user is the
+ appservice's configured sender_localpart.
+
+ Args:
+ user_id: The ID of the user to check.
- async def _matches_aliases(
- self, event: EventBase, store: Optional["DataStore"] = None
+ Returns:
+ True if the application service is interested in the user, False if not.
+ """
+ return (
+ # User is the appservice's sender_localpart user
+ user_id == self.sender
+ # User is in a defined namespace
+ or self.is_user_in_namespace(user_id)
+ )
+
+ @cached(num_args=1, cache_context=True)
+ async def is_interested_in_room(
+ self,
+ room_id: str,
+ store: "DataStore",
+ cache_context: _CacheContext,
) -> bool:
- if not store or not event:
- return False
+ """
+ Returns whether the application service is interested in a given room ID.
+
+ The appservice is considered to be interested in the room if either: the ID or one
+ of the aliases of the room is in the appservice's room ID or alias namespace
+ respectively, or if one of the members of the room fall into the appservice's user
+ namespace.
+
+ Args:
+ room_id: The ID of the room to check.
+ store: The homeserver's datastore class.
+
+ Returns:
+ True if the application service is interested in the room, False if not.
+ """
+ # Check if we have interest in this room ID
+ if self.is_room_id_in_namespace(room_id):
+ return True
- alias_list = await store.get_aliases_for_room(event.room_id)
+ # or any of the aliases this room has
+ alias_list = await store.get_aliases_for_room(room_id)
for alias in alias_list:
- if self.is_interested_in_alias(alias):
+ if self.is_room_alias_in_namespace(alias):
return True
- return False
- async def is_interested(
- self, event: EventBase, store: Optional["DataStore"] = None
+ # And finally, perform an expensive check on whether the appservice
+ # is interested in any users in the room based on their user ID
+ # and the appservice's user namespace.
+ return await self.matches_user_in_member_list(
+ room_id, store, on_invalidate=cache_context.invalidate
+ )
+
+ @cached(num_args=1, cache_context=True)
+ async def is_interested_in_event(
+ self,
+ event: EventBase,
+ store: "DataStore",
+ cache_context: _CacheContext,
) -> bool:
"""Check if this service is interested in this event.
+ Interest in an event is determined by whether this appservice is interested in
+ either the room the event was sent in, the sender of the event or - if the
+ event is of type "m.room.member", the user referenced by the event's state key.
+
Args:
event: The event to check.
store: The datastore to query.
@@ -220,23 +255,28 @@ class ApplicationService:
True if this service would like to know about this event.
"""
# Do cheap checks first
- if self._matches_room_id(event):
+
+ # Check if we're interested in this user by namespace (or if they're the
+ # sender_localpart user)
+ if self.is_interested_in_user(event.sender):
return True
- # This will check the namespaces first before
- # checking the store, so should be run before _matches_aliases
- if await self._matches_user(event, store):
+ # or, if this is a membership event, the user it references by namespace
+ if event.type == EventTypes.Member and self.is_interested_in_user(
+ event.state_key
+ ):
return True
- # This will check the store, so should be run last
- if await self._matches_aliases(event, store):
+ if await self.is_interested_in_room(
+ event.room_id, store, on_invalidate=cache_context.invalidate
+ ):
return True
return False
- @cached(num_args=1)
+ @cached(num_args=1, cache_context=True)
async def is_interested_in_presence(
- self, user_id: UserID, store: "DataStore"
+ self, user_id: UserID, store: "DataStore", cache_context: _CacheContext
) -> bool:
"""Check if this service is interested a user's presence
@@ -254,20 +294,19 @@ class ApplicationService:
# Then find out if the appservice is interested in any of those rooms
for room_id in room_ids:
- if await self.matches_user_in_member_list(room_id, store):
+ if await self.matches_user_in_member_list(
+ room_id, store, on_invalidate=cache_context.invalidate
+ ):
return True
return False
- def is_interested_in_user(self, user_id: str) -> bool:
- return (
- bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
- or user_id == self.sender
- )
+ def is_user_in_namespace(self, user_id: str) -> bool:
+ return bool(self._matches_regex(user_id, ApplicationService.NS_USERS))
- def is_interested_in_alias(self, alias: str) -> bool:
+ def is_room_alias_in_namespace(self, alias: str) -> bool:
return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES))
- def is_interested_in_room(self, room_id: str) -> bool:
+ def is_room_id_in_namespace(self, room_id: str) -> bool:
return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS))
def is_exclusive_user(self, user_id: str) -> bool:
@@ -330,11 +369,15 @@ class AppServiceTransaction:
id: int,
events: List[EventBase],
ephemeral: List[JsonDict],
+ to_device_messages: List[JsonDict],
+ device_list_summary: DeviceLists,
):
self.service = service
self.id = id
self.events = events
self.ephemeral = ephemeral
+ self.to_device_messages = to_device_messages
+ self.device_list_summary = device_list_summary
async def send(self, as_api: "ApplicationServiceApi") -> bool:
"""Sends this transaction using the provided AS API interface.
@@ -348,6 +391,8 @@ class AppServiceTransaction:
service=self.service,
events=self.events,
ephemeral=self.ephemeral,
+ to_device_messages=self.to_device_messages,
+ device_list_summary=self.device_list_summary,
txn_id=self.id,
)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index f51b636417..3ae59c7a04 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -1,4 +1,5 @@
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,7 +14,7 @@
# limitations under the License.
import logging
import urllib
-from typing import TYPE_CHECKING, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
from prometheus_client import Counter
@@ -22,7 +23,7 @@ from synapse.api.errors import CodeMessageException
from synapse.events import EventBase
from synapse.events.utils import serialize_event
from synapse.http.client import SimpleHttpClient
-from synapse.types import JsonDict, ThirdPartyInstanceID
+from synapse.types import DeviceLists, JsonDict, ThirdPartyInstanceID
from synapse.util.caches.response_cache import ResponseCache
if TYPE_CHECKING:
@@ -204,12 +205,26 @@ class ApplicationServiceApi(SimpleHttpClient):
service: "ApplicationService",
events: List[EventBase],
ephemeral: List[JsonDict],
+ to_device_messages: List[JsonDict],
+ device_list_summary: DeviceLists,
txn_id: Optional[int] = None,
- ):
+ ) -> bool:
+ """
+ Push data to an application service.
+ Args:
+ service: The application service to send to.
+ events: The persistent events to send.
+ ephemeral: The ephemeral events to send.
+ to_device_messages: The to-device messages to send.
+ txn_id: An unique ID to assign to this transaction. Application services should
+ deduplicate transactions received with identitical IDs.
+ Returns:
+ True if the task succeeded, False if it failed.
+ """
if service.url is None:
return True
- events = self._serialize(service, events)
+ serialized_events = self._serialize(service, events)
if txn_id is None:
logger.warning(
@@ -220,10 +235,28 @@ class ApplicationServiceApi(SimpleHttpClient):
uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
# Never send ephemeral events to appservices that do not support it
+ body: Dict[str, Union[JsonDict, List[JsonDict]]] = {"events": serialized_events}
if service.supports_ephemeral:
- body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral}
- else:
- body = {"events": events}
+ body.update(
+ {
+ # TODO: Update to stable prefixes once MSC2409 completes FCP merge.
+ "de.sorunome.msc2409.ephemeral": ephemeral,
+ "de.sorunome.msc2409.to_device": to_device_messages,
+ }
+ )
+
+ # Send device list summaries if needed
+ if device_list_summary:
+ logger.info("Sending device list summary: %s", device_list_summary)
+ body.update(
+ {
+ # TODO: Update to stable prefix once MSC3202 completes FCP merge
+ "org.matrix.msc3202.device_lists": {
+ "changed": list(device_list_summary.changed),
+ "left": list(device_list_summary.left),
+ }
+ }
+ )
try:
await self.put_json(
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 6a2ce99b55..d49636d926 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,13 +48,13 @@ This is all tied together by the AppServiceScheduler which DIs the required
components.
"""
import logging
-from typing import List, Optional
+from typing import Dict, Iterable, List, Optional
from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.events import EventBase
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import JsonDict
+from synapse.types import DeviceLists, JsonDict
logger = logging.getLogger(__name__)
@@ -65,6 +65,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
+# Maximum number of to-device messages to provide in an AS transaction.
+MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100
+
class ApplicationServiceScheduler:
"""Public facing API for this module. Does the required DI to tie the
@@ -91,13 +94,53 @@ class ApplicationServiceScheduler:
for service in services:
self.txn_ctrl.start_recoverer(service)
- def submit_event_for_as(self, service: ApplicationService, event: EventBase):
- self.queuer.enqueue_event(service, event)
+ def enqueue_for_appservice(
+ self,
+ appservice: ApplicationService,
+ events: Optional[Iterable[EventBase]] = None,
+ ephemeral: Optional[Iterable[JsonDict]] = None,
+ to_device_messages: Optional[Iterable[JsonDict]] = None,
+ device_list_summary: Optional[DeviceLists] = None,
+ ) -> None:
+ """
+ Enqueue some data to be sent off to an application service.
+
+ Args:
+ appservice: The application service to create and send a transaction to.
+ events: The persistent room events to send.
+ ephemeral: The ephemeral events to send.
+ to_device_messages: The to-device messages to send. These differ from normal
+ to-device messages sent to clients, as they have 'to_device_id' and
+ 'to_user_id' fields.
+ device_list_summary: A summary of users that the application service either needs
+ to refresh the device lists of, or those that the application service need no
+ longer track the device lists of.
+ """
+ # We purposefully allow this method to run with empty events/ephemeral
+ # iterables, so that callers do not need to check iterable size themselves.
+ if (
+ not events
+ and not ephemeral
+ and not to_device_messages
+ and not device_list_summary
+ ):
+ return
+
+ if events:
+ self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
+ if ephemeral:
+ self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
+ if to_device_messages:
+ self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
+ to_device_messages
+ )
+ if device_list_summary:
+ self.queuer.queued_device_list_summaries.setdefault(
+ appservice.id, []
+ ).append(device_list_summary)
- def submit_ephemeral_events_for_as(
- self, service: ApplicationService, events: List[JsonDict]
- ):
- self.queuer.enqueue_ephemeral(service, events)
+ # Kick off a new application service transaction
+ self.queuer.start_background_request(appservice)
class _ServiceQueuer:
@@ -109,15 +152,21 @@ class _ServiceQueuer:
"""
def __init__(self, txn_ctrl, clock):
- self.queued_events = {} # dict of {service_id: [events]}
- self.queued_ephemeral = {} # dict of {service_id: [events]}
+ # dict of {service_id: [events]}
+ self.queued_events: Dict[str, List[EventBase]] = {}
+ # dict of {service_id: [event_json]}
+ self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
+ # dict of {service_id: [to_device_message_json]}
+ self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}
+ # dict of {service_id: [device_list_summary]}
+ self.queued_device_list_summaries: Dict[str, List[DeviceLists]] = {}
# the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
- def _start_background_request(self, service):
+ def start_background_request(self, service):
# start a sender for this appservice if we don't already have one
if service.id in self.requests_in_flight:
return
@@ -126,14 +175,6 @@ class _ServiceQueuer:
"as-sender-%s" % (service.id,), self._send_request, service
)
- def enqueue_event(self, service: ApplicationService, event: EventBase):
- self.queued_events.setdefault(service.id, []).append(event)
- self._start_background_request(service)
-
- def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]):
- self.queued_ephemeral.setdefault(service.id, []).extend(events)
- self._start_background_request(service)
-
async def _send_request(self, service: ApplicationService):
# sanity-check: we shouldn't get here if this service already has a sender
# running.
@@ -150,11 +191,58 @@ class _ServiceQueuer:
ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
- if not events and not ephemeral:
+ all_to_device_messages = self.queued_to_device_messages.get(
+ service.id, []
+ )
+ to_device_messages_to_send = all_to_device_messages[
+ :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
+ ]
+ del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]
+
+ # Consolidate any pending device list summaries into a single, up-to-date
+ # summary.
+ # Note: this code assumes that in a single DeviceLists, a user will
+ # never be in both "changed" and "left" sets.
+ device_list_summary = DeviceLists()
+ while self.queued_device_list_summaries.get(service.id, []):
+ # Pop a summary off the front of the queue
+ summary = self.queued_device_list_summaries[service.id].pop(0)
+
+ # For every user in the incoming "changed" set:
+ # * Remove them from the existing "left" set if necessary
+ # (as we need to start tracking them again)
+ # * Add them to the existing "changed" set if necessary.
+ for user_id in summary.changed:
+ if user_id in device_list_summary.left:
+ device_list_summary.left.remove(user_id)
+ device_list_summary.changed.add(user_id)
+
+ # For every user in the incoming "left" set:
+ # * Remove them from the existing "changed" set if necessary
+ # (we no longer need to track them)
+ # * Add them to the existing "left" set if necessary.
+ for user_id in summary.left:
+ if user_id in device_list_summary.changed:
+ device_list_summary.changed.remove(user_id)
+ device_list_summary.left.add(user_id)
+
+ if (
+ not events
+ and not ephemeral
+ and not to_device_messages_to_send
+ # Note that DeviceLists implements __bool__
+ and not device_list_summary
+ ):
return
try:
- await self.txn_ctrl.send(service, events, ephemeral)
+ await self.txn_ctrl.send(
+ service,
+ events,
+ ephemeral,
+ to_device_messages_to_send,
+ device_list_summary,
+ )
except Exception:
logger.exception("AS request failed")
finally:
@@ -191,10 +279,27 @@ class _TransactionController:
service: ApplicationService,
events: List[EventBase],
ephemeral: Optional[List[JsonDict]] = None,
- ):
+ to_device_messages: Optional[List[JsonDict]] = None,
+ device_list_summary: Optional[DeviceLists] = None,
+ ) -> None:
+ """
+ Create a transaction with the given data and send to the provided
+ application service.
+
+ Args:
+ service: The application service to send the transaction to.
+ events: The persistent events to include in the transaction.
+ ephemeral: The ephemeral events to include in the transaction.
+ to_device_messages: The to-device messages to include in the transaction.
+ device_list_summary: The device list summary to include in the transaction.
+ """
try:
txn = await self.store.create_appservice_txn(
- service=service, events=events, ephemeral=ephemeral or []
+ service=service,
+ events=events,
+ ephemeral=ephemeral or [],
+ to_device_messages=to_device_messages or [],
+ device_list_summary=device_list_summary or DeviceLists(),
)
service_is_up = await self._is_service_up(service)
if service_is_up:
|