summary refs log tree commit diff
diff options
context:
space:
mode:
authorWill Hunt <will@half-shot.uk>2020-10-21 15:36:53 +0100
committerGitHub <noreply@github.com>2020-10-21 15:36:53 +0100
commit70259d8c8c0be71d3588a16211ccb42af87235da (patch)
tree21890a7107d4657a85ff58114bbb365eb17eccbe
parentSeparate the TCP and terse JSON formatting code. (#8587) (diff)
downloadsynapse-70259d8c8c0be71d3588a16211ccb42af87235da.tar.xz
Limit AS transactions to 100 events (#8606)
* Limit AS transactions to 100 events

* Update changelog.d/8606.feature

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>

* Add tests

* Update synapse/appservice/scheduler.py

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
-rw-r--r--changelog.d/8606.feature1
-rw-r--r--synapse/appservice/scheduler.py18
-rw-r--r--tests/appservice/test_scheduler.py41
3 files changed, 58 insertions, 2 deletions
diff --git a/changelog.d/8606.feature b/changelog.d/8606.feature
new file mode 100644
index 0000000000..fad723c108
--- /dev/null
+++ b/changelog.d/8606.feature
@@ -0,0 +1 @@
+Limit appservice transactions to 100 persistent and 100 ephemeral events.
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index ad3c408519..58291afc22 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -60,6 +60,13 @@ from synapse.types import JsonDict
 logger = logging.getLogger(__name__)
 
 
+# Maximum number of events to provide in an AS transaction.
+MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100
+
+# Maximum number of ephemeral events to provide in an AS transaction.
+MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100
+
+
 class ApplicationServiceScheduler:
     """ Public facing API for this module. Does the required DI to tie the
     components together. This also serves as the "event_pool", which in this
@@ -136,10 +143,17 @@ class _ServiceQueuer:
         self.requests_in_flight.add(service.id)
         try:
             while True:
-                events = self.queued_events.pop(service.id, [])
-                ephemeral = self.queued_ephemeral.pop(service.id, [])
+                all_events = self.queued_events.get(service.id, [])
+                events = all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION]
+                del all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION]
+
+                all_events_ephemeral = self.queued_ephemeral.get(service.id, [])
+                ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
+                del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
+
                 if not events and not ephemeral:
                     return
+
                 try:
                     await self.txn_ctrl.send(service, events, ephemeral)
                 except Exception:
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 2acb8b7603..97f8cad0dd 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -260,6 +260,31 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
         self.txn_ctrl.send.assert_called_with(srv2, [srv_2_event2], [])
         self.assertEquals(3, self.txn_ctrl.send.call_count)
 
+    def test_send_large_txns(self):
+        srv_1_defer = defer.Deferred()
+        srv_2_defer = defer.Deferred()
+        send_return_list = [srv_1_defer, srv_2_defer]
+
+        def do_send(x, y, z):
+            return make_deferred_yieldable(send_return_list.pop(0))
+
+        self.txn_ctrl.send = Mock(side_effect=do_send)
+
+        service = Mock(id=4, name="service")
+        event_list = [Mock(name="event%i" % (i + 1)) for i in range(200)]
+        for event in event_list:
+            self.queuer.enqueue_event(service, event)
+
+        # Expect the first event to be sent immediately.
+        self.txn_ctrl.send.assert_called_with(service, [event_list[0]], [])
+        srv_1_defer.callback(service)
+        # Then send the next 100 events
+        self.txn_ctrl.send.assert_called_with(service, event_list[1:101], [])
+        srv_2_defer.callback(service)
+        # Then the final 99 events
+        self.txn_ctrl.send.assert_called_with(service, event_list[101:], [])
+        self.assertEquals(3, self.txn_ctrl.send.call_count)
+
     def test_send_single_ephemeral_no_queue(self):
         # Expect the event to be sent immediately.
         service = Mock(id=4, name="service")
@@ -296,3 +321,19 @@ class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
         # Expect the queued events to be sent
         self.txn_ctrl.send.assert_called_with(service, [], event_list_2 + event_list_3)
         self.assertEquals(2, self.txn_ctrl.send.call_count)
+
+    def test_send_large_txns_ephemeral(self):
+        d = defer.Deferred()
+        self.txn_ctrl.send = Mock(
+            side_effect=lambda x, y, z: make_deferred_yieldable(d)
+        )
+        # Expect the event to be sent immediately.
+        service = Mock(id=4, name="service")
+        first_chunk = [Mock(name="event%i" % (i + 1)) for i in range(100)]
+        second_chunk = [Mock(name="event%i" % (i + 101)) for i in range(50)]
+        event_list = first_chunk + second_chunk
+        self.queuer.enqueue_ephemeral(service, event_list)
+        self.txn_ctrl.send.assert_called_once_with(service, [], first_chunk)
+        d.callback(service)
+        self.txn_ctrl.send.assert_called_with(service, [], second_chunk)
+        self.assertEquals(2, self.txn_ctrl.send.call_count)