diff options
-rw-r--r-- | synapse/appservice/scheduler.py | 61 | ||||
-rw-r--r-- | tests/appservice/test_scheduler.py | 100 |
2 files changed, 60 insertions, 101 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index 59a870e271..54c42d1b94 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -16,11 +16,11 @@ This module controls the reliability for application service transactions. The nominal flow through this module looks like: - _________ ----ASa[e]-->| Event | -----ASb[e]->| Grouper |<-poll 1/s--+ ---ASa[e]--->|_________| | ASa[e,e] ASb[e] - V + __________ +1---ASa[e]-->| Service |--> Queue ASa[f] +2----ASb[e]->| Queuer | +3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e] + V -````````- +------------+ |````````|<--StoreTxn-|Transaction | |Database| | Controller |---> SEND TO AS @@ -66,14 +66,14 @@ class AppServiceScheduler(object): self.clock = clock self.store = store self.as_api = as_api - self.event_grouper = _EventGrouper() def create_recoverer(service, callback): return _Recoverer(clock, store, as_api, service, callback) self.txn_ctrl = _TransactionController( - clock, store, as_api, self.event_grouper, create_recoverer + clock, store, as_api, create_recoverer ) + self.queuer = _ServiceQueuer(self.txn_ctrl) @defer.inlineCallbacks def start(self): @@ -86,17 +86,26 @@ class AppServiceScheduler(object): self.txn_ctrl.start_polling() def submit_event_for_as(self, service, event): - self.event_grouper.enqueue(service, event) + self.queuer.enqueue(service, event) -class _EventGrouper(object): - """Groups events for the same application service together. +class _ServiceQueuer(object): + """Queues events for the same application service together, sending + transactions as soon as possible. Once a transaction is sent successfully, + this schedules any other events in the queue to run. """ - def __init__(self): + def __init__(self, txn_ctrl): self.groups = {} # dict of {service: [events]} + self.txn_ctrl = txn_ctrl def enqueue(self, service, event): + # if nothing in queue for this service, send event immediately and add + # callbacks. + self.txn_ctrl.send(service, [event]) + + # else add to queue for this service + if service not in self.groups: self.groups[service] = [] self.groups[service].append(event) @@ -109,34 +118,30 @@ class _EventGrouper(object): class _TransactionController(object): - def __init__(self, clock, store, as_api, event_grouper, recoverer_fn): + def __init__(self, clock, store, as_api, recoverer_fn): self.clock = clock self.store = store self.as_api = as_api - self.event_grouper = event_grouper self.recoverer_fn = recoverer_fn # keep track of how many recoverers there are self.recoverers = [] @defer.inlineCallbacks - def start_polling(self): + def send(self, service, events): try: - groups = self.event_grouper.drain_groups() - for service in groups: - txn = yield self.store.create_appservice_txn( - service=service, - events=groups[service] - ) - service_is_up = yield self._is_service_up(service) - if service_is_up: - sent = yield txn.send(self.as_api) - if sent: - txn.complete(self.store) - else: - self._start_recoverer(service) + txn = yield self.store.create_appservice_txn( + service=service, + events=events + ) + service_is_up = yield self._is_service_up(service) + if service_is_up: + sent = yield txn.send(self.as_api) + if sent: + txn.complete(self.store) + else: + self._start_recoverer(service) except Exception as e: logger.exception(e) - self.clock.call_later(1, self.start_polling) @defer.inlineCallbacks def on_recovered(self, recoverer): diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index 4534d05b93..38d792eb02 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.appservice import ApplicationServiceState, AppServiceTransaction from synapse.appservice.scheduler import ( - _EventGrouper, _TransactionController, _Recoverer + _ServiceQueuer, _TransactionController, _Recoverer ) from twisted.internet import defer from ..utils import MockClock @@ -28,25 +28,21 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.clock = MockClock() self.store = Mock() self.as_api = Mock() - self.event_grouper = Mock() self.recoverer = Mock() self.recoverer_fn = Mock(return_value=self.recoverer) self.txnctrl = _TransactionController( clock=self.clock, store=self.store, as_api=self.as_api, - event_grouper=self.event_grouper, recoverer_fn=self.recoverer_fn + recoverer_fn=self.recoverer_fn ) - def test_poll_single_group_service_up(self): + def test_single_service_up_txn_sent(self): # Test: The AS is up and the txn is successfully sent. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events txn_id = "foobar" txn = Mock(id=txn_id, service=service, events=events) # mock methods - self.event_grouper.drain_groups = Mock(return_value=groups) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.UP) ) @@ -56,7 +52,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -64,15 +60,12 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.assertEquals(0, len(self.txnctrl.recoverers)) # no recoverer made txn.complete.assert_called_once_with(self.store) # txn completed - def test_poll_single_group_service_down(self): + def test_single_service_down(self): # Test: The AS is down so it shouldn't push; Recoverers will do it. # It should still make a transaction though. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events - self.event_grouper.drain_groups = Mock(return_value=groups) txn = Mock(id="idhere", service=service, events=events) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.DOWN) @@ -82,7 +75,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events # txn made and saved @@ -90,18 +83,15 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): self.assertEquals(0, txn.send.call_count) # txn not sent though self.assertEquals(0, txn.complete.call_count) # or completed - def test_poll_single_group_service_up(self): + def test_single_service_up_txn_not_sent(self): # Test: The AS is up and the txn is not sent. A Recoverer is made and # started. service = Mock() events = [Mock(), Mock()] - groups = {} - groups[service] = events txn_id = "foobar" txn = Mock(id=txn_id, service=service, events=events) # mock methods - self.event_grouper.drain_groups = Mock(return_value=groups) self.store.get_appservice_state = Mock( return_value=defer.succeed(ApplicationServiceState.UP) ) @@ -112,7 +102,7 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): ) # actual call - self.txnctrl.start_polling() + self.txnctrl.send(service, events) self.store.create_appservice_txn.assert_called_once_with( service=service, events=events @@ -125,12 +115,6 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase): service, ApplicationServiceState.DOWN # service marked as down ) - def test_poll_no_groups(self): - self.as_api.push_bulk = Mock() - self.event_grouper.drain_groups = Mock(return_value={}) - self.txnctrl.start_polling() - self.assertEquals(0, self.as_api.push_bulk.call_count) - class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): @@ -205,54 +189,24 @@ class ApplicationServiceSchedulerRecovererTestCase(unittest.TestCase): self.callback.assert_called_once_with(self.recoverer) -class ApplicationServiceSchedulerEventGrouperTestCase(unittest.TestCase): +class ApplicationServiceSchedulerQueuerTestCase(unittest.TestCase): def setUp(self): - self.grouper = _EventGrouper() - - def test_drain_single_event(self): - service = Mock() - event = Mock() - self.grouper.enqueue(service, event) - groups = self.grouper.drain_groups() - self.assertTrue(service in groups) - self.assertEquals([event], groups[service]) - self.assertEquals(1, len(groups.keys())) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) - - def test_drain_multiple_events(self): - service = Mock() - events = [Mock(), Mock(), Mock()] - for e in events: - self.grouper.enqueue(service, e) - groups = self.grouper.drain_groups() - self.assertTrue(service in groups) - self.assertEquals(events, groups[service]) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) - - def test_drain_multiple_services(self): - services = [Mock(), Mock(), Mock()] - events_a = [Mock(), Mock()] - events_b = [Mock()] - events_c = [Mock(), Mock(), Mock(), Mock()] - mappings = { - services[0]: events_a, - services[1]: events_b, - services[2]: events_c - } - for e in events_b: - self.grouper.enqueue(services[1], e) - for e in events_c: - self.grouper.enqueue(services[2], e) - for e in events_a: - self.grouper.enqueue(services[0], e) - - groups = self.grouper.drain_groups() - for service in services: - self.assertTrue(service in groups) - self.assertEquals(mappings[service], groups[service]) - self.assertEquals(3, len(groups.keys())) - # no more events - self.assertEquals(self.grouper.drain_groups(), {}) + self.txn_ctrl = Mock() + self.queuer = _ServiceQueuer(self.txn_ctrl) + + def test_send_single_event_no_queue(self): + # Expect the event to be sent immediately. + pass + + def test_send_single_event_with_queue(self): + # - Send an event and don't resolve it just yet. + # - Send another event: expect send() to NOT be called. + # - Resolve the send event + # - Expect queued event to be sent + pass + + def test_multiple_service_queues(self): + # Tests that each service has its own queue, and that they don't block + # on each other. + pass |