diff --git a/changelog.d/12544.bugfix b/changelog.d/12544.bugfix
new file mode 100644
index 0000000000..b5169cd831
--- /dev/null
+++ b/changelog.d/12544.bugfix
@@ -0,0 +1 @@
+Fix a bug where attempting to send a large amount of read receipts to an application service all at once would result in duplicate content and abnormally high memory usage. Contributed by Brad & Nick @ Beeper.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 1b57840506..b3894666cc 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -416,7 +416,7 @@ class ApplicationServicesHandler:
return typing
async def _handle_receipts(
- self, service: ApplicationService, new_token: Optional[int]
+ self, service: ApplicationService, new_token: int
) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
@@ -447,7 +447,7 @@ class ApplicationServicesHandler:
receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
- service=service, from_key=from_key
+ service=service, from_key=from_key, to_key=new_token
)
return receipts
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 6250bb3bdf..cfe860decc 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -239,13 +239,14 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
return events, to_key
async def get_new_events_as(
- self, from_key: int, service: ApplicationService
+ self, from_key: int, to_key: int, service: ApplicationService
) -> Tuple[List[JsonDict], int]:
"""Returns a set of new read receipt events that an appservice
may be interested in.
Args:
from_key: the stream position at which events should be fetched from
+ to_key: the stream position up to which events should be fetched to
service: The appservice which may be interested
Returns:
@@ -255,7 +256,6 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
* The current read receipt stream token.
"""
from_key = int(from_key)
- to_key = self.get_current_key()
if from_key == to_key:
return [], to_key
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 8c72cf6b30..5b0cd1ab86 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -411,6 +411,88 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
"exclusive_as_user", "password", self.exclusive_as_user_device_id
)
+ def test_sending_read_receipt_batches_to_application_services(self):
+ """Tests that a large batch of read receipts are sent correctly to
+ interested application services.
+ """
+ # Register an application service that's interested in a certain user
+ # and room prefix
+ interested_appservice = self._register_application_service(
+ namespaces={
+ ApplicationService.NS_USERS: [
+ {
+ "regex": "@exclusive_as_user:.+",
+ "exclusive": True,
+ }
+ ],
+ ApplicationService.NS_ROOMS: [
+ {
+ "regex": "!fakeroom_.*",
+ "exclusive": True,
+ }
+ ],
+ },
+ )
+
+ # "Complete" a transaction.
+ # All this really does for us is make an entry in the application_services_state
+ # database table, which tracks the current stream_token per stream ID per AS.
+ self.get_success(
+ self.hs.get_datastores().main.complete_appservice_txn(
+ 0,
+ interested_appservice,
+ )
+ )
+
+ # Now, pretend that we receive a large burst of read receipts (300 total) that
+ # all come in at once.
+ for i in range(300):
+ self.get_success(
+ # Insert a fake read receipt into the database
+ self.hs.get_datastores().main.insert_receipt(
+ # We have to use unique room ID + user ID combinations here, as the db query
+ # is an upsert.
+ room_id=f"!fakeroom_{i}:test",
+ receipt_type="m.read",
+ user_id=self.local_user,
+ event_ids=[f"$eventid_{i}"],
+ data={},
+ )
+ )
+
+ # Now notify the appservice handler that 300 read receipts have all arrived
+ # at once. What will it do!
+ # note: stream tokens start at 2
+ for stream_token in range(2, 303):
+ self.get_success(
+ self.hs.get_application_service_handler()._notify_interested_services_ephemeral(
+ services=[interested_appservice],
+ stream_key="receipt_key",
+ new_token=stream_token,
+ users=[self.exclusive_as_user],
+ )
+ )
+
+ # Using our txn send mock, we can see what the AS received. After iterating over every
+ # transaction, we'd like to see all 300 read receipts accounted for.
+ # No more, no less.
+ all_ephemeral_events = []
+ for call in self.send_mock.call_args_list:
+ ephemeral_events = call[0][2]
+ all_ephemeral_events += ephemeral_events
+
+ # Ensure that no duplicate events were sent
+ self.assertEqual(len(all_ephemeral_events), 300)
+
+ # Check that the ephemeral event is a read receipt with the expected structure
+ latest_read_receipt = all_ephemeral_events[-1]
+ self.assertEqual(latest_read_receipt["type"], "m.receipt")
+
+ event_id = list(latest_read_receipt["content"].keys())[0]
+ self.assertEqual(
+ latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}}
+ )
+
@unittest.override_config(
{"experimental_features": {"msc2409_to_device_messages_enabled": True}}
)
|