summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-11-19 18:26:51 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2021-11-19 20:08:37 +0000
commit401cb2bbda7ae0899800ff98e054033e195ab2ab (patch)
tree80f2d3daa31ce86e46fc3cf8f192b7c4c6a937cd
parentBroaden type hints; update comment (diff)
downloadsynapse-401cb2bbda7ae0899800ff98e054033e195ab2ab.tar.xz
Deduplicate ephemeral events to send conditional
Test cases needed to be updated, as we now always call
submit_ephemeral_events_for_as, it may just be with an
empty events list.
-rw-r--r--synapse/appservice/scheduler.py12
-rw-r--r--synapse/handlers/appservice.py15
-rw-r--r--tests/handlers/test_appservice.py35
3 files changed, 34 insertions, 28 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 45cdee33f4..a211257088 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -97,6 +97,18 @@ class ApplicationServiceScheduler:
     def submit_ephemeral_events_for_as(
         self, service: ApplicationService, events: Iterable[JsonDict]
     ) -> None:
+        """
+        Send ephemeral events to application services, and schedule a new
+        outgoing AS transaction.
+
+        Args:
+            service: The service to send ephemeral events to.
+            events: The ephemeral events to send.
+        """
+        # Ensure we have some events to send
+        if not events:
+            return
+
         self.queuer.enqueue_ephemeral(service, events)
 
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 1b08ded9fe..88806bb78c 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -302,10 +302,7 @@ class ApplicationServicesHandler:
                 ):
                     if stream_key == "receipt_key":
                         events = await self._handle_receipts(service, new_token)
-                        if events:
-                            self.scheduler.submit_ephemeral_events_for_as(
-                                service, events
-                            )
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
 
                         # Persist the latest handled stream token for this appservice
                         await self.store.set_type_stream_id_for_appservice(
@@ -314,10 +311,7 @@ class ApplicationServicesHandler:
 
                     elif stream_key == "presence_key":
                         events = await self._handle_presence(service, users, new_token)
-                        if events:
-                            self.scheduler.submit_ephemeral_events_for_as(
-                                service, events
-                            )
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
 
                         # Persist the latest handled stream token for this appservice
                         await self.store.set_type_stream_id_for_appservice(
@@ -328,10 +322,7 @@ class ApplicationServicesHandler:
                         # Retrieve a list of to-device message events, as well as the
                         # maximum stream token of the messages we were able to retrieve.
                         events = await self._handle_to_device(service, new_token, users)
-                        if events:
-                            self.scheduler.submit_ephemeral_events_for_as(
-                                service, events
-                            )
+                        self.scheduler.submit_ephemeral_events_for_as(service, events)
 
                         # Persist the latest handled stream token for this appservice
                         await self.store.set_type_stream_id_for_appservice(
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index af50bfdee8..fc37b8729d 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -47,6 +47,12 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         self.handler = ApplicationServicesHandler(hs)
         self.event_source = hs.get_event_sources()
 
+        # Mock the ApplicationServiceScheduler queuer so that we can track any
+        # outgoing ephemeral events
+        self.mock_service_queuer = Mock()
+        self.mock_service_queuer.enqueue_ephemeral = Mock()
+        hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer
+
     def test_notify_interested_services(self):
         interested_service = self._mkservice(is_interested=True)
         services = [
@@ -279,7 +285,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         self.handler.notify_interested_services_ephemeral(
             "receipt_key", 580, ["@fakerecipient:example.com"]
         )
-        self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with(
+        self.mock_service_queuer.enqueue_ephemeral.assert_called_once_with(
             interested_service, [event]
         )
         self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with(
@@ -309,7 +315,7 @@ class AppServiceHandlerTestCase(unittest.TestCase):
         self.handler.notify_interested_services_ephemeral(
             "receipt_key", 580, ["@fakerecipient:example.com"]
         )
-        self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called()
+        self.mock_service_queuer.enqueue_ephemeral.assert_not_called()
 
     def _mkservice(self, is_interested, protocols=None):
         service = Mock()
@@ -337,12 +343,11 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase):
     ]
 
     def prepare(self, reactor, clock, hs):
-        # Mock the application service scheduler so that we can track any
-        # outgoing transactions
-        self.mock_scheduler = Mock()
-        self.mock_scheduler.submit_ephemeral_events_for_as = Mock()
-
-        hs.get_application_service_handler().scheduler = self.mock_scheduler
+        # Mock the ApplicationServiceScheduler queuer so that we can track any
+        # outgoing ephemeral events
+        self.mock_service_queuer = Mock()
+        self.mock_service_queuer.enqueue_ephemeral = Mock()
+        hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer
 
         self.device1 = "device1"
         self.user1 = self.register_user("user1", "password")
@@ -391,10 +396,8 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase):
         # Only the user1 -> user2 to-device message should have been forwarded to the AS.
         #
         # The uninterested application service should not have been notified at all.
-        self.assertEqual(
-            1, self.mock_scheduler.submit_ephemeral_events_for_as.call_count
-        )
-        service, events = self.mock_scheduler.submit_ephemeral_events_for_as.call_args[
+        self.mock_service_queuer.enqueue_ephemeral.assert_called_once()
+        service, events = self.mock_service_queuer.enqueue_ephemeral.call_args[
             0
         ]
 
@@ -481,14 +484,14 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase):
         )
         self.assertEqual(chan.code, 200, chan.result)
 
+        self.mock_service_queuer.enqueue_ephemeral.assert_called()
+
         # Count the total number of to-device messages that were sent out per-service.
         # Ensure that we only sent to-device messages to interested services, and that
         # each interested service received the full count of to-device messages.
         service_id_to_message_count: Dict[str, int] = {}
-        self.assertGreater(
-            self.mock_scheduler.submit_ephemeral_events_for_as.call_count, 0
-        )
-        for call in self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list:
+
+        for call in self.mock_service_queuer.enqueue_ephemeral.call_args_list:
             service, events = call[0]
 
             # Check that this was made to an interested service