diff options
-rw-r--r-- | synapse/appservice/scheduler.py | 62 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 16 |
2 files changed, 44 insertions, 34 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index a211257088..305e751d46 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,7 +48,7 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ import logging -from typing import Iterable, List, Optional +from typing import Dict, Iterable, List, Optional from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.events import EventBase @@ -91,25 +91,31 @@ class ApplicationServiceScheduler: for service in services: self.txn_ctrl.start_recoverer(service) - def submit_event_for_as(self, service: ApplicationService, event: EventBase): - self.queuer.enqueue_event(service, event) - - def submit_ephemeral_events_for_as( - self, service: ApplicationService, events: Iterable[JsonDict] + def enqueue_for_appservice( + self, + appservice: ApplicationService, + events: Optional[Iterable[EventBase]] = None, + ephemeral: Optional[Iterable[JsonDict]] = None, ) -> None: """ - Send ephemeral events to application services, and schedule a new - outgoing AS transaction. - + Enqueue some data to be sent off to an application service. Args: - service: The service to send ephemeral events to. - events: The ephemeral events to send. + appservice: The application service to create and send a transaction to. + events: The persistent room events to send. + ephemeral: The ephemeral events to send. """ - # Ensure we have some events to send - if not events: + # We purposefully allow this method to run with empty events/ephemeral + # iterables, so that callers do not need to check iterable size themselves. + if not events and not ephemeral and not to_device_messages: return - self.queuer.enqueue_ephemeral(service, events) + if events: + self.queuer.queued_events.setdefault(appservice.id, []).extend(events) + if ephemeral: + self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral) + + # Kick off a new application service transaction + self.queuer.start_background_request(appservice) class _ServiceQueuer: @@ -121,15 +127,17 @@ class _ServiceQueuer: """ def __init__(self, txn_ctrl, clock): - self.queued_events = {} # dict of {service_id: [events]} - self.queued_ephemeral = {} # dict of {service_id: [events]} + # dict of {service_id: [events]} + self.queued_events: Dict[str, List[EventBase]] = {} + # dict of {service_id: [event_json]} + self.queued_ephemeral: Dict[str, List[JsonDict]] = {} # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock - def _start_background_request(self, service): + def start_background_request(self, service): # start a sender for this appservice if we don't already have one if service.id in self.requests_in_flight: return @@ -138,16 +146,6 @@ class _ServiceQueuer: "as-sender-%s" % (service.id,), self._send_request, service ) - def enqueue_event(self, service: ApplicationService, event: EventBase): - self.queued_events.setdefault(service.id, []).append(event) - self._start_background_request(service) - - def enqueue_ephemeral( - self, service: ApplicationService, events: Iterable[JsonDict] - ) -> None: - self.queued_ephemeral.setdefault(service.id, []).extend(events) - self._start_background_request(service) - async def _send_request(self, service: ApplicationService): # sanity-check: we shouldn't get here if this service already has a sender # running. @@ -205,7 +203,15 @@ class _TransactionController: service: ApplicationService, events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, - ): + ) -> None: + """ + Create a transaction with the given data and send to the provided + application service. + Args: + service: The application service to send the transaction to. + events: The persistent events to include in the transaction. + ephemeral: The ephemeral events to include in the transaction. + """ try: txn = await self.store.create_appservice_txn( service=service, events=events, ephemeral=ephemeral or [] diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 4a7613b262..5a28ceac43 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -135,7 +135,9 @@ class ApplicationServicesHandler: # Fork off pushes to these services for service in services: - self.scheduler.submit_event_for_as(service, event) + self.scheduler.enqueue_for_appservice( + service, events=[event] + ) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -292,7 +294,7 @@ class ApplicationServicesHandler: # and, if they apply to this application service, send it off. events = await self._handle_typing(service, new_token) if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) continue # Since we read/update the stream position for this AS/stream @@ -303,7 +305,7 @@ class ApplicationServicesHandler: ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice await self.store.set_appservice_stream_type_pos( @@ -312,7 +314,7 @@ class ApplicationServicesHandler: elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice await self.store.set_appservice_stream_type_pos( @@ -325,8 +327,10 @@ class ApplicationServicesHandler: to_device_messages = await self._get_to_device_messages( service, new_token, users ) - self.scheduler.submit_ephemeral_events_for_as( - service, to_device_messages + # REVIEW: In a subsequent commit, we'll move this to a to-device-specific + # key in the AS transaction. + self.scheduler.enqueue_for_appservice( + service, ephemeral=to_device_messages ) # Persist the latest handled stream token for this appservice |