diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py
index 4534d05b93..38d792eb02 100644
--- a/tests/appservice/test_scheduler.py
+++ b/tests/appservice/test_scheduler.py
@@ -14,7 +14,7 @@
# limitations under the License.
from synapse.appservice import ApplicationServiceState, AppServiceTransaction
from synapse.appservice.scheduler import (
- _EventGrouper, _TransactionController, _Recoverer
+ _ServiceQueuer, _TransactionController, _Recoverer
)
from twisted.internet import defer
from ..utils import MockClock
@@ -28,25 +28,21 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.clock = MockClock()
self.store = Mock()
self.as_api = Mock()
- self.event_grouper = Mock()
self.recoverer = Mock()
self.recoverer_fn = Mock(return_value=self.recoverer)
self.txnctrl = _TransactionController(
clock=self.clock, store=self.store, as_api=self.as_api,
- event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn
+ recoverer_fn=self.recoverer_fn
)
- def test_poll_single_group_service_up(self):
+ def test_single_service_up_txn_sent(self):
# Test: The AS is up and the txn is successfully sent.
service = Mock()
events = [Mock(), Mock()]
- groups = {}
- groups[service] = events
txn_id = "foobar"
txn = Mock(id=txn_id, service=service, events=events)
# mock methods
- self.event_grouper.drain_groups = Mock(return_value=groups)
self.store.get_appservice_state = Mock(
return_value=defer.succeed(ApplicationServiceState.UP)
)
@@ -56,7 +52,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
)
# actual call
- self.txnctrl.start_polling()
+ self.txnctrl.send(service, events)
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved
@@ -64,15 +60,12 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made
txn.complete.assert_called_once_with(self.store) # txn completed
- def test_poll_single_group_service_down(self):
+ def test_single_service_down(self):
# Test: The AS is down so it shouldn't push; Recoverers will do it.
# It should still make a transaction though.
service = Mock()
events = [Mock(), Mock()]
- groups = {}
- groups[service] = events
- self.event_grouper.drain_groups = Mock(return_value=groups)
txn = Mock(id="idhere", service=service, events=events)
self.store.get_appservice_state = Mock(
return_value=defer.succeed(ApplicationServiceState.DOWN)
@@ -82,7 +75,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
)
# actual call
- self.txnctrl.start_polling()
+ self.txnctrl.send(service, events)
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events # txn made and saved
@@ -90,18 +83,15 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
self.assertEquals(0, txn.send.call_count) # txn not sent though
self.assertEquals(0, txn.complete.call_count) # or completed
- def test_poll_single_group_service_up(self):
+ def test_single_service_up_txn_not_sent(self):
# Test: The AS is up and the txn is not sent. A Recoverer is made and
# started.
service = Mock()
events = [Mock(), Mock()]
- groups = {}
- groups[service] = events
txn_id = "foobar"
txn = Mock(id=txn_id, service=service, events=events)
# mock methods
- self.event_grouper.drain_groups = Mock(return_value=groups)
self.store.get_appservice_state = Mock(
return_value=defer.succeed(ApplicationServiceState.UP)
)
@@ -112,7 +102,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
)
# actual call
- self.txnctrl.start_polling()
+ self.txnctrl.send(service, events)
self.store.create_appservice_txn.assert_called_once_with(
service=service, events=events
@@ -125,12 +115,6 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
service, ApplicationServiceState.DOWN # service marked as down
)
- def test_poll_no_groups(self):
- self.as_api.push_bulk = Mock()
- self.event_grouper.drain_groups = Mock(return_value={})
- self.txnctrl.start_polling()
- self.assertEquals(0, self.as_api.push_bulk.call_count)
-
class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
@@ -205,54 +189,24 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase):
self.callback.assert_called_once_with(self.recoverer)
-class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase):
+class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase):
def setUp(self):
- self.grouper = _EventGrouper()
-
- def test_drain_single_event(self):
- service = Mock()
- event = Mock()
- self.grouper.enqueue(service, event)
- groups = self.grouper.drain_groups()
- self.assertTrue(service in groups)
- self.assertEquals([event], groups[service])
- self.assertEquals(1, len(groups.keys()))
- # no more events
- self.assertEquals(self.grouper.drain_groups(), {})
-
- def test_drain_multiple_events(self):
- service = Mock()
- events = [Mock(), Mock(), Mock()]
- for e in events:
- self.grouper.enqueue(service, e)
- groups = self.grouper.drain_groups()
- self.assertTrue(service in groups)
- self.assertEquals(events, groups[service])
- # no more events
- self.assertEquals(self.grouper.drain_groups(), {})
-
- def test_drain_multiple_services(self):
- services = [Mock(), Mock(), Mock()]
- events_a = [Mock(), Mock()]
- events_b = [Mock()]
- events_c = [Mock(), Mock(), Mock(), Mock()]
- mappings = {
- services[0]: events_a,
- services[1]: events_b,
- services[2]: events_c
- }
- for e in events_b:
- self.grouper.enqueue(services[1], e)
- for e in events_c:
- self.grouper.enqueue(services[2], e)
- for e in events_a:
- self.grouper.enqueue(services[0], e)
-
- groups = self.grouper.drain_groups()
- for service in services:
- self.assertTrue(service in groups)
- self.assertEquals(mappings[service], groups[service])
- self.assertEquals(3, len(groups.keys()))
- # no more events
- self.assertEquals(self.grouper.drain_groups(), {})
+ self.txn_ctrl = Mock()
+ self.queuer = _ServiceQueuer(self.txn_ctrl)
+
+ def test_send_single_event_no_queue(self):
+ # Expect the event to be sent immediately.
+ pass
+
+ def test_send_single_event_with_queue(self):
+ # - Send an event and don't resolve it just yet.
+ # - Send another event: expect send() to NOT be called.
+ # - Resolve the send event
+ # - Expect queued event to be sent
+ pass
+
+ def test_multiple_service_queues(self):
+ # Tests that each service has its own queue, and that they don't block
+ # on each other.
+ pass
|