summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/appservice/scheduler.py61
-rw-r--r--tests/appservice/test_scheduler.py100
2 files changed, 60 insertions, 101 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 59a870e271..54c42d1b94 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -16,11 +16,11 @@
 This module controls the reliability for application service transactions.
 
 The nominal flow through this module looks like:
-             _________
----ASa[e]-->|  Event  |
-----ASb[e]->| Grouper |<-poll 1/s--+
---ASa[e]--->|_________|            | ASa[e,e]  ASb[e]
-                                   V
+              __________
+1---ASa[e]-->|  Service |--> Queue ASa[f]
+2----ASb[e]->|  Queuer  |
+3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e]
+                                    V
       -````````-            +------------+
       |````````|<--StoreTxn-|Transaction |
       |Database|            | Controller |---> SEND TO AS
@@ -66,14 +66,14 @@ class AppServiceScheduler(object):
         self.clock = clock
         self.store = store
         self.as_api = as_api
-        self.event_grouper = _EventGrouper()
 
         def create_recoverer(service, callback):
             return _Recoverer(clock, store, as_api, service, callback)
 
         self.txn_ctrl = _TransactionController(
-            clock, store, as_api, self.event_grouper, create_recoverer
+            clock, store, as_api, create_recoverer
         )
+        self.queuer = _ServiceQueuer(self.txn_ctrl)
 
     @defer.inlineCallbacks
     def start(self):
@@ -86,17 +86,26 @@ class AppServiceScheduler(object):
         self.txn_ctrl.start_polling()
 
     def submit_event_for_as(self, service, event):
