summary refs log tree commit diff
path: root/synapse/appservice
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/appservice')
-rw-r--r--synapse/appservice/__init__.py149
-rw-r--r--synapse/appservice/api.py47
-rw-r--r--synapse/appservice/scheduler.py151
3 files changed, 265 insertions, 82 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py

index f9d3bd337d..01db2b2ae3 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py
@@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Iterable, List, Match, Optional from synapse.api.constants import EventTypes from synapse.events import EventBase -from synapse.types import GroupID, JsonDict, UserID, get_domain_from_id +from synapse.types import DeviceLists, GroupID, JsonDict, UserID, get_domain_from_id from synapse.util.caches.descriptors import _CacheContext, cached if TYPE_CHECKING: @@ -144,26 +144,6 @@ class ApplicationService: return regex_obj["exclusive"] return False - async def _matches_user( - self, event: Optional[EventBase], store: Optional["DataStore"] = None - ) -> bool: - if not event: - return False - - if self.is_interested_in_user(event.sender): - return True - # also check m.room.member state key - if event.type == EventTypes.Member and self.is_interested_in_user( - event.state_key - ): - return True - - if not store: - return False - - does_match = await self.matches_user_in_member_list(event.room_id, store) - return does_match - @cached(num_args=1, cache_context=True) async def matches_user_in_member_list( self, @@ -171,14 +151,15 @@ class ApplicationService: store: "DataStore", cache_context: _CacheContext, ) -> bool: - """Check if this service is interested a room based upon it's membership + """Check if this appservice is interested a room based upon whether any members + fall into the appservice's user namespace. Args: room_id: The room to check. store: The datastore to query. Returns: - True if this service would like to know about this room. + True if this appservice would like to know about this room. """ member_list = await store.get_users_in_room( room_id, on_invalidate=cache_context.invalidate @@ -190,28 +171,82 @@ class ApplicationService: return True return False - def _matches_room_id(self, event: EventBase) -> bool: - if hasattr(event, "room_id"): - return self.is_interested_in_room(event.room_id) - return False + def is_interested_in_user( + self, + user_id: str, + ) -> bool: + """ + Returns whether the application is interested in a given user ID. + + The appservice is considered to be interested in a user if either: the + user ID is in the appservice's user namespace, or if the user is the + appservice's configured sender_localpart. + + Args: + user_id: The ID of the user to check. - async def _matches_aliases( - self, event: EventBase, store: Optional["DataStore"] = None + Returns: + True if the application service is interested in the user, False if not. + """ + return ( + # User is the appservice's sender_localpart user + user_id == self.sender + # User is in a defined namespace + or self.is_user_in_namespace(user_id) + ) + + @cached(num_args=1, cache_context=True) + async def is_interested_in_room( + self, + room_id: str, + store: "DataStore", + cache_context: _CacheContext, ) -> bool: - if not store or not event: - return False + """ + Returns whether the application service is interested in a given room ID. + + The appservice is considered to be interested in the room if either: the ID or one + of the aliases of the room is in the appservice's room ID or alias namespace + respectively, or if one of the members of the room fall into the appservice's user + namespace. + + Args: + room_id: The ID of the room to check. + store: The homeserver's datastore class. + + Returns: + True if the application service is interested in the room, False if not. + """ + # Check if we have interest in this room ID + if self.is_room_id_in_namespace(room_id): + return True - alias_list = await store.get_aliases_for_room(event.room_id) + # or any of the aliases this room has + alias_list = await store.get_aliases_for_room(room_id) for alias in alias_list: - if self.is_interested_in_alias(alias): + if self.is_room_alias_in_namespace(alias): return True - return False - async def is_interested( - self, event: EventBase, store: Optional["DataStore"] = None + # And finally, perform an expensive check on whether the appservice + # is interested in any users in the room based on their user ID + # and the appservice's user namespace. + return await self.matches_user_in_member_list( + room_id, store, on_invalidate=cache_context.invalidate + ) + + @cached(num_args=1, cache_context=True) + async def is_interested_in_event( + self, + event: EventBase, + store: "DataStore", + cache_context: _CacheContext, ) -> bool: """Check if this service is interested in this event. + Interest in an event is determined by whether this appservice is interested in + either the room the event was sent in, the sender of the event or - if the + event is of type "m.room.member", the user referenced by the event's state key. + Args: event: The event to check. store: The datastore to query. @@ -220,23 +255,28 @@ class ApplicationService: True if this service would like to know about this event. """ # Do cheap checks first - if self._matches_room_id(event): + + # Check if we're interested in this user by namespace (or if they're the + # sender_localpart user) + if self.is_interested_in_user(event.sender): return True - # This will check the namespaces first before - # checking the store, so should be run before _matches_aliases - if await self._matches_user(event, store): + # or, if this is a membership event, the user it references by namespace + if event.type == EventTypes.Member and self.is_interested_in_user( + event.state_key + ): return True - # This will check the store, so should be run last - if await self._matches_aliases(event, store): + if await self.is_interested_in_room( + event.room_id, store, on_invalidate=cache_context.invalidate + ): return True return False - @cached(num_args=1) + @cached(num_args=1, cache_context=True) async def is_interested_in_presence( - self, user_id: UserID, store: "DataStore" + self, user_id: UserID, store: "DataStore", cache_context: _CacheContext ) -> bool: """Check if this service is interested a user's presence @@ -254,20 +294,19 @@ class ApplicationService: # Then find out if the appservice is interested in any of those rooms for room_id in room_ids: - if await self.matches_user_in_member_list(room_id, store): + if await self.matches_user_in_member_list( + room_id, store, on_invalidate=cache_context.invalidate + ): return True return False - def is_interested_in_user(self, user_id: str) -> bool: - return ( - bool(self._matches_regex(user_id, ApplicationService.NS_USERS)) - or user_id == self.sender - ) + def is_user_in_namespace(self, user_id: str) -> bool: + return bool(self._matches_regex(user_id, ApplicationService.NS_USERS)) - def is_interested_in_alias(self, alias: str) -> bool: + def is_room_alias_in_namespace(self, alias: str) -> bool: return bool(self._matches_regex(alias, ApplicationService.NS_ALIASES)) - def is_interested_in_room(self, room_id: str) -> bool: + def is_room_id_in_namespace(self, room_id: str) -> bool: return bool(self._matches_regex(room_id, ApplicationService.NS_ROOMS)) def is_exclusive_user(self, user_id: str) -> bool: @@ -330,11 +369,15 @@ class AppServiceTransaction: id: int, events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], + device_list_summary: DeviceLists, ): self.service = service self.id = id self.events = events self.ephemeral = ephemeral + self.to_device_messages = to_device_messages + self.device_list_summary = device_list_summary async def send(self, as_api: "ApplicationServiceApi") -> bool: """Sends this transaction using the provided AS API interface. @@ -348,6 +391,8 @@ class AppServiceTransaction: service=self.service, events=self.events, ephemeral=self.ephemeral, + to_device_messages=self.to_device_messages, + device_list_summary=self.device_list_summary, txn_id=self.id, ) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index f51b636417..3ae59c7a04 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py
@@ -1,4 +1,5 @@ # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +14,7 @@ # limitations under the License. import logging import urllib -from typing import TYPE_CHECKING, List, Optional, Tuple +from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union from prometheus_client import Counter @@ -22,7 +23,7 @@ from synapse.api.errors import CodeMessageException from synapse.events import EventBase from synapse.events.utils import serialize_event from synapse.http.client import SimpleHttpClient -from synapse.types import JsonDict, ThirdPartyInstanceID +from synapse.types import DeviceLists, JsonDict, ThirdPartyInstanceID from synapse.util.caches.response_cache import ResponseCache if TYPE_CHECKING: @@ -204,12 +205,26 @@ class ApplicationServiceApi(SimpleHttpClient): service: "ApplicationService", events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], + device_list_summary: DeviceLists, txn_id: Optional[int] = None, - ): + ) -> bool: + """ + Push data to an application service. + Args: + service: The application service to send to. + events: The persistent events to send. + ephemeral: The ephemeral events to send. + to_device_messages: The to-device messages to send. + txn_id: An unique ID to assign to this transaction. Application services should + deduplicate transactions received with identitical IDs. + Returns: + True if the task succeeded, False if it failed. + """ if service.url is None: return True - events = self._serialize(service, events) + serialized_events = self._serialize(service, events) if txn_id is None: logger.warning( @@ -220,10 +235,28 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id))) # Never send ephemeral events to appservices that do not support it + body: Dict[str, Union[JsonDict, List[JsonDict]]] = {"events": serialized_events} if service.supports_ephemeral: - body = {"events": events, "de.sorunome.msc2409.ephemeral": ephemeral} - else: - body = {"events": events} + body.update( + { + # TODO: Update to stable prefixes once MSC2409 completes FCP merge. + "de.sorunome.msc2409.ephemeral": ephemeral, + "de.sorunome.msc2409.to_device": to_device_messages, + } + ) + + # Send device list summaries if needed + if device_list_summary: + logger.info("Sending device list summary: %s", device_list_summary) + body.update( + { + # TODO: Update to stable prefix once MSC3202 completes FCP merge + "org.matrix.msc3202.device_lists": { + "changed": list(device_list_summary.changed), + "left": list(device_list_summary.left), + } + } + ) try: await self.put_json( diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 6a2ce99b55..d49636d926 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py
@@ -48,13 +48,13 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ import logging -from typing import List, Optional +from typing import Dict, Iterable, List, Optional from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.events import EventBase from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import JsonDict +from synapse.types import DeviceLists, JsonDict logger = logging.getLogger(__name__) @@ -65,6 +65,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100 # Maximum number of ephemeral events to provide in an AS transaction. MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100 +# Maximum number of to-device messages to provide in an AS transaction. +MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100 + class ApplicationServiceScheduler: """Public facing API for this module. Does the required DI to tie the @@ -91,13 +94,53 @@ class ApplicationServiceScheduler: for service in services: self.txn_ctrl.start_recoverer(service) - def submit_event_for_as(self, service: ApplicationService, event: EventBase): - self.queuer.enqueue_event(service, event) + def enqueue_for_appservice( + self, + appservice: ApplicationService, + events: Optional[Iterable[EventBase]] = None, + ephemeral: Optional[Iterable[JsonDict]] = None, + to_device_messages: Optional[Iterable[JsonDict]] = None, + device_list_summary: Optional[DeviceLists] = None, + ) -> None: + """ + Enqueue some data to be sent off to an application service. + + Args: + appservice: The application service to create and send a transaction to. + events: The persistent room events to send. + ephemeral: The ephemeral events to send. + to_device_messages: The to-device messages to send. These differ from normal + to-device messages sent to clients, as they have 'to_device_id' and + 'to_user_id' fields. + device_list_summary: A summary of users that the application service either needs + to refresh the device lists of, or those that the application service need no + longer track the device lists of. + """ + # We purposefully allow this method to run with empty events/ephemeral + # iterables, so that callers do not need to check iterable size themselves. + if ( + not events + and not ephemeral + and not to_device_messages + and not device_list_summary + ): + return + + if events: + self.queuer.queued_events.setdefault(appservice.id, []).extend(events) + if ephemeral: + self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral) + if to_device_messages: + self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend( + to_device_messages + ) + if device_list_summary: + self.queuer.queued_device_list_summaries.setdefault( + appservice.id, [] + ).append(device_list_summary) - def submit_ephemeral_events_for_as( - self, service: ApplicationService, events: List[JsonDict] - ): - self.queuer.enqueue_ephemeral(service, events) + # Kick off a new application service transaction + self.queuer.start_background_request(appservice) class _ServiceQueuer: @@ -109,15 +152,21 @@ class _ServiceQueuer: """ def __init__(self, txn_ctrl, clock): - self.queued_events = {} # dict of {service_id: [events]} - self.queued_ephemeral = {} # dict of {service_id: [events]} + # dict of {service_id: [events]} + self.queued_events: Dict[str, List[EventBase]] = {} + # dict of {service_id: [event_json]} + self.queued_ephemeral: Dict[str, List[JsonDict]] = {} + # dict of {service_id: [to_device_message_json]} + self.queued_to_device_messages: Dict[str, List[JsonDict]] = {} + # dict of {service_id: [device_list_summary]} + self.queued_device_list_summaries: Dict[str, List[DeviceLists]] = {} # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock - def _start_background_request(self, service): + def start_background_request(self, service): # start a sender for this appservice if we don't already have one if service.id in self.requests_in_flight: return @@ -126,14 +175,6 @@ class _ServiceQueuer: "as-sender-%s" % (service.id,), self._send_request, service ) - def enqueue_event(self, service: ApplicationService, event: EventBase): - self.queued_events.setdefault(service.id, []).append(event) - self._start_background_request(service) - - def enqueue_ephemeral(self, service: ApplicationService, events: List[JsonDict]): - self.queued_ephemeral.setdefault(service.id, []).extend(events) - self._start_background_request(service) - async def _send_request(self, service: ApplicationService): # sanity-check: we shouldn't get here if this service already has a sender # running. @@ -150,11 +191,58 @@ class _ServiceQueuer: ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] - if not events and not ephemeral: + all_to_device_messages = self.queued_to_device_messages.get( + service.id, [] + ) + to_device_messages_to_send = all_to_device_messages[ + :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION + ] + del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION] + + # Consolidate any pending device list summaries into a single, up-to-date + # summary. + # Note: this code assumes that in a single DeviceLists, a user will + # never be in both "changed" and "left" sets. + device_list_summary = DeviceLists() + while self.queued_device_list_summaries.get(service.id, []): + # Pop a summary off the front of the queue + summary = self.queued_device_list_summaries[service.id].pop(0) + + # For every user in the incoming "changed" set: + # * Remove them from the existing "left" set if necessary + # (as we need to start tracking them again) + # * Add them to the existing "changed" set if necessary. + for user_id in summary.changed: + if user_id in device_list_summary.left: + device_list_summary.left.remove(user_id) + device_list_summary.changed.add(user_id) + + # For every user in the incoming "left" set: + # * Remove them from the existing "changed" set if necessary + # (we no longer need to track them) + # * Add them to the existing "left" set if necessary. + for user_id in summary.left: + if user_id in device_list_summary.changed: + device_list_summary.changed.remove(user_id) + device_list_summary.left.add(user_id) + + if ( + not events + and not ephemeral + and not to_device_messages_to_send + # Note that DeviceLists implements __bool__ + and not device_list_summary + ): return try: - await self.txn_ctrl.send(service, events, ephemeral) + await self.txn_ctrl.send( + service, + events, + ephemeral, + to_device_messages_to_send, + device_list_summary, + ) except Exception: logger.exception("AS request failed") finally: @@ -191,10 +279,27 @@ class _TransactionController: service: ApplicationService, events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, - ): + to_device_messages: Optional[List[JsonDict]] = None, + device_list_summary: Optional[DeviceLists] = None, + ) -> None: + """ + Create a transaction with the given data and send to the provided + application service. + + Args: + service: The application service to send the transaction to. + events: The persistent events to include in the transaction. + ephemeral: The ephemeral events to include in the transaction. + to_device_messages: The to-device messages to include in the transaction. + device_list_summary: The device list summary to include in the transaction. + """ try: txn = await self.store.create_appservice_txn( - service=service, events=events, ephemeral=ephemeral or [] + service=service, + events=events, + ephemeral=ephemeral or [], + to_device_messages=to_device_messages or [], + device_list_summary=device_list_summary or DeviceLists(), ) service_is_up = await self._is_service_up(service) if service_is_up: