diff options
Diffstat (limited to 'synapse/appservice')
-rw-r--r-- | synapse/appservice/__init__.py | 26 | ||||
-rw-r--r-- | synapse/appservice/api.py | 29 | ||||
-rw-r--r-- | synapse/appservice/scheduler.py | 97 |
3 files changed, 106 insertions, 46 deletions
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 8c9ff93b2c..a340a8c9c7 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -165,23 +165,16 @@ class ApplicationService: return namespace.exclusive return False - async def _matches_user( - self, event: Optional[EventBase], store: Optional["DataStore"] = None - ) -> bool: - if not event: - return False - + async def _matches_user(self, event: EventBase, store: "DataStore") -> bool: if self.is_interested_in_user(event.sender): return True + # also check m.room.member state key if event.type == EventTypes.Member and self.is_interested_in_user( event.state_key ): return True - if not store: - return False - does_match = await self.matches_user_in_member_list(event.room_id, store) return does_match @@ -216,21 +209,15 @@ class ApplicationService: return self.is_interested_in_room(event.room_id) return False - async def _matches_aliases( - self, event: EventBase, store: Optional["DataStore"] = None - ) -> bool: - if not store or not event: - return False - + async def _matches_aliases(self, event: EventBase, store: "DataStore") -> bool: alias_list = await store.get_aliases_for_room(event.room_id) for alias in alias_list: if self.is_interested_in_alias(alias): return True + return False - async def is_interested( - self, event: EventBase, store: Optional["DataStore"] = None - ) -> bool: + async def is_interested(self, event: EventBase, store: "DataStore") -> bool: """Check if this service is interested in this event. Args: @@ -351,11 +338,13 @@ class AppServiceTransaction: id: int, events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], ): self.service = service self.id = id self.events = events self.ephemeral = ephemeral + self.to_device_messages = to_device_messages async def send(self, as_api: "ApplicationServiceApi") -> bool: """Sends this transaction using the provided AS API interface. @@ -369,6 +358,7 @@ class AppServiceTransaction: service=self.service, events=self.events, ephemeral=self.ephemeral, + to_device_messages=self.to_device_messages, txn_id=self.id, ) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index def4424af0..73be7ff3d4 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -218,8 +218,23 @@ class ApplicationServiceApi(SimpleHttpClient): service: "ApplicationService", events: List[EventBase], ephemeral: List[JsonDict], + to_device_messages: List[JsonDict], txn_id: Optional[int] = None, ) -> bool: + """ + Push data to an application service. + + Args: + service: The application service to send to. + events: The persistent events to send. + ephemeral: The ephemeral events to send. + to_device_messages: The to-device messages to send. + txn_id: An unique ID to assign to this transaction. Application services should + deduplicate transactions received with identitical IDs. + + Returns: + True if the task succeeded, False if it failed. + """ if service.url is None: return True @@ -237,13 +252,15 @@ class ApplicationServiceApi(SimpleHttpClient): uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id))) # Never send ephemeral events to appservices that do not support it + body: Dict[str, List[JsonDict]] = {"events": serialized_events} if service.supports_ephemeral: - body = { - "events": serialized_events, - "de.sorunome.msc2409.ephemeral": ephemeral, - } - else: - body = {"events": serialized_events} + body.update( + { + # TODO: Update to stable prefixes once MSC2409 completes FCP merge. + "de.sorunome.msc2409.ephemeral": ephemeral, + "de.sorunome.msc2409.to_device": to_device_messages, + } + ) try: await self.put_json( diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 185e3a5278..c42fa32fff 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,7 +48,16 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ import logging -from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set +from typing import ( + TYPE_CHECKING, + Awaitable, + Callable, + Collection, + Dict, + List, + Optional, + Set, +) from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.appservice.api import ApplicationServiceApi @@ -71,6 +80,9 @@ MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100 # Maximum number of ephemeral events to provide in an AS transaction. MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100 +# Maximum number of to-device messages to provide in an AS transaction. +MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100 + class ApplicationServiceScheduler: """Public facing API for this module. Does the required DI to tie the @@ -97,15 +109,40 @@ class ApplicationServiceScheduler: for service in services: self.txn_ctrl.start_recoverer(service) - def submit_event_for_as( - self, service: ApplicationService, event: EventBase + def enqueue_for_appservice( + self, + appservice: ApplicationService, + events: Optional[Collection[EventBase]] = None, + ephemeral: Optional[Collection[JsonDict]] = None, + to_device_messages: Optional[Collection[JsonDict]] = None, ) -> None: - self.queuer.enqueue_event(service, event) + """ + Enqueue some data to be sent off to an application service. - def submit_ephemeral_events_for_as( - self, service: ApplicationService, events: List[JsonDict] - ) -> None: - self.queuer.enqueue_ephemeral(service, events) + Args: + appservice: The application service to create and send a transaction to. + events: The persistent room events to send. + ephemeral: The ephemeral events to send. + to_device_messages: The to-device messages to send. These differ from normal + to-device messages sent to clients, as they have 'to_device_id' and + 'to_user_id' fields. + """ + # We purposefully allow this method to run with empty events/ephemeral + # collections, so that callers do not need to check iterable size themselves. + if not events and not ephemeral and not to_device_messages: + return + + if events: + self.queuer.queued_events.setdefault(appservice.id, []).extend(events) + if ephemeral: + self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral) + if to_device_messages: + self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend( + to_device_messages + ) + + # Kick off a new application service transaction + self.queuer.start_background_request(appservice) class _ServiceQueuer: @@ -121,13 +158,15 @@ class _ServiceQueuer: self.queued_events: Dict[str, List[EventBase]] = {} # dict of {service_id: [events]} self.queued_ephemeral: Dict[str, List[JsonDict]] = {} + # dict of {service_id: [to_device_message_json]} + self.queued_to_device_messages: Dict[str, List[JsonDict]] = {} # the appservices which currently have a transaction in flight self.requests_in_flight: Set[str] = set() self.txn_ctrl = txn_ctrl self.clock = clock - def _start_background_request(self, service: ApplicationService) -> None: + def start_background_request(self, service: ApplicationService) -> None: # start a sender for this appservice if we don't already have one if service.id in self.requests_in_flight: return @@ -136,16 +175,6 @@ class _ServiceQueuer: "as-sender-%s" % (service.id,), self._send_request, service ) - def enqueue_event(self, service: ApplicationService, event: EventBase) -> None: - self.queued_events.setdefault(service.id, []).append(event) - self._start_background_request(service) - - def enqueue_ephemeral( - self, service: ApplicationService, events: List[JsonDict] - ) -> None: - self.queued_ephemeral.setdefault(service.id, []).extend(events) - self._start_background_request(service) - async def _send_request(self, service: ApplicationService) -> None: # sanity-check: we shouldn't get here if this service already has a sender # running. @@ -162,11 +191,21 @@ class _ServiceQueuer: ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION] - if not events and not ephemeral: + all_to_device_messages = self.queued_to_device_messages.get( + service.id, [] + ) + to_device_messages_to_send = all_to_device_messages[ + :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION + ] + del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION] + + if not events and not ephemeral and not to_device_messages_to_send: return try: - await self.txn_ctrl.send(service, events, ephemeral) + await self.txn_ctrl.send( + service, events, ephemeral, to_device_messages_to_send + ) except Exception: logger.exception("AS request failed") finally: @@ -198,10 +237,24 @@ class _TransactionController: service: ApplicationService, events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, + to_device_messages: Optional[List[JsonDict]] = None, ) -> None: + """ + Create a transaction with the given data and send to the provided + application service. + + Args: + service: The application service to send the transaction to. + events: The persistent events to include in the transaction. + ephemeral: The ephemeral events to include in the transaction. + to_device_messages: The to-device messages to include in the transaction. + """ try: txn = await self.store.create_appservice_txn( - service=service, events=events, ephemeral=ephemeral or [] + service=service, + events=events, + ephemeral=ephemeral or [], + to_device_messages=to_device_messages or [], ) service_is_up = await self._is_service_up(service) if service_is_up: |