-        self.event_grouper.enqueue(service, event)
+        self.queuer.enqueue(service, event)
 
 
-class _EventGrouper(object):
-    """Groups events for the same application service together.
+class _ServiceQueuer(object):
+    """Queues events for the same application service together, sending
+    transactions as soon as possible. Once a transaction is sent successfully,
+    this schedules any other events in the queue to run.
     """
 
-    def __init__(self):
+    def __init__(self, txn_ctrl):
         self.groups = {}  # dict of {service: [events]}
+        self.txn_ctrl = txn_ctrl
 
     def enqueue(self, service, event):
+        # if nothing in queue for this service, send event immediately and add
+        # callbacks.
+        self.txn_ctrl.send(service, [event])
+
+        # else add to queue for this service
+
         if service not in self.groups:
             self.groups[service] = []
         self.groups[service].append(event)
@@ -109,34 +118,30 @@ class _EventGrouper(object):
 
 class _TransactionController(object):
 
-    def __init__(self, clock, store, as_api, event_grouper, recoverer_fn):
+    def __init__(self, clock, store, as_api, recoverer_fn):
         self.clock = clock
         self.store = store
         self.as_api = as_api
-        self.event_grouper = event_grouper
         self.recoverer_fn = recoverer_fn
         # keep track of how many recoverers there are
         self.recoverers = []
 
     @defer.inlineCallbacks
-    def start_polling(self):
+    def send(self, service, events):
         try:
-            groups = self.event_grouper.drain_groups()
-            for service in groups:
-                txn = yield self.store.create_appservice_txn(
-                    service=service,
-                    events=groups[service]
-                )
-                service_is_up = yield self._is_service_up(service)
-                if service_is_up:
-                    sent = yield txn.send(self.as_api)
-                    if sent:
-                        txn.complete(self.store)
-                    else:
-                        self._start_recoverer(service)
+            txn = yield self.store.create_appservice_txn(
+                service=service,
+                events=events
+            )
+            service_is_up = yield self._is_service_up(service)
+            if service_is_up:
+                sent = yield txn.send(self.as_api)
+                if sent:
+                    txn.complete(self.store)
+                else:
+                    self._start_recoverer(service)
         except Exception as e:
             logger.exception(e)
-        self.clock.call_later(1, self.start_polling)
 
     @defer.inlineCallbacks
     def on_recovered(self, recoverer):
diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 4534d05b93..38d792eb02 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 from synapse.appservice import ApplicationServiceState, AppServiceTransaction
 from synapse.appservice.scheduler import (
-    _EventGrouper, _TransactionController, _Recoverer
+    _ServiceQueuer, _TransactionController, _Recoverer
 )
 from twisted.internet import defer
 from ..utils import MockClock
@@ -28,25 +28,21 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         self.clock = MockClock()
         self.store = Mock()
         self.as_api = Mock()
-        self.event_grouper = Mock()
         self.recoverer = Mock()
         self.recoverer_fn = Mock(return_value=self.recoverer)
         self.txnctrl = _TransactionController(
             clock=self.clock, store=self.store, as_api=self.as_api,
-            event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn
+            recoverer_fn=self.recoverer_fn
         )
 
-    def test_poll_single_group_service_up(self):
+    def test_single_service_up_txn_sent(self):
         # Test: The AS is up and the txn is successfully sent.
         service = Mock()
         events = [Mock(), Mock()]
-        groups = {}
-        groups[service] = events
         txn_id = "foobar"
         txn = Mock(id=txn_id, service=service, events=events)
 
         # mock methods
-        self.event_grouper.drain_groups = Mock(return_value=groups)
         self.store.get_appservice_state = Mock(
             return_value=defer.succeed(ApplicationServiceState.UP)
         )
@@ -56,7 +52,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         )
 
         # actual call
-        self.txnctrl.start_polling()
+        self.txnctrl.send(service, events)
 
         self.store.create_appservice_txn.assert_called_once_with(
             service=service, events=events  # txn made and saved
@@ -64,15 +60,12 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         self.assertEquals(0, len(self.txnctrl.recoverers))  # no recoverer made
         txn.complete.assert_called_once_with(self.store)  # txn completed
 
-    def test_poll_single_group_service_down(self):
+    def test_single_service_down(self):
         # Test: The AS is down so it shouldn't push; Recoverers will do it.
         # It should still make a transaction though.
         service = Mock()
         events = [Mock(), Mock()]
-        groups = {}
-        groups[service] = events
 
-        self.event_grouper.drain_groups = Mock(return_value=groups)
         txn = Mock(id="idhere", service=service, events=events)
         self.store.get_appservice_state = Mock(
             return_value=defer.succeed(ApplicationServiceState.DOWN)
@@ -82,7 +75,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         )
 
         # actual call
-        self.txnctrl.start_polling()
+        self.txnctrl.send(service, events)
 
         self.store.create_appservice_txn.assert_called_once_with(
             service=service, events=events  # txn made and saved
@@ -90,18 +83,15 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         self.assertEquals(0, txn.send.call_count)  # txn not sent though
         self.assertEquals(0, txn.complete.call_count)  # or completed
 
-    def test_poll_single_group_service_up(self):
+    def test_single_service_up_txn_not_sent(self):
         # Test: The AS is up and the txn is not sent. A Recoverer is made and
         # started.
         service = Mock()
         events = [Mock(), Mock()]
-        groups = {}
-        groups[service] = events
         txn_id = "foobar"
         txn = Mock(id=txn_id, service=service, events=events)
 
         # mock methods
-        self.event_grouper.drain_groups = Mock(return_value=groups)
         self.store.get_appservice_state = Mock(
             return_value=defer.succeed(ApplicationServiceState.UP)
         )
@@ -112,7 +102,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
         )
 
         # actual call
-        self.txnctrl.start_polling()
+        self.txnctrl.send(service, events)
 
         self.store.create_appservice_txn.assert_called_once_with(
             service=service, events=events
@@ -125,12 +115,6 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
             service, ApplicationServiceState.DOWN  # service marked as down
         )
 
-    def test_poll_no_groups(self):
-        self.as_api.push_bulk = Mock()
-        self.event_grouper.drain_groups = Mock(return_value={})
-        self.txnctrl.start_polling()
-        self.assertEquals(0, self.as_api.push_bulk.call_count)
-
 
 class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
 
@@ -205,54 +189,24 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
         self.callback.assert_called_once_with(self.recoverer)
 
 
-class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase):
+class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
 
     def setUp(self):
-        self.grouper = _EventGrouper()
-
-    def test_drain_single_event(self):
-        service = Mock()
-        event = Mock()
-        self.grouper.enqueue(service, event)
-        groups = self.grouper.drain_groups()
-        self.assertTrue(service in groups)
-        self.assertEquals([event], groups[service])
-        self.assertEquals(1, len(groups.keys()))
-        # no more events
-        self.assertEquals(self.grouper.drain_groups(), {})
-
-    def test_drain_multiple_events(self):
-        service = Mock()
-        events = [Mock(), Mock(), Mock()]
-        for e in events:
-            self.grouper.enqueue(service, e)
-        groups = self.grouper.drain_groups()
-        self.assertTrue(service in groups)
-        self.assertEquals(events, groups[service])
-        # no more events
-        self.assertEquals(self.grouper.drain_groups(), {})
-
-    def test_drain_multiple_services(self):
-        services = [Mock(), Mock(), Mock()]
-        events_a = [Mock(), Mock()]
-        events_b = [Mock()]
-        events_c = [Mock(), Mock(), Mock(), Mock()]
-        mappings = {
-            services[0]: events_a,
-            services[1]: events_b,
-            services[2]: events_c
-        }
-        for e in events_b:
-            self.grouper.enqueue(services[1], e)
-        for e in events_c:
-            self.grouper.enqueue(services[2], e)
-        for e in events_a:
-            self.grouper.enqueue(services[0], e)
-
-        groups = self.grouper.drain_groups()
-        for service in services:
-            self.assertTrue(service in groups)
-            self.assertEquals(mappings[service], groups[service])
-        self.assertEquals(3, len(groups.keys()))
-        # no more events
-        self.assertEquals(self.grouper.drain_groups(), {})
+        self.txn_ctrl = Mock()
+        self.queuer = _ServiceQueuer(self.txn_ctrl)
+
+    def test_send_single_event_no_queue(self):
+        # Expect the event to be sent immediately.
+        pass
+
+    def test_send_single_event_with_queue(self):
+        # - Send an event and don't resolve it just yet.
+        # - Send another event: expect send() to NOT be called.
+        # - Resolve the send event
+        # - Expect queued event to be sent
+        pass
+
+    def test_multiple_service_queues(self):
+        # Tests that each service has its own queue, and that they don't block
+        # on each other.
+        pass