summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/12544.bugfix1
-rw-r--r--synapse/handlers/appservice.py4
-rw-r--r--synapse/handlers/receipts.py4
-rw-r--r--tests/handlers/test_appservice.py82
4 files changed, 87 insertions, 4 deletions
diff --git a/changelog.d/12544.bugfix b/changelog.d/12544.bugfix
new file mode 100644
index 0000000000..b5169cd831
--- /dev/null
+++ b/changelog.d/12544.bugfix
@@ -0,0 +1 @@
+Fix a bug where attempting to send a large amount of read receipts to an application service all at once would result in duplicate content and abnormally high memory usage. Contributed by Brad & Nick @ Beeper.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 1b57840506..b3894666cc 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -416,7 +416,7 @@ class ApplicationServicesHandler:
         return typing
 
     async def _handle_receipts(
-        self, service: ApplicationService, new_token: Optional[int]
+        self, service: ApplicationService, new_token: int
     ) -> List[JsonDict]:
         """
         Return the latest read receipts that the given application service should receive.
@@ -447,7 +447,7 @@ class ApplicationServicesHandler:
 
         receipts_source = self.event_sources.sources.receipt
         receipts, _ = await receipts_source.get_new_events_as(
-            service=service, from_key=from_key
+            service=service, from_key=from_key, to_key=new_token
         )
         return receipts
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 6250bb3bdf..cfe860decc 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -239,13 +239,14 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
         return events, to_key
 
     async def get_new_events_as(
-        self, from_key: int, service: ApplicationService
+        self, from_key: int, to_key: int, service: ApplicationService
     ) -> Tuple[List[JsonDict], int]:
         """Returns a set of new read receipt events that an appservice
         may be interested in.
 
         Args:
             from_key: the stream position at which events should be fetched from
+            to_key: the stream position up to which events should be fetched to
             service: The appservice which may be interested
 
         Returns:
@@ -255,7 +256,6 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
                 * The current read receipt stream token.
         """
         from_key = int(from_key)
-        to_key = self.get_current_key()
 
         if from_key == to_key:
             return [], to_key
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 8c72cf6b30..5b0cd1ab86 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -411,6 +411,88 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
             "exclusive_as_user", "password", self.exclusive_as_user_device_id
         )
 
+    def test_sending_read_receipt_batches_to_application_services(self):
+        """Tests that a large batch of read receipts are sent correctly to
+        interested application services.
+        """
+        # Register an application service that's interested in a certain user
+        # and room prefix
+        interested_appservice = self._register_application_service(
+            namespaces={
+                ApplicationService.NS_USERS: [
+                    {
+                        "regex": "@exclusive_as_user:.+",
+                        "exclusive": True,
+                    }
+                ],
+                ApplicationService.NS_ROOMS: [
+                    {
+                        "regex": "!fakeroom_.*",
+                        "exclusive": True,
+                    }
+                ],
+            },
+        )
+
+        # "Complete" a transaction.
+        # All this really does for us is make an entry in the application_services_state
+        # database table, which tracks the current stream_token per stream ID per AS.
+        self.get_success(
+            self.hs.get_datastores().main.complete_appservice_txn(
+                0,
+                interested_appservice,
+            )
+        )
+
+        # Now, pretend that we receive a large burst of read receipts (300 total) that
+        # all come in at once.
+        for i in range(300):
+            self.get_success(
+                # Insert a fake read receipt into the database
+                self.hs.get_datastores().main.insert_receipt(
+                    # We have to use unique room ID + user ID combinations here, as the db query
+                    # is an upsert.
+                    room_id=f"!fakeroom_{i}:test",
+                    receipt_type="m.read",
+                    user_id=self.local_user,
+                    event_ids=[f"$eventid_{i}"],
+                    data={},
+                )
+            )
+
+        # Now notify the appservice handler that 300 read receipts have all arrived
+        # at once. What will it do!
+        # note: stream tokens start at 2
+        for stream_token in range(2, 303):
+            self.get_success(
+                self.hs.get_application_service_handler()._notify_interested_services_ephemeral(
+                    services=[interested_appservice],
+                    stream_key="receipt_key",
+                    new_token=stream_token,
+                    users=[self.exclusive_as_user],
+                )
+            )
+
+        # Using our txn send mock, we can see what the AS received. After iterating over every
+        # transaction, we'd like to see all 300 read receipts accounted for.
+        # No more, no less.
+        all_ephemeral_events = []
+        for call in self.send_mock.call_args_list:
+            ephemeral_events = call[0][2]
+            all_ephemeral_events += ephemeral_events
+
+        # Ensure that no duplicate events were sent
+        self.assertEqual(len(all_ephemeral_events), 300)
+
+        # Check that the ephemeral event is a read receipt with the expected structure
+        latest_read_receipt = all_ephemeral_events[-1]
+        self.assertEqual(latest_read_receipt["type"], "m.receipt")
+
+        event_id = list(latest_read_receipt["content"].keys())[0]
+        self.assertEqual(
+            latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}}
+        )
+
     @unittest.override_config(
         {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
     )