summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/appservice/scheduler.py62
-rw-r--r--synapse/handlers/appservice.py16
2 files changed, 44 insertions, 34 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index a211257088..305e751d46 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -48,7 +48,7 @@ This is all tied together by the AppServiceScheduler which DIs the required
 components.
 """
 import logging
-from typing import Iterable, List, Optional
+from typing import Dict, Iterable, List, Optional
 
 from synapse.appservice import ApplicationService, ApplicationServiceState
 from synapse.events import EventBase
@@ -91,25 +91,31 @@ class ApplicationServiceScheduler:
         for service in services:
             self.txn_ctrl.start_recoverer(service)
 
-    def submit_event_for_as(self, service: ApplicationService, event: EventBase):
-        self.queuer.enqueue_event(service, event)
-
-    def submit_ephemeral_events_for_as(
-        self, service: ApplicationService, events: Iterable[JsonDict]
+    def enqueue_for_appservice(
+        self,
+        appservice: ApplicationService,
+        events: Optional[Iterable[EventBase]] = None,
+        ephemeral: Optional[Iterable[JsonDict]] = None,
     ) -> None:
         """
-        Send ephemeral events to application services, and schedule a new
-        outgoing AS transaction.
-
+        Enqueue some data to be sent off to an application service.
         Args:
-            service: The service to send ephemeral events to.
-            events: The ephemeral events to send.
+            appservice: The application service to create and send a transaction to.
+            events: The persistent room events to send.
+            ephemeral: The ephemeral events to send.
         """
-        # Ensure we have some events to send
-        if not events:
+        # We purposefully allow this method to run with empty events/ephemeral
+        # iterables, so that callers do not need to check iterable size themselves.
+        if not events and not ephemeral and not to_device_messages:
             return
 
-        self.queuer.enqueue_ephemeral(service, events)
+        if events:
+            self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
+        if ephemeral:
+            self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
+
+        # Kick off a new application service transaction
+        self.queuer.start_background_request(appservice)
 
 
 class _ServiceQueuer:
@@ -121,15 +127,17 @@ class _ServiceQueuer:
     """
 
     def __init__(self, txn_ctrl, clock):
-        self.queued_events = {}  # dict of {service_id: [events]}
-        self.queued_ephemeral = {}  # dict of {service_id: [events]}
+        # dict of {service_id: [events]}
+        self.queued_events: Dict[str, List[EventBase]] = {}
+        # dict of {service_id: [event_json]}
+        self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
 
         # the appservices which currently have a transaction in flight
         self.requests_in_flight = set()
         self.txn_ctrl = txn_ctrl
         self.clock = clock
 
-    def _start_background_request(self, service):
+    def start_background_request(self, service):
         # start a sender for this appservice if we don't already have one
         if service.id in self.requests_in_flight:
             return
@@ -138,16 +146,6 @@ class _ServiceQueuer:
             "as-sender-%s" % (service.id,), self._send_request, service
         )
 
-    def enqueue_event(self, service: ApplicationService, event: EventBase):
-        self.queued_events.setdefault(service.id, []).append(event)
-        self._start_background_request(service)
-
-    def enqueue_ephemeral(
-        self, service: ApplicationService, events: Iterable[JsonDict]
-    ) -> None:
-        self.queued_ephemeral.setdefault(service.id, []).extend(events)
-        self._start_background_request(service)
-
     async def _send_request(self, service: ApplicationService):
         # sanity-check: we shouldn't get here if this service already has a sender
         # running.
@@ -205,7 +203,15 @@ class _TransactionController:
         service: ApplicationService,
         events: List[EventBase],
         ephemeral: Optional[List[JsonDict]] = None,
-    ):
+    ) -> None:
+        """
+        Create a transaction with the given data and send to the provided
+        application service.
+        Args:
+            service: The application service to send the transaction to.
+            events: The persistent events to include in the transaction.
+            ephemeral: The ephemeral events to include in the transaction.
+        """
         try:
             txn = await self.store.create_appservice_txn(
                 service=service, events=events, ephemeral=ephemeral or []
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 4a7613b262..5a28ceac43 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -135,7 +135,9 @@ class ApplicationServicesHandler:
 
                         # Fork off pushes to these services
                         for service in services:
-                            self.scheduler.submit_event_for_as(service, event)
+                            self.scheduler.enqueue_for_appservice(
+                                service, events=[event]
+                            )
 
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
@@ -292,7 +294,7 @@ class ApplicationServicesHandler:
                     # and, if they apply to this application service, send it off.
                     events = await self._handle_typing(service, new_token)
                     if events:
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
                     continue
 
                 # Since we read/update the stream position for this AS/stream
@@ -303,7 +305,7 @@ class ApplicationServicesHandler:
                 ):
                     if stream_key == "receipt_key":
                         events = await self._handle_receipts(service, new_token)
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
                         # Persist the latest handled stream token for this appservice
                         await self.store.set_appservice_stream_type_pos(
@@ -312,7 +314,7 @@ class ApplicationServicesHandler:
 
                     elif stream_key == "presence_key":
                         events = await self._handle_presence(service, users, new_token)
-                        self.scheduler.submit_ephemeral_events_for_as(service, events)
+                        self.scheduler.enqueue_for_appservice(service, ephemeral=events)
 
                         # Persist the latest handled stream token for this appservice
                         await self.store.set_appservice_stream_type_pos(
@@ -325,8 +327,10 @@ class ApplicationServicesHandler:
                         to_device_messages = await self._get_to_device_messages(
                             service, new_token, users
                         )
-                        self.scheduler.submit_ephemeral_events_for_as(
-                            service, to_device_messages
+                        # REVIEW: In a subsequent commit, we'll move this to a to-device-specific
+                        #  key in the AS transaction.
+                        self.scheduler.enqueue_for_appservice(
+                            service, ephemeral=to_device_messages
                         )
 
                         # Persist the latest handled stream token for this appservice