diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2021-12-03 20:00:30 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2021-12-03 20:03:09 +0000 |
commit | 6d68b8a4e8ac160f1e050a547a76efaa02793929 (patch) | |
tree | f54d1f84c395437c1fcd4280500fdaa4447d04c7 | |
parent | Add comment on why we don't NOT NULL to_device_stream_id (diff) | |
download | synapse-6d68b8a4e8ac160f1e050a547a76efaa02793929.tar.xz |
Refactor and generalise the sending of arbitrary fields over AS transactions
Things were starting to get a little inflexible as we kept adding new types of data to send to application services. It's better to just have one method for adding data to AS transactions, than one for each type of data. Note that subsequent PRs will need to add device lists, one-time keys and fallback keys to these transactions. Adding those are additional arguments to a method is much nicer than a new method for each one. Plus with this setup we can add multiple different types of data at once without kicking off an AS transaction for each type. This will be useful for OTK/fallback keys, as we plan to lazily attach those when handling other event types.
-rw-r--r-- | synapse/appservice/scheduler.py | 62 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 16 |
2 files changed, 44 insertions, 34 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index a211257088..305e751d46 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -48,7 +48,7 @@ This is all tied together by the AppServiceScheduler which DIs the required components. """ import logging -from typing import Iterable, List, Optional +from typing import Dict, Iterable, List, Optional from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.events import EventBase @@ -91,25 +91,31 @@ class ApplicationServiceScheduler: for service in services: self.txn_ctrl.start_recoverer(service) - def submit_event_for_as(self, service: ApplicationService, event: EventBase): - self.queuer.enqueue_event(service, event) - - def submit_ephemeral_events_for_as( - self, service: ApplicationService, events: Iterable[JsonDict] + def enqueue_for_appservice( + self, + appservice: ApplicationService, + events: Optional[Iterable[EventBase]] = None, + ephemeral: Optional[Iterable[JsonDict]] = None, ) -> None: """ - Send ephemeral events to application services, and schedule a new - outgoing AS transaction. - + Enqueue some data to be sent off to an application service. Args: - service: The service to send ephemeral events to. - events: The ephemeral events to send. + appservice: The application service to create and send a transaction to. + events: The persistent room events to send. + ephemeral: The ephemeral events to send. """ - # Ensure we have some events to send - if not events: + # We purposefully allow this method to run with empty events/ephemeral + # iterables, so that callers do not need to check iterable size themselves. + if not events and not ephemeral and not to_device_messages: return - self.queuer.enqueue_ephemeral(service, events) + if events: + self.queuer.queued_events.setdefault(appservice.id, []).extend(events) + if ephemeral: + self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral) + + # Kick off a new application service transaction + self.queuer.start_background_request(appservice) class _ServiceQueuer: @@ -121,15 +127,17 @@ class _ServiceQueuer: """ def __init__(self, txn_ctrl, clock): - self.queued_events = {} # dict of {service_id: [events]} - self.queued_ephemeral = {} # dict of {service_id: [events]} + # dict of {service_id: [events]} + self.queued_events: Dict[str, List[EventBase]] = {} + # dict of {service_id: [event_json]} + self.queued_ephemeral: Dict[str, List[JsonDict]] = {} # the appservices which currently have a transaction in flight self.requests_in_flight = set() self.txn_ctrl = txn_ctrl self.clock = clock - def _start_background_request(self, service): + def start_background_request(self, service): # start a sender for this appservice if we don't already have one if service.id in self.requests_in_flight: return @@ -138,16 +146,6 @@ class _ServiceQueuer: "as-sender-%s" % (service.id,), self._send_request, service ) - def enqueue_event(self, service: ApplicationService, event: EventBase): - self.queued_events.setdefault(service.id, []).append(event) - self._start_background_request(service) - - def enqueue_ephemeral( - self, service: ApplicationService, events: Iterable[JsonDict] - ) -> None: - self.queued_ephemeral.setdefault(service.id, []).extend(events) - self._start_background_request(service) - async def _send_request(self, service: ApplicationService): # sanity-check: we shouldn't get here if this service already has a sender # running. @@ -205,7 +203,15 @@ class _TransactionController: service: ApplicationService, events: List[EventBase], ephemeral: Optional[List[JsonDict]] = None, - ): + ) -> None: + """ + Create a transaction with the given data and send to the provided + application service. + Args: + service: The application service to send the transaction to. + events: The persistent events to include in the transaction. + ephemeral: The ephemeral events to include in the transaction. + """ try: txn = await self.store.create_appservice_txn( service=service, events=events, ephemeral=ephemeral or [] diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 4a7613b262..5a28ceac43 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -135,7 +135,9 @@ class ApplicationServicesHandler: # Fork off pushes to these services for service in services: - self.scheduler.submit_event_for_as(service, event) + self.scheduler.enqueue_for_appservice( + service, events=[event] + ) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -292,7 +294,7 @@ class ApplicationServicesHandler: # and, if they apply to this application service, send it off. events = await self._handle_typing(service, new_token) if events: - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) continue # Since we read/update the stream position for this AS/stream @@ -303,7 +305,7 @@ class ApplicationServicesHandler: ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice await self.store.set_appservice_stream_type_pos( @@ -312,7 +314,7 @@ class ApplicationServicesHandler: elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) - self.scheduler.submit_ephemeral_events_for_as(service, events) + self.scheduler.enqueue_for_appservice(service, ephemeral=events) # Persist the latest handled stream token for this appservice await self.store.set_appservice_stream_type_pos( @@ -325,8 +327,10 @@ class ApplicationServicesHandler: to_device_messages = await self._get_to_device_messages( service, new_token, users ) - self.scheduler.submit_ephemeral_events_for_as( - service, to_device_messages + # REVIEW: In a subsequent commit, we'll move this to a to-device-specific + # key in the AS transaction. + self.scheduler.enqueue_for_appservice( + service, ephemeral=to_device_messages ) # Persist the latest handled stream token for this appservice |