diff options
author | Andrew Morgan <andrew@amorgan.xyz> | 2021-11-19 18:26:51 +0000 |
---|---|---|
committer | Andrew Morgan <andrew@amorgan.xyz> | 2021-11-19 20:08:37 +0000 |
commit | 401cb2bbda7ae0899800ff98e054033e195ab2ab (patch) | |
tree | 80f2d3daa31ce86e46fc3cf8f192b7c4c6a937cd | |
parent | Broaden type hints; update comment (diff) | |
download | synapse-401cb2bbda7ae0899800ff98e054033e195ab2ab.tar.xz |
Deduplicate ephemeral events to send conditional
Test cases needed to be updated, as we now always call submit_ephemeral_events_for_as, it may just be with an empty events list.
-rw-r--r-- | synapse/appservice/scheduler.py | 12 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 15 | ||||
-rw-r--r-- | tests/handlers/test_appservice.py | 35 |
3 files changed, 34 insertions, 28 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 45cdee33f4..a211257088 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -97,6 +97,18 @@ class ApplicationServiceScheduler: def submit_ephemeral_events_for_as( self, service: ApplicationService, events: Iterable[JsonDict] ) -> None: + """ + Send ephemeral events to application services, and schedule a new + outgoing AS transaction. + + Args: + service: The service to send ephemeral events to. + events: The ephemeral events to send. + """ + # Ensure we have some events to send + if not events: + return + self.queuer.enqueue_ephemeral(service, events) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 1b08ded9fe..88806bb78c 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -302,10 +302,7 @@ class ApplicationServicesHandler: ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.submit_ephemeral_events_for_as(service, events) # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( @@ -314,10 +311,7 @@ class ApplicationServicesHandler: elif stream_key == "presence_key": events = await self._handle_presence(service, users, new_token) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.submit_ephemeral_events_for_as(service, events) # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( @@ -328,10 +322,7 @@ class ApplicationServicesHandler: # Retrieve a list of to-device message events, as well as the # maximum stream token of the messages we were able to retrieve. events = await self._handle_to_device(service, new_token, users) - if events: - self.scheduler.submit_ephemeral_events_for_as( - service, events - ) + self.scheduler.submit_ephemeral_events_for_as(service, events) # Persist the latest handled stream token for this appservice await self.store.set_type_stream_id_for_appservice( diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index af50bfdee8..fc37b8729d 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -47,6 +47,12 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.handler = ApplicationServicesHandler(hs) self.event_source = hs.get_event_sources() + # Mock the ApplicationServiceScheduler queuer so that we can track any + # outgoing ephemeral events + self.mock_service_queuer = Mock() + self.mock_service_queuer.enqueue_ephemeral = Mock() + hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer + def test_notify_interested_services(self): interested_service = self._mkservice(is_interested=True) services = [ @@ -279,7 +285,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.handler.notify_interested_services_ephemeral( "receipt_key", 580, ["@fakerecipient:example.com"] ) - self.mock_scheduler.submit_ephemeral_events_for_as.assert_called_once_with( + self.mock_service_queuer.enqueue_ephemeral.assert_called_once_with( interested_service, [event] ) self.mock_store.set_type_stream_id_for_appservice.assert_called_once_with( @@ -309,7 +315,7 @@ class AppServiceHandlerTestCase(unittest.TestCase): self.handler.notify_interested_services_ephemeral( "receipt_key", 580, ["@fakerecipient:example.com"] ) - self.mock_scheduler.submit_ephemeral_events_for_as.assert_not_called() + self.mock_service_queuer.enqueue_ephemeral.assert_not_called() def _mkservice(self, is_interested, protocols=None): service = Mock() @@ -337,12 +343,11 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase): ] def prepare(self, reactor, clock, hs): - # Mock the application service scheduler so that we can track any - # outgoing transactions - self.mock_scheduler = Mock() - self.mock_scheduler.submit_ephemeral_events_for_as = Mock() - - hs.get_application_service_handler().scheduler = self.mock_scheduler + # Mock the ApplicationServiceScheduler queuer so that we can track any + # outgoing ephemeral events + self.mock_service_queuer = Mock() + self.mock_service_queuer.enqueue_ephemeral = Mock() + hs.get_application_service_handler().scheduler.queuer = self.mock_service_queuer self.device1 = "device1" self.user1 = self.register_user("user1", "password") @@ -391,10 +396,8 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase): # Only the user1 -> user2 to-device message should have been forwarded to the AS. # # The uninterested application service should not have been notified at all. - self.assertEqual( - 1, self.mock_scheduler.submit_ephemeral_events_for_as.call_count - ) - service, events = self.mock_scheduler.submit_ephemeral_events_for_as.call_args[ + self.mock_service_queuer.enqueue_ephemeral.assert_called_once() + service, events = self.mock_service_queuer.enqueue_ephemeral.call_args[ 0 ] @@ -481,14 +484,14 @@ class ApplicationServiceEphemeralEventsTestCase(unittest.HomeserverTestCase): ) self.assertEqual(chan.code, 200, chan.result) + self.mock_service_queuer.enqueue_ephemeral.assert_called() + # Count the total number of to-device messages that were sent out per-service. # Ensure that we only sent to-device messages to interested services, and that # each interested service received the full count of to-device messages. service_id_to_message_count: Dict[str, int] = {} - self.assertGreater( - self.mock_scheduler.submit_ephemeral_events_for_as.call_count, 0 - ) - for call in self.mock_scheduler.submit_ephemeral_events_for_as.call_args_list: + + for call in self.mock_service_queuer.enqueue_ephemeral.call_args_list: service, events = call[0] # Check that this was made to an interested service